Temporal SDK Development Guide

Core Concepts

What is a Temporal Application?

A Temporal Application is a collection of Workflow Definitions, Activity Definitions, and Worker Processes that execute your business logic. It relies on a Temporal Service (either Temporal Cloud or a self-hosted cluster) to manage the state and execution of Workflows reliably.

Workflows

A Workflow is the core abstraction in Temporal, representing a durable, reliable, and potentially long-running function execution. Workflows orchestrate the execution of Activities to achieve a business goal.

Key Properties:

  • Durability: Workflow state is persisted by the Temporal Service, allowing execution to survive process failures and resume automatically.
  • Reliability: The Temporal Platform guarantees that Workflow code executes exactly once to completion, handling retries and failures transparently.
  • Deterministic Constraints: Workflow code must be deterministic. This means it must produce the same output given the same input, without relying on external system state or side effects (except through Activities).

Workflows are defined as functions or methods in your chosen SDK language. They interact with the Temporal system via SDK APIs to schedule Activities, manage timers, wait for external events (Signals), and more.

Activities

An Activity represents a single, well-defined unit of work within a Workflow, typically involving interaction with the outside world (for example, database calls, API requests, sending emails).

Key Properties:

  • Non-deterministic Code: Unlike Workflows, Activity code can interact freely with external systems and perform non-deterministic operations.
  • Managed Execution: Activities are executed by Workers. The Temporal Platform manages their retries, timeouts, and tracking based on configured policies.
  • Idempotent Design: Activities should ideally be designed to be idempotent. This means executing them multiple times with the same input yields the same result, which is crucial for handling potential retries safely.

Activities are defined as functions or methods and are invoked asynchronously from Workflows.

Workers

A Worker is a process that hosts Workflow and Activity implementations. Workers poll Task Queues on the Temporal Service, receive Tasks (representing Workflow or Activity executions), execute the corresponding code, and report results back.

Responsibilities:

  • Execute Workflow logic based on its history.
  • Execute Activity Tasks, interacting with external systems as needed.
  • Communicate with the Temporal Service to fetch Tasks and report progress/results.

You can run multiple Worker processes for scalability and fault tolerance. Workers are stateless regarding Workflow executions (state is maintained by the Cluster), but they manage Activity execution state locally (for example, if an Activity needs to checkpoint progress).

Task Queues

A Task Queue is a lightweight, dynamic queue managed by the Temporal Service. When a Workflow Execution is started or an Activity is scheduled, a Task is placed onto a specific Task Queue.

Purpose:

  • Routing: Workers listen on specific Task Queues. This directs Tasks to the appropriate set of Workers capable of handling them.
  • Load Balancing: The Temporal Service distributes Tasks from a queue among the available Workers listening on it.
  • Prioritization (Enterprise): Task Queues can be configured for task prioritization in Temporal Cluster versions supporting it.

Task Queues are fundamental for decoupling Workflow/Activity scheduling from Worker execution.

Setup and Installation

Temporal CLI and Development Server

The Temporal CLI is essential for interacting with a Temporal Service (including Temporal Cloud) and includes a lightweight development server for local testing. This server runs as a single process with an in-memory database (or optional file persistence) and includes the Temporal Web UI.

CLI Installation

The CLI is available for macOS, Linux, and Windows.

  • macOS (Homebrew): brew install temporal
  • Linux (Homebrew): brew install temporal
  • Manual Download (All Platforms): Visit temporal.io/download, select your platform/architecture, download the archive, extract it, and add the temporal (or temporal.exe) binary to your system's PATH.

Development Server Execution

Start the development server using the following command:

temporal server start-dev

This command starts the server (listening on localhost:7233 by default), creates the default default Namespace, and starts the Web UI (accessible at http://localhost:8233).

SDK Installation

Install the Temporal SDK for your preferred language:

Go

Add the Go SDK module to your project:

go get go.temporal.io/sdk

Or clone the repository:

git clone https://github.com/temporalio/sdk-go.git

API Reference: pkg.go.dev/go.temporal.io/sdk

Java

Add the Temporal SDK dependency to your project using Maven or Gradle.

Maven (pom.xml):

<dependency>
  <groupId>io.temporal</groupId>
  <artifactId>temporal-sdk</artifactId>
  <version>LATEST_VERSION</version> <!-- Replace with desired version -->
</dependency>

Gradle (build.gradle):

implementation \'io.temporal:temporal-sdk:LATEST_VERSION\' // Replace with desired version

Find the latest version on Maven Central.

API Reference: Javadoc

.NET

Add the Temporal .NET SDK package to your project using the .NET CLI:

dotnet add package Temporalio

Or using NuGet Package Manager.

API Reference: dotnet.temporal.io

PHP

Install the SDK using Composer:

composer require temporal/sdk

API Reference: php.temporal.io

Python

Install the SDK using pip:

pip install temporalio

Requires Python 3.8+.

API Reference: python.temporal.io

TypeScript / JavaScript

Create a new project using the Temporal initializer:

npx @temporalio/create@latest ./your-app

Or add to an existing project (requires Node.js 16.15+):

npm install @temporalio/client @temporalio/worker @temporalio/workflow @temporalio/activity @temporalio/common

API Reference: typescript.temporal.io

Workflow Development

Workflows orchestrate Activities and manage durable state. They execute deterministically, ensuring resilience against failures.

Workflow Definition

Define Workflows using interfaces and implementations (Java, Go, PHP) or functions/classes (Python, TypeScript, .NET). Workflow logic must be deterministic.

TypeScript

Define Workflow logic within an async function.

import * as wf from '@temporalio/workflow';
import * as activities from './activities'; // Assuming activities.ts defines activities

// Define Activities using an interface or proxy object.
const { composeGreeting } = wf.proxyActivities<typeof activities>({
  startToCloseTimeout: '1 minute',
});

/** A Workflow Definition */
export async function exampleWorkflow(name: string): Promise<string> {
  const greeting = await composeGreeting('Hello', name);
  return greeting;
}

Python

Define Workflows within classes.

from temporalio import workflow
from datetime import timedelta
import activities # Assume activities.py defines compose_greeting

# Import activity stubs (created via @activity.defn)
# Use this approach for type safety if stubs generated
# from activities import GreetingActivities

@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        # Execute an Activity
        return await workflow.execute_activity(
            compose_greeting,
            ("Hello", name),
            start_to_close_timeout=timedelta(minutes=1),
        )

Go

Define Workflows as functions.

package app

import (
"time"
"go.temporal.io/sdk/workflow"
"activities" // Assuming activities package defines ComposeGreeting
)

// Assumes activities.go defines ComposeGreeting

func YourWorkflow(ctx workflow.Context, name string) (string, error) {
ao := workflow.ActivityOptions{
    StartToCloseTimeout: time.Minute,
}
ctx = workflow.WithActivityOptions(ctx, ao)

var result string
err := workflow.ExecuteActivity(ctx, ComposeGreeting, "Hello", name).Get(ctx, &result)
if err != nil {
    return "", err
}
return result, nil
}

Java

Define a Workflow interface annotated with @WorkflowInterface and implement it.

package your.application; // Use your actual package name

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.workflow.Workflow; // Import Workflow class
import java.time.Duration;
import your.application.activities.YourActivities; // Assume YourActivities interface exists in activities subpackage

@WorkflowInterface
public interface YourWorkflow {
    @WorkflowMethod
    String execute(String name);
}

public class YourWorkflowImpl implements YourWorkflow {
    private final YourActivities activities;

    public YourWorkflowImpl() {
    // Configure Activity options
    ActivityOptions options = ActivityOptions.newBuilder()
        .setStartToCloseTimeout(Duration.ofMinutes(1))
        // Optional: Configure retry options
        .setRetryOptions(RetryOptions.newBuilder()
            .setMaximumAttempts(3)
            .build())
        .build();
    // Create an Activity stub
    this.activities = Workflow.newActivityStub(YourActivities.class, options);
    }

    @Override
    public String execute(String name) {
    // Execute an Activity by calling its method
    return activities.doWork("Hello " + name);
    }
}

.NET

Define Workflows in classes, marking the entry point with [WorkflowRun].

using Temporalio.Workflows;
using Temporalio.Activities; // For Activity scheduling options if needed
using System;
using System.Threading.Tasks;
using YourApplication.Activities; // Assume YourApplication.Activities.MyActivities class exists

[Workflow]
public class YourWorkflow
{
    [WorkflowRun]
    public async Task<string> RunAsync(string name)
    {
    // Execute an Activity
    return await Workflow.ExecuteActivityAsync<string>(
        (MyActivities act) => act.ComposeGreetingAsync("Hello", name),
        new ActivityOptions { StartToCloseTimeout = TimeSpan.FromMinutes(1) }
    );
    }
}

PHP

Define a Workflow interface annotated with #[WorkflowInterface] and implement it.

<?php
namespace App\Workflow; // Use appropriate namespace

use Temporal\Workflow\WorkflowInterface;
use Temporal\Workflow\WorkflowMethod;
use Temporal\Activity\ActivityOptions;
use Temporal\Workflow;
use App\Activity\YourActivityInterface; // Assumes YourActivityInterface exists

#[WorkflowInterface]
interface YourWorkflowInterface
{
    #[WorkflowMethod]
    public function execute(string $name): \\Generator;
}

class YourWorkflow implements YourWorkflowInterface
{
    public function execute(string $name): \\Generator
    {
        Workflow::getLogger()->info('Workflow started', ['name' => $name]);
        // Workflow logic...
        // ... execute activities, timers, etc.
        return 'Workflow completed for ' . $name; // yield return for async
    }
}
?>

Workflow Registration

Workers need to know which Workflow implementations correspond to the Workflow Types requested by Workflows. This is done by registering Workflow implementations with the Worker.

Note: This is typically done automatically by the SDK when you start a Workflow, but it's good to understand the concept for manual registration.

# Pseudocode for Workflow registration

# --- Original Code ---
# result = await workflow.execute_activity(activity_A, ...)

# --- Updated Code ---
# We want to optionally call activity_B before activity_A

# Change ID identifies this specific patch/change
patch_change_id = "add-activity-B-call-v1"

# Default version (original behavior) is -1 (or some SDK-specific default)
# New version is 1 (or the next sequential number for this change ID)
version = workflow.get_version(patch_change_id, default=-1, max_supported=1)

if version == 1:
    # New code path: Call B first
    await workflow.execute_activity(activity_B, ...)

# Code common to both versions (or the original path if version == -1)
result = await workflow.execute_activity(activity_A, ...)

Workflow Constraints (Determinism)

Workflow code must be deterministic. This means that given the same input and history, the code must always produce the same sequence of commands (like starting Activities or Timers). This allows the Temporal Cluster to replay the Workflow history and recover its state after failures.

To maintain determinism:

  • No mutable global state: Avoid relying on or modifying variables outside the Workflow's scope that could change between replays.
  • No direct external calls: Do not make network calls, file system access, or interact with external systems directly from Workflow code. Use Activities for all side effects.
  • Deterministic randomness: If you need random numbers, use the SDK's deterministic random functions (like Workflow.newRandom() in Java/Go, workflow.random() in Python/TS).
  • Deterministic time: Use the SDK's time functions (like Workflow.currentTimeMillis() in Java, Workflow.now() in TS, workflow.now() in Python) instead of system time.
  • Deterministic iteration order: Avoid iterating over maps or dictionaries directly if the order is not guaranteed. Convert them to sorted lists first if iteration order matters.
  • Avoid threading/goroutines (directly): Do not spawn threads or goroutines directly within Workflow code. Use the SDK's mechanisms for concurrency if needed (like Promise.all in TS, Asyncio primitives with workflow.asyncio in Python, Workflow.go in Go, Async.function in Java).

Breaking these rules can lead to non-deterministic errors, which typically surface during replays or failovers and can be hard to debug. The SDK provides APIs (like Side Effects, `Workflow.getVersion`) to handle non-deterministic logic when absolutely necessary.

Workflow Versioning (Patching)

Since Workflow code must be deterministic, deploying updated Workflow code that changes the sequence of commands (like adding a new Activity call, removing a Timer) can break replays of existing, in-flight Workflow Executions that were started with the old code.

The Workflow.getVersion API (or equivalent in each SDK) allows you to safely deploy changes to Workflow definitions without breaking compatibility.

  1. Wrap the new or modified logic within a getVersion block, assigning it a unique change ID (often an integer or descriptive string).
  2. Specify a default version (typically associated with the original code path) and the new version (associated with the changed code path).
  3. When a Worker executes this code during a replay: If the history already contains a marker for this change ID, the SDK automatically executes the code path corresponding to the recorded version.
  4. When a Worker executes this code for the *first time* (not a replay for this specific part), it executes the code associated with the *new* version you specified and records that version marker in the history.

This ensures that existing executions continue deterministically along their original path, while new executions (or executions reaching this point for the first time) use the updated logic.

# Pseudocode for using getVersion

# --- Original Code ---
# result = await workflow.execute_activity(activity_A, ...)

# --- Updated Code ---
# We want to optionally call activity_B before activity_A

# Change ID identifies this specific patch/change
patch_change_id = "add-activity-B-call-v1"

# Default version (original behavior) is -1 (or some SDK-specific default)
# New version is 1 (or the next sequential number for this change ID)
version = workflow.get_version(patch_change_id, default=-1, max_supported=1)

if version == 1:
    # New code path: Call B first
    await workflow.execute_activity(activity_B, ...)

# Code common to both versions (or the original path if version == -1)
result = await workflow.execute_activity(activity_A, ...)

After all in-flight Workflows using the old code path (default version) have completed, you can remove the getVersion call and the old code path, simplifying the Workflow definition.

Note: Build ID based Worker Versioning is a newer, often preferred approach for managing incompatible Worker code deployments, especially across multiple Task Queues or complex rollouts. Patching is useful for targeted, small changes within a single Workflow definition.

Execute Child Workflows

Workflows can start other Workflows, known as Child Workflows. This allows for modular design, breaking down complex business processes into smaller, reusable, and independently testable units.

Key characteristics of Child Workflows:

  • Independent History: Each Child Workflow has its own separate event history.
  • Parent Relationship: The parent Workflow can track the status of the child and receive its result upon completion.
  • Cancellation Propagation: Cancellation can be configured to propagate from parent to child (or not).
  • Retries and Timeouts: Child Workflows have their own timeouts and retry policies, independent of the parent.
  • Task Queues: Child Workflows can run on the same or different Task Queues as the parent.

Starting a Child Workflow is similar to starting a Workflow via the Client, but it's done from within the parent Workflow's code using specific SDK functions (for example, workflow.ExecuteChildWorkflow). Options control the parent-child relationship behavior (for example, what happens if the parent completes before the child).

TypeScript

import { executeChild, proxyActivities, ParentClosePolicy, workflowInfo } from '@temporalio/workflow'; // Import workflowInfo
import { childWorkflow } from './child'; // Assume childWorkflow definition exists

export async function parentWorkflow(name: string): Promise<string> {
    // Start child and wait for result
    const result = await executeChild(childWorkflow, {
        args: [name],
        workflowId: `child-${workflowInfo().workflowId}`, // Ensure unique child ID using workflowInfo
        // Default: ParentClosePolicy.TERMINATE (terminates child if parent closes)
        // Use ABANDON if child should continue after parent finishes
        parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON,
    });
    return `Parent received: ${result}`;
}

Python

from temporalio import workflow
from temporalio.common import ParentClosePolicy
# Assume child_workflow is defined in another file/module
with workflow.unsafe.imports_passed_through():
    from child_workflows import ChildWorkflow

@workflow.defn
class ParentWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        result = await workflow.execute_child_workflow(
            ChildWorkflow.run,
            f"Input for {name}",
            id=f"child-{workflow.info().workflow_id}",
            parent_close_policy=ParentClosePolicy.ABANDON,
        )
        return f"Parent received: {result}"
                

Go

package app // Assuming parent workflow is in app package

import (
"fmt" // For string formatting
"go.temporal.io/sdk/workflow"
"go.temporal.io/api/enums/v1"
// Assume app defines ChildWorkflow function
"your_module/app"
)

func ParentWorkflow(ctx workflow.Context, name string) (string, error) {
    cwo := workflow.ChildWorkflowOptions{
        // Generate a unique ID, e.g., combining parent ID and a unique part
        WorkflowID: fmt.Sprintf("child-%s-%s", workflow.GetInfo(ctx).WorkflowExecution.ID, workflow.GetRandomUUID(ctx)),
        ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
    }
    ctx = workflow.WithChildOptions(ctx, cwo)

    var result string
    // Assuming ChildWorkflow is defined in the same package or imported correctly
    future := workflow.ExecuteChildWorkflow(ctx, app.ChildWorkflow, name)
    if err := future.Get(ctx, &result); err != nil {
        // Handle child workflow error (e.g., log, return error)
        workflow.GetLogger(ctx).Error("Child workflow failed", "Error", err)
        return "", err
    }
    return "Parent received: " + result, nil
}

// Assume ChildWorkflow definition exists, e.g.:
// func ChildWorkflow(ctx workflow.Context, inputName string) (string, error) {
//     workflow.GetLogger(ctx).Info("Child workflow started", "Input", inputName)
//     return "Result from child for " + inputName, nil
// }
                

Java

package your.application.workflows; // Use your actual package name

import io.temporal.workflow.*;
import io.temporal.api.enums.v1.ParentClosePolicy;
import java.util.UUID; // For UUIDs
// Assume ChildWorkflow interface exists in the same package or imported

public class ParentWorkflowImpl implements ParentWorkflow { // Assuming ParentWorkflow interface
    @Override
    public String execute(String name) {
        ChildWorkflowOptions options = ChildWorkflowOptions.newBuilder()
            .setWorkflowId("child-" + Workflow.getInfo().getWorkflowId() + "-" + UUID.randomUUID().toString()) // Ensure unique ID
            .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
            .build();

        ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options);

        // Start child and wait for result
        String result = child.execute("Input for " + name);
        return "Parent received: " + result;

        // To start asynchronously:
        // Promise promise = Async.function(child::execute, "Input for " + name);
        // Use promise.get() later
    }
}

.NET

using Temporalio.Workflows;
using Temporalio.Common; // For ParentClosePolicy
using System; // For Guid
using System.Threading.Tasks;
using YourApplication.Workflows; // Assuming ChildWorkflow class exists in the same namespace or imported

[Workflow]
public class ParentWorkflow
{
    [WorkflowRun]
    public async Task<string> RunAsync(string name)
    {
        string result = await Workflow.ExecuteChildWorkflowAsync(
            (ChildWorkflow wf) => wf.RunChildAsync($"Input for {name}"), // Ensure RunChildAsync exists on ChildWorkflow
            new ChildWorkflowOptions
            {
                WorkflowId = $"child-{Workflow.Info.WorkflowId}-{Guid.NewGuid()}", // Ensure unique ID
                ParentClosePolicy = ParentClosePolicy.Abandon,
            }
        );
        return $"Parent received: {result}";
    }
}

PHP

<?php declare(strict_types=1);\\\n\\\nnamespace App\\\\\\\\Workflow;\\\\n\\\\nuse Temporal\\\\\\\\Workflow\\\\\\\\ParentWorkflowInterface;\\\\nuse Temporal\\\\\\\\Workflow\\\\\\\\ParentWorkflowMethod;\\\\nuse Temporal\\\\\\\\Activity\\\\\\\\ActivityOptions;\\\\nuse Temporal\\\\\\\\Common\\\\\\\\RetryOptions;\\\\nuse Temporal\\\\\\\\Activity\\\\\\\\ActivityMethod;\\\\nuse Temporal\\\\\\\\Activity\\\\\\\\ActivityInterface;\\\\nuse App\\\\\\\\ChildWorkflows\\\\\\\\ChildWorkflow;\\\\nuse Ramsey\\\\Uuid\\\\Uuid; // Example: Use ramsey/uuid\\\\n\\\\n// Define ParentWorkflow interface and implementation\\\\ninterface ParentWorkflowInterface extends ParentWorkflowInterface<?> {\\\n    @ParentWorkflowMethod(name = "ParentWorkflow")\\\n    void execute(String name);\n}\\\n\\\\nclass ParentWorkflowImpl implements ParentWorkflowInterface {\\\n    @Override\\\n    @ParentWorkflowMethod(name = "ParentWorkflow")\\\n    public void execute(String name) {\\\n        // Start child workflow\\\n        ChildWorkflow.start(\\\"child-workflow\\\", [\\\"name\\\" => $name]);\\\\\n        // You can also use a different workflow ID if needed\\\\\n    }\\\\\n}\\\n?>

Continue-As-New

Workflow Executions have limits on their event history size and duration. For long-running processes or Workflows that accumulate a large history (for example, loops processing many items), `Continue-As-New` allows a Workflow to effectively restart itself with a fresh history, carrying over necessary state.

When a Workflow decides to `Continue-As-New`:

  1. It specifies the same Workflow Type (or potentially a different one).
  2. It provides the input arguments for the *new* execution. This is how state is carried over.
  3. The current execution is marked as `ContinuedAsNew`.
  4. A new Workflow Execution starts immediately with the provided arguments and a clean history, but logically continues the business process. The Run ID changes, but the Workflow ID can be kept the same.

This is the standard pattern for implementing infinite loops or processes that need to run indefinitely without hitting history limits. It's also useful for periodic cleanup of large state within a Workflow.

# Pseudocode for Continue-As-New loop

@workflow.defn
class PerpetualWorkflow:
    @workflow.run
    async def run(self, counter: int, other_state: dict = None): # Example with state
        if other_state is None:
            other_state = {} # Initialize state if first run

        workflow.logger.info(f"Executing run {counter} with state: {other_state}")

        # Example condition to ContinueAsNew (e.g., every 1000 iterations)
        if counter > 0 and counter % 1000 == 0:
            workflow.logger.info(f"Processed {counter} items, continuing as new...")
            # Carry over the necessary state to the new execution
            # Only include state needed for the *next* run
            workflow.continue_as_new(counter, other_state) # Restarts run() with new args
            # Note: Code after continue_as_new does not execute in the current run

        # --- Regular Workflow Logic ---
        workflow.logger.info(f"Processing item {counter}")
        # Do work... update other_state if needed
        # await workflow.execute_activity(process_item, counter, other_state, ...)
        other_state['last_processed'] = counter
        await workflow.sleep(timedelta(seconds=1)) # Simulate work or delay

        # "Loop" by calling continue_as_new with updated state for the next iteration
        workflow.continue_as_new(counter + 1, other_state)

# Client side starting the workflow for the first time:
    async def run(self, counter: int):
        if counter >= 1000: # Example condition to ContinueAsNew
            print(f"Processed {counter} items, continuing as new...")
            # Carry over the counter state to the new execution
            workflow.continue_as_new(counter) # Restarts run() with counter
        else:
            print(f"Processing item {counter}")
            # Do work...
            await workflow.execute_activity(process_item, counter, ...)

            # Schedule next iteration after a delay
            await workflow.sleep(timedelta(seconds=10))
            # "Loop" by calling continue_as_new with updated state
            workflow.continue_as_new(counter + 1)

Side Effects

While Workflows must be deterministic, sometimes you need to execute a short, non-deterministic piece of code within the Workflow logic itself (without calling an Activity). A common example is generating a UUID or getting a random number using a non-SDK function (which would violate determinism if called directly).

Workflow.SideEffect (or equivalent) allows this.

  1. You provide a function/lambda containing the non-deterministic code to SideEffect.
  2. The first time this code path is executed, the function is run, and its return value is recorded in the Workflow history.
  3. During replays, the function is *not* re-executed. Instead, the recorded value from the history is returned directly, preserving determinism.

Side Effects should only be used for short, simple operations where creating a dedicated Activity would be overkill. They are not suitable for I/O or long-running computations. For example, generating a unique ID or calling a potentially flaky function that you don't want to retry as a full Activity.

# Pseudocode for Side Effect

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self):
        # Generate a random ID using a potentially non-deterministic library function
        # This lambda is only executed once per execution path.
        random_id = await workflow.side_effect(lambda: generate_random_id_maybe_impure())

        # Use the random_id, which is now deterministic for replays
        await workflow.execute_activity(use_id_activity, random_id, ...)

        return f"Processed with ID: {random_id}"

# Note: For truly random values needed within workflow logic,
# prefer the SDK's deterministic random functions like workflow.random().
# SideEffect is more for integrating short non-deterministic external code.

Local Activities

Local Activities are an optimization for short-lived Activities that don't require the full overhead of Task Queue polling, scheduling, and retries managed by the Temporal Cluster. They execute directly within the Worker's Workflow Task processing thread.

Characteristics:

  • Execution: Run in the same process as the Workflow code that called them.
  • Latency: Much lower invocation latency compared to standard Activities (sub-millisecond).
  • Retries: Limited, server-side retries are not applicable. Retries must be handled by the Workflow code itself or rely on the Workflow Task retry mechanism. A short, configurable local retry policy might be available in the SDK.
  • Timeouts: Subject to shorter, locally configured timeouts, often tied to the Workflow Task timeout.
  • Use Cases: Ideal for simple functions like data validation, transformation, short calculations, or accessing read-only resources local to the Worker, where the overhead of a standard Activity is undesirable. For example, validating input against a schema loaded in Worker memory.
  • Determinism: Like standard Activities, Local Activities themselves do not need to be deterministic, but their *invocation* from the Workflow must be.

Executing a Local Activity uses a specific SDK function (for example, workflow.ExecuteLocalActivity), often requiring different, shorter timeout configurations than standard Activities.

Mutations

The Workflow.upsertMemo, Workflow.upsertSearchAttributes APIs allow Workflows to update their own Memo and Search Attributes during execution. These updates are recorded in the event history.

Workflow.deprecatePatch is used in conjunction with Workflow.getVersion for managing code cleanup. Once you are sure no running Workflows are using an old code path identified by a specific change ID (patch ID), you can call `deprecatePatch(patch_change_id)`. This signals to the system (and potentially linting tools) that this patch ID is obsolete and its corresponding `getVersion` block can eventually be removed from the code.

Activity Development

Activities encapsulate the actual business logic, performing tasks like calling external services, interacting with databases, or processing data. They are executed by Workers and can be retried automatically.

Activity Definition

Define Activities using interfaces (Java, Go, PHP), functions (Python, TypeScript), or methods within classes (.NET).

TypeScript

Define Activities as plain async functions. You can optionally group them in an object or class for organization. Activity arguments and return types must be serializable by the configured Data Converter (JSON by default). Use Context.current().info to access Activity information like Task Token.

import { Context } from '@temporalio/activity';
import { log } from './logger'; // Assuming a configured logger

export async function composeGreeting(greeting: string, name: string): Promise<string> {
log.info('Activity input', { greeting, name });
const { activityId, workflowExecution } = Context.current().info;
log.info('Running in activity', { activityId, workflowId: workflowExecution.workflowId });

// Simulate external call or work
await new Promise((resolve) => setTimeout(resolve, 500));

return \`\${greeting}, \${name}!\`;
}

// Optional: Group activities
export const activities = { composeGreeting };

Python

Define Activities using the @activity.defn decorator on functions. Activity arguments and return types must be serializable (Defaults to JSON). Use activity.info() to access Activity information.

from temporalio import activity
import time

@activity.defn
def compose_greeting(greeting: str, name: str) -> str:
    activity.logger.info(f"Composing greeting for {name}")
    # Activity logic...
    time.sleep(0.5) # Simulate I/O
    return f"{greeting}, {name}!"

@activity.defn(name="DifferentActivityName")
async def do_async_work(value: int) -> int:
    activity.logger.info(f"Doing async work with {value}")
    # Async activity logic...
    await asyncio.sleep(1)
    return value * 10

Go

Define Activities as functions or methods on a struct. The function signature must include context.Context as the first argument. Arguments and return values must be serializable (JSON by default). Use activity.GetInfo(ctx) to access information.

package app

import (
"context"
"fmt"
"time"

"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
)

// Simple function as Activity
func ComposeGreeting(ctx context.Context, greeting string, name string) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Activity invoked", "greeting", greeting, "name", name)

info := activity.GetInfo(ctx)
logger.Info("Running activity", "ActivityID", info.ActivityID, "WorkflowID", info.WorkflowExecution.ID)

// Simulate work
time.Sleep(500 * time.Millisecond)

result := fmt.Sprintf("%s, %s!", greeting, name)
return result, nil
}

// --- Alternatively: Activities as struct methods ---
type MyActivities struct {
// Dependencies can be injected here (e.g., DB connection pool)
}

func (a *MyActivities) SendNotification(ctx context.Context, userID string, message string) error {
logger := activity.GetLogger(ctx)
logger.Info("Sending notification", "UserID", userID, "Message", message)
// ... logic to send notification ...
return nil
}

Java

Define an Activity interface annotated with @ActivityInterface. Each method in the interface defines an Activity Type. Implement this interface.

package your.application.activities; // Use your actual package name

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;

@ActivityInterface
public interface YourActivities {
    // Default Activity name is "DoWork"
    String doWork(String input);

    // Explicitly named Activity
    @ActivityMethod(name = "ProcessPayment")
    boolean processPaymentActivity(String orderId, double amount);
}

// Implementation
public class YourActivitiesImpl implements YourActivities {
    @Override
    public String doWork(String input) {
        // Activity logic...
        return "Processed: " + input;
    }

    @Override
    public boolean processPaymentActivity(String orderId, double amount) {
        // Payment processing logic...
        System.out.printf("Processing payment for order %s, amount %.2f%n", orderId, amount);
        return true;
    }
}

.NET

Define Activity methods within any class, marking them with the [Activity] attribute. Methods can be static or instance methods.

using Temporalio.Activities;
using System.Threading.Tasks;

public class MyActivities
{
    [Activity]
    public async Task<string> ComposeGreetingAsync(string greeting, string name)
    {
        // Activity logic...
        Activity.Logger.LogInformation("Running activity with greeting {Greeting} and name {Name}", greeting, name);
        await Task.Delay(500); // Simulate work
        return $"{greeting}, {name}!";
    }

    [Activity("StaticActivity")] // Custom name
    public static string DoStaticWork(int value)
    {
        return $"Static result: {value * 2}";
    }
}

PHP

Define an Activity interface annotated with #[ActivityInterface]. Each method defines an Activity Type. Implement this interface.

<?php

namespace App\Activity;

use Temporal\Activity\ActivityInterface;
use Temporal\Activity\ActivityMethod;

#[ActivityInterface(prefix: "App.Activities.")] // Optional prefix for all activity names
interface YourActivityInterface
{
    public function composeGreeting(string $greeting, string $name): string;

    #[ActivityMethod("ExecutePayment")] // Custom name overrides prefix
    public function executePaymentActivity(string $transactionId): bool;
}

// Implementation
use Temporal\Activity;

class YourActivities implements YourActivityInterface
{
    public function composeGreeting(string $greeting, string $name): string
    {
        Activity::getLogger()->info("Composing greeting", ['greeting' => $greeting, 'name' => $name]);
        // Activity logic...
        return $greeting . ', ' . $name . '!';
    }

    public function executePaymentActivity(string $transactionId): bool
    {
        Activity::getLogger()->info("Executing payment", ['transactionId' => $transactionId]);
        // Payment logic...
        return true;
    }
}
?>

Activity Context and Information

Activities can access metadata about their current execution, such as the Workflow ID, Activity ID, Task Token (needed for asynchronous completion), and configured timeouts. This is typically done via a context object or static methods provided by the SDK. Logging within Activities should ideally use the SDK's logger, which automatically includes this contextual information.

Example: Accessing Activity Info (Conceptual - specific methods vary by SDK as shown in definition examples)

# Inside an Activity...
activity_info = get_current_activity_info()
workflow_id = activity_info.workflow_id
activity_id = activity_info.activity_id
task_token = activity_info.task_token
attempt_number = activity_info.attempt
log("Starting activity", workflow_id=workflow_id, activity_id=activity_id, attempt=attempt_number)

# ... perform activity work ...

# If completing asynchronously later:
# save_task_token_somewhere(task_token)
# return special marker indicating async completion

Activity Registration

Workers need to know which Activity implementations correspond to the Activity Types requested by Workflows. This is done by registering Activity implementations with the Worker.

TypeScript

Pass Activity functions (often grouped in an object) to Worker.create.

import { Worker } from '@temporalio/worker';
import * as activities from './activities'; // Assuming activities = { composeGreeting, ... }

async function run() {
const worker = await Worker.create({
    // ... connection, taskQueue, workflowsPath ...
    activities, // Register all exported functions from activities module
});
await worker.run();
}

Python

Provide Activity functions decorated with @activity.defn to the Worker constructor.

from temporalio.worker import Worker
from temporalio.client import Client
# Assume activities.py defines compose_greeting, etc.
from activities import compose_greeting

async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="my-task-queue",
        workflows=[GreetingWorkflow], # Assume GreetingWorkflow is defined
        activities=[compose_greeting], # Register activity functions
    )
    await worker.run()

Go

Use worker.RegisterActivity or worker.RegisterActivityWithOptions.

import (
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
// Assume app package defines ComposeGreeting and YourWorkflow
"your_module/app"
)

func main() {
c, _ := client.Dial(client.Options{})
defer c.Close()

w := worker.New(c, "my-task-queue", worker.Options{})

// Register Activity function
w.RegisterActivity(app.ComposeGreeting)

// Or register methods from a struct instance
// activityInstance := &app.MyActivities{}
// w.RegisterActivity(activityInstance)

w.RegisterWorkflow(app.YourWorkflow)

err := w.Run(worker.InterruptCh())
// Handle error
}

Java

Register Activity implementation instances using worker.registerActivitiesImplementations.

import io.temporal.worker.WorkerFactory;
import io.temporal.worker.Worker;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
// Assume YourActivitiesImpl and YourWorkflowImpl exist

public class WorkerMain {
    public static void main(String[] args) {
    WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
    WorkflowClient client = WorkflowClient.newInstance(service);
    WorkerFactory factory = WorkerFactory.newInstance(client);

    Worker worker = factory.newWorker("my-task-queue");

    // Register Workflow implementation type
    worker.registerWorkflowImplementationTypes(YourWorkflowImpl.class);

    // Register Activity implementation instance(s)
    worker.registerActivitiesImplementations(new YourActivitiesImpl());

    factory.start();
    }
}

.NET

Add Activity classes to the Worker options via AddActivity.

using Temporalio.Client;
using Temporalio.Worker;
// Assume MyActivities and YourWorkflow exist

var client = await TemporalClient.ConnectAsync(new("localhost:7233"));

var workerOptions = new TemporalWorkerOptions("my-task-queue")
    .AddWorkflow<YourWorkflow>()
    .AddActivity(MyActivities.ComposeGreetingAsync) // Register specific static method
    .AddActivity(MyActivities.DoStaticWork)
    .AddActivities(new MyActivities()); // Register instance methods if needed

var worker = new TemporalWorker(client, workerOptions);

try {
    await worker.ExecuteAsync(CancellationToken.None); // Use cancellation token
} catch (Exception ex) {
    // Handle exception
}

PHP

Register Activity implementation instances via registerActivityImplementations.

<?php
use Temporal\\\\WorkerFactory;\nuse App\\\\YourWorkflow; // Assuming Workflow implementation\nuse App\\\\YourActivity; // Assuming Activity implementation\nuse Psr\\\\Log\\\\AbstractLogger; // For example logger\n\n// Simple logger implementation for the example\nclass SimpleLogger extends AbstractLogger {\n    public function log($level, $message, array $context = []): void { // Add void return type\n        echo \"LOG: $message\" . PHP_EOL; // Use PHP_EOL for line endings\n    }\n}\n\n$factory = WorkerFactory::create();\n$worker = $factory->newWorker(\'my-task-queue\');\n\n// Register Workflow implementation class\n$worker->registerWorkflowTypes(YourWorkflow::class);\n\n// Register Activity implementation instance(s)\n// Dependencies (like logger) can be injected here\n$worker->registerActivityImplementations(new YourActivity(new SimpleLogger()));\n\n$factory->run();
?>

Activity Heartbeating

For long-running Activities, Heartbeating allows the Activity to report its progress and liveness back to the Temporal Cluster. This serves several purposes:

  • Liveness Detection: If the Cluster doesn't receive a heartbeat within the configured HeartbeatTimeout, it assumes the Activity Worker has crashed and reschedules the Activity (respecting the Retry Policy).
  • Progress Checkpointing: Activities can include details (payload) with the heartbeat, effectively checkpointing their progress. If the Activity is retried (on the same or different Worker), it can retrieve the last checkpointed details from the Activity context and resume from that point, avoiding redundant work.
  • Cancellation Check: The heartbeat mechanism also allows the Activity to check if a cancellation has been requested for the Workflow or the Activity itself. The Activity can then perform cleanup and exit gracefully.

Heartbeating is crucial for Activities that might run longer than the StartToCloseTimeout or for tasks where reporting progress is beneficial. It's implemented by calling a specific SDK function (for example, activity.RecordHeartbeat(ctx, details...)) periodically within the Activity code.

# Pseudocode for Activity with Heartbeating
@activity.defn
async def long_running_task(input_data):
    progress = load_progress_from_details(activity.info().heartbeat_details)
    if progress is None:
        progress = 0

    total_items = len(input_data)
    for i in range(progress, total_items):
        # Check for cancellation before/during work
        activity.heartbeat(i) # Report progress (item 'i' is about to be processed)

        # Process item i...
        await asyncio.sleep(1) # Simulate work

        # Optional: Heartbeat again after processing item, if fine-grained needed
        # activity.heartbeat(i + 1)

    return "All items processed"

def load_progress_from_details(details):
    # Logic to interpret details sent with previous heartbeat
    if details: return details # Assuming details itself is the progress marker
    return None

Note: Build ID based Worker Versioning is a newer, often preferred approach for managing incompatible Worker code deployments, especially across multiple Task Queues or complex rollouts. Patching is useful for targeted, small changes within a single Workflow definition.

Complete Activities Asynchronously

In some scenarios, the process that completes an Activity might be different from the one that started it. For example, an Activity might trigger an external system (via API call) and rely on a webhook callback to signal completion.

To handle this:

  1. The Activity obtains its unique TaskToken from the Activity Context. The Task Token identifies this specific Activity execution attempt.
  2. The Activity returns control to the Worker *without* completing, typically by throwing/returning a special indicator (like activity.DoNotCompleteOnReturn or a specific exception type).
  3. The Activity passes the TaskToken to the external system or process that will eventually signal completion.
  4. When the external process finishes, it uses a Temporal Client method (like CompleteActivityById or CompleteActivity, providing the Task Token) to report the result or failure back to the Temporal Cluster.

The Worker that originally executed the Activity does not need to be running for the completion call to succeed. The Temporal Cluster uses the Task Token to correlate the completion call with the correct Workflow Execution.

# Pseudocode for Activity triggering async completion

@activity.defn
async def trigger_external_job(job_params):
    info = activity.info()
    task_token_str = info.task_token.decode('utf-8') # Get task token as string

    # Call external system, passing task_token_str as a callback identifier
    external_api.start_job(job_params, callback_token=task_token_str)

    # Tell the Worker not to complete this Activity yet
    activity.do_not_complete_on_return()


# --- Elsewhere, in a separate process (e.g., webhook handler) ---

def handle_external_job_completion(callback_token, result, error):
    client = connect_to_temporal() # Connect with a Temporal client
    task_token_bytes = callback_token.encode('utf-8')

    try:
        if error:
            client.report_activity_failure(task_token_bytes, error)
        else:
            client.report_activity_completion(task_token_bytes, result)
    except Exception as e:
        # Handle potential errors reporting completion (e.g., activity timed out already)
        log.error(f"Failed to report activity completion/failure for token {callback_token}: {e}")

Activity Cancellation

Activities can be cancelled in two main ways:

  1. Workflow Cancellation: If the Workflow Execution that invoked the Activity is cancelled, the Temporal Cluster propagates the cancellation request to running Activities (depending on the Workflow's cancellation handling logic and SDK Cancellation Scopes).
  2. Activity Timeouts: If an Activity exceeds its StartToCloseTimeout or ScheduleToCloseTimeout, it is effectively cancelled by the timeout mechanism, and a failure is reported to the Workflow (which may trigger retries).

Activities that need to perform cleanup when cancelled (for example, releasing a lock, rolling back a transaction) should check for cancellation requests. This is often done:

  • Implicitly during activity.RecordHeartbeat calls.
  • Explicitly using SDK context cancellation checks (for example, ctx.Err() in Go, checking Activity.Current.CancellationToken.IsCancellationRequested in .NET, checking activity.is_cancelled() in Python).

Upon detecting cancellation, the Activity should perform its cleanup and then typically throw a specific cancelled exception type provided by the SDK.

Temporal Client

The Temporal Client is the interface used by your application code (that isn't part of a Workflow or Activity) to interact with the Temporal Cluster. It allows you to start, signal, query, and manage Workflow Executions.

Client Connection

Establish a connection to the Temporal Cluster, specifying the target address (host:port) of the Frontend Service. Options usually include configuring TLS, identity, namespace, and connection timeouts. It's recommended to create a single Client instance per application process and reuse it.

# Pseudocode for client connection
options = {
  target_host: "your-temporal-frontend.example.com:7233",
  namespace: "your-namespace", # Defaults typically to 'default'
  # tls_options: { ... } # Optional TLS config
}
client = TemporalClient.connect(options)

# Use the client instance throughout your application...

# Close connection when application exits
client.close()

Start a Workflow Execution

Use the Client to start a new Workflow Execution. Key parameters include:

  • Workflow Type: The name or type identifying the Workflow definition registered with Workers.
  • Task Queue: The Task Queue name that Workers processing this Workflow are listening on.
  • Workflow ID: A unique business-level identifier for this specific execution (for example, order ID, user ID). Reusing a Workflow ID with a different run attempt results in a "Workflow execution already started" error unless specific policies are set.
  • Arguments: The input arguments required by the Workflow function/method.
  • Options: Workflow Execution Timeout, Workflow Run Timeout, Retry Policies, Memo, Search Attributes, etc.

TypeScript

import { Client } from '@temporalio/client';
import { exampleWorkflow } from './workflows'; // Assuming workflow function

async function runClient() { // Renamed to avoid conflict if run() is used elsewhere
    const client = new Client(); // Connects to localhost:7233 by default

    const handle = await client.workflow.start(exampleWorkflow, {
        taskQueue: 'my-task-queue',
        args: ['Temporal'], // Arguments for exampleWorkflow
        workflowId: 'workflow-' + Date.now(), // Unique ID
        // Optional: workflowExecutionTimeout: '5 minutes',
    });

    console.log(`Started workflow ${handle.workflowId}`);
    // const result = await handle.result(); // Wait for completion
}

Python

import asyncio
import uuid # For generating unique IDs
from temporalio.client import Client
from your_workflows import GreetingWorkflow # Assuming @workflow.defn class

async def start_workflow_client(): # Renamed for clarity
    client = await Client.connect("localhost:7233")

    handle = await client.start_workflow(
        GreetingWorkflow.run, # Reference the @workflow.run method
        "Temporal", # First arg for run method
        id="workflow-" + str(uuid.uuid4()), # Use UUID for uniqueness
        task_queue="my-task-queue",
        # Optional: execution_timeout=timedelta(minutes=5)
    )

    print(f"Started workflow {handle.id}")
    # result = await handle.result()

Go

package main // Assuming client code is in main package

import (
"context"
"log"
"time" // For Time
"github.com/pborman/uuid" // For UUIDs
"go.temporal.io/sdk/client"
"your_module/app" // Assuming app.YourWorkflow exists
)

func main() {
    c, err := client.Dial(client.Options{})
    if err != nil { log.Fatalln("Unable to create client", err) }
    defer c.Close()

    options := client.StartWorkflowOptions{
        ID:        "workflow-" + uuid.New(), // Use UUID for uniqueness
        TaskQueue: "my-task-queue",
        // WorkflowExecutionTimeout: 5 * time.Minute,
    }

    we, err := c.ExecuteWorkflow(context.Background(), options, app.YourWorkflow, "Temporal")
    if err != nil { log.Fatalln("Unable to execute workflow", err) }

    log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

    // var result string
    // err = we.Get(context.Background(), &result) // Wait for completion
}

Java

package your.application; // Use your actual package name

import io.temporal.client.*;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.UUID; // For UUIDs
import your.application.workflows.YourWorkflow; // Assuming YourWorkflow interface exists

public class Starter {
    public static void main(String[] args) {
        WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
        WorkflowClient client = WorkflowClient.newInstance(service);

        WorkflowOptions options = WorkflowOptions.newBuilder()
            .setTaskQueue("my-task-queue")
            .setWorkflowId("workflow-" + UUID.randomUUID().toString()) // Use UUID
            // .setWorkflowExecutionTimeout(Duration.ofMinutes(5))
            .build();

        // Create a Workflow stub targeting the interface
        YourWorkflow workflow = client.newWorkflowStub(YourWorkflow.class, options);

        // Start execution asynchronously
        WorkflowExecution execution = WorkflowClient.start(workflow::execute, "Temporal");
        // The above is equivalent to:
        // CompletableFuture result = WorkflowClient.execute(workflow::execute, "Temporal");


        System.out.println("Started workflow " + execution.getWorkflowId());

        // To wait for result (if using start):
        // String result = workflow.execute("Temporal"); // This blocks until completion
        // Or if using execute, use the CompletableFuture
    }
}

.NET

using Temporalio.Client;
using System;
using System.Threading.Tasks;
using YourApplication.Workflows; // Assume YourApplication.Workflows.YourWorkflow exists

public class WorkflowStarter
{
    public static async Task StartWorkflowAsync() // Example method
    {
        var client = await TemporalClient.ConnectAsync(new("localhost:7233"));

        // Start using the Workflow class and run method signature
        WorkflowHandle<YourWorkflow, string> handle = await client.StartWorkflowAsync(
            (YourWorkflow wf) => wf.RunAsync("Temporal"), // Lambda targeting the run method
            new(id: "workflow-" + Guid.NewGuid(), taskQueue: "my-task-queue") // Use Guid
            {
                // ExecutionTimeout = TimeSpan.FromMinutes(5)
            });

        Console.WriteLine($"Started workflow {handle.Id}");

        // string result = await handle.GetResultAsync(); // Wait for completion
    }
}

PHP

<?php declare(strict_types=1);\n\nnamespace App\\Client; // Use appropriate namespace\n\nuse Temporal\\Client\\WorkflowClientInterface;\nuse Temporal\\Client\\WorkflowOptions;\nuse App\\Workflow\\YourWorkflowInterface; // Assuming Workflow interface\nuse Ramsey\\Uuid\\Uuid; // Example: Use ramsey/uuid for UUIDs\n\n// Assume $workflowClient is initialized (e.g., via dependency injection)\n/** @var WorkflowClientInterface $workflowClient */\n\n$workflow = $workflowClient->newWorkflowStub(\n    YourWorkflowInterface::class,\n    WorkflowOptions::new()\n        ->withTaskQueue(\'my-task-queue\')\n        ->withWorkflowId(\'workflow-\' + Uuid::uuid4()->toString()) // Generate UUID\n    // ->withWorkflowExecutionTimeout(CarbonInterval::minutes(5))\n);\n\n$run = $workflowClient->start($workflow, \'Temporal\'); // Pass args for execute method\n\necho \"Started workflow \" . $run->getExecution()->id . \"\\n\";\n\n// $result = $run->getResult(); // Wait for completion\n?>

Get the Result of a Workflow Execution

After starting a Workflow, you can obtain a handle or stub representing that specific execution. You can use this handle to wait for the Workflow to complete and retrieve its return value or exception. This is typically a blocking call.

# Pseudocode for getting result
client = ...
workflow_handle = client.start_workflow(...)

print(f"Waiting for workflow {workflow_handle.id} to complete...")
try:
  # This blocks until the workflow finishes (completes, fails, times out, etc.)
  result = workflow_handle.get_result(timeout=300) # Optional timeout for the wait itself
  print(f"Workflow completed successfully with result: {result}")
except WorkflowFailedError as e:
  print(f"Workflow failed: {e.cause}")
except WorkflowTimedOutError:
  print("Workflow timed out")
# ... other potential exceptions like cancellation

Signal Workflows

Signals provide a way to send data asynchronously into a running Workflow Execution. Workflows define Signal handlers to process these incoming signals. Signals are durable and guaranteed to be delivered (at least once). They are often used for updates, notifications, or external events.

Use the Client to send a signal:

  • Identify the target Workflow Execution (by Workflow ID).
  • Specify the Signal name (defined in the Workflow).
  • Provide the Signal arguments.

You can signal a Workflow even if you don't know if it's running yet using `signalWithStart`, which will start the Workflow if it doesn't exist and then immediately deliver the signal.

TypeScript

import { Client } from '@temporalio/client';
import { exampleWorkflow } from './workflows'; // Assuming workflow function with signals
// Assume workflow defines a signal handler named 'updateGreeting'

async function signalClient() { // Renamed for clarity
    const client = new Client();
    const handle = client.workflow.getHandle('workflow-id-123'); // Get handle by ID

    // Send signal
    await handle.signal('updateGreeting', 'Hi');
    console.log('Signal sent');

    // Signal with start: Starts if not running, otherwise signals.
    // const handle = await client.workflow.signalWithStart(exampleWorkflow, {
    //   taskQueue: 'my-task-queue',
    //   workflowId: 'unique-workflow-id',
    //   args: ['Initial Name'],
    //   signal: 'updateGreeting',
    //   signalArgs: ['Hi'],
    // });
}

Python

import asyncio
from temporalio.client import Client
# Assume workflow defines @workflow.signal method 'update_greeting'

async def signal_workflow_client(): # Renamed for clarity
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle("workflow-id-123")

    # Send signal
    await handle.signal("update_greeting", "Hi")
    print("Signal sent")

    # Signal with start
    # handle = await client.signal_with_start_workflow(
    #     GreetingWorkflow.run, "Initial Name", # Workflow + args
    #     id="unique-workflow-id", task_queue="my-task-queue",
    #     signal="update_greeting", signal_args=["Hi"] # Signal + args
    # )

Go

package main

import (
"context"
"log"
"go.temporal.io/sdk/client"
// Assume your workflow defines a signal named "UpdateGreeting"
// import "your_module/app"
)

func main() {
    c, _ := client.Dial(client.Options{})
    defer c.Close()

    workflowID := "workflow-id-123"
    signalName := "UpdateGreeting" // Matches name in Workflow definition
    signalArg := "Hi"

    // Send signal
    err := c.SignalWorkflow(context.Background(), workflowID, "", signalName, signalArg)
    if err != nil { log.Println("Error signaling workflow", err) } else { log.Println("Signal sent") }

    // Signal with start
    // workflowRun, err := c.SignalWithStartWorkflow(context.Background(),
    //    "unique-workflow-id", "my-task-queue",
    //    signalName, signalArg, // Signal and args
    //    client.StartWorkflowOptions{...}, // Workflow options
    //    app.YourWorkflow, "Initial Name", // Workflow function and args
    // )
}

Java

package your.application;

import io.temporal.client.*;
import io.temporal.serviceclient.WorkflowServiceStubs;
import your.application.workflows.YourWorkflow; // Assume YourWorkflow interface defines a @SignalMethod void updateGreeting(String greeting);

public class Signaler {
    public static void main(String[] args) {
        WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); // Example connection
        WorkflowClient client = WorkflowClient.newInstance(service);
        String workflowId = "workflow-id-123";

        // Create a stub for signaling, only needs the Workflow ID
        YourWorkflow workflow = client.newWorkflowStub(YourWorkflow.class, workflowId);

        // Send signal (non-blocking)
        workflow.updateGreeting("Hi");
        System.out.println("Signal sent");

        // Signal with start: Use WorkflowStub options
        // WorkflowOptions startOptions = WorkflowOptions.newBuilder()
        //     .setTaskQueue("my-task-queue")
        //     .setWorkflowId("unique-workflow-id")
        //     .build();
        // YourWorkflow signalWithStartStub = client.newWorkflowStub(YourWorkflow.class, startOptions);
        // BatchRequest request = client.newSignalWithStartRequest();
        // request.add(signalWithStartStub::execute, "Initial Name"); // Add start
        // request.add(signalWithStartStub::updateGreeting, "Hi"); // Add signal
        // client.signalWithStart(request);
    }
}

.NET

using Temporalio.Client;
using System; // For Console
using System.Threading.Tasks;
using YourApplication.Workflows; // Assume YourWorkflow defines [WorkflowSignal] void UpdateGreeting(string greeting);

public class WorkflowSignaler
{
    public static async Task SignalWorkflowAsync() // Example method
    {
        var client = await TemporalClient.ConnectAsync(new("localhost:7233"));
        var handle = client.GetWorkflowHandle<YourWorkflow>("workflow-id-123"); // Specify workflow type if using lambda

        // Send signal via lambda targeting the signal method
        await handle.SignalAsync((YourWorkflow wf) => wf.UpdateGreeting("Hi"));
        Console.WriteLine("Signal sent");
        // ... existing code ...
    }
}

PHP

<?php declare(strict_types=1);\\\\n\\\\nnamespace App\\\\\\\\Client;\\\\n\\\\nuse Temporal\\\\\\\\Client\\\\\\\\WorkflowClientInterface;\\\\nuse App\\\\\\\\Workflow\\\\\\\\YourWorkflowInterface; // Assume YourWorkflowInterface defines a signal method updateGreeting(string $greeting)\\\\n\\\\n/** @var WorkflowClientInterface $workflowClient */\\\\n\\\\n// Use newWorkflowStub to get a stub targeting a specific Workflow ID\\\\n$workflowStub = $workflowClient->newWorkflowStub(YourWorkflowInterface::class, \\\\\"workflow-id-123\\\\\");\\\\n// Send the signal (this example requires the workflow to be running)
$workflowStub->updateGreeting(\\\"Hi From PHP Client!\\\");

echo \\\\\"Signal sent to workflow \\\\\" . $workflowStub->getWorkflowId() . \\\\\"\\\\n\\\\\";

?>

Query Workflows

Queries provide a synchronous way to fetch state from a running Workflow Execution without affecting its history. Workflows define Query handlers to respond to these requests. Queries are strongly consistent by default, meaning they reflect the state of the Workflow just before the query is processed.

Use the Client to query a Workflow:

  • Identify the target Workflow Execution (by Workflow ID).
  • Specify the Query name (defined in the Workflow).
  • Provide any Query arguments.
  • Receive the Query result directly.

TypeScript

import { Client } from '@temporalio/client';
import { exampleWorkflow } from './workflows'; // Assuming workflow function with signals
// Assume workflow defines a query handler named 'getGreeting'

async function queryClient() { // Renamed for clarity
    const client = new Client();
    const handle = client.workflow.getHandle('workflow-id-123');

    // Send query
    const result = await handle.query('getGreeting');
    console.log(`Query result: ${result}`);
}

Python

import asyncio
from temporalio.client import Client
# Assume workflow defines @workflow.query method 'get_greeting'

async def query_workflow_client(): # Renamed for clarity
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle("workflow-id-123")

    # Send query
    result = await handle.query("get_greeting")
    print(f"Query result: {result}")

Go

package main

import (
"context"
"log"
"go.temporal.io/sdk/client"
// Assume your workflow defines a query named "GetGreeting"
)

func main() {
    c, _ := client.Dial(client.Options{})
    defer c.Close()

    workflowID := "workflow-id-123"
    queryName := "GetGreeting" // Matches name in Workflow definition

    // Send query
    resp, err := c.QueryWorkflow(context.Background(), workflowID, "", queryName)
    if err != nil {
        log.Println("Error querying workflow", err)
    } else {
        var result string
        if err := resp.Get(&result); err != nil {
            log.Println("Error getting query result", err)
        } else {
            log.Println("Query result:", result)
        }
    }

    // Strongly Consistent Query (default): Waits for WF to process query
    // Eventually Consistent Query: May return slightly stale data from cache faster
    // resp, err := c.QueryWorkflowWithOptions(ctx, &client.QueryWorkflowWithOptionsRequest{
    //    WorkflowID: workflowID,
    //    QueryType: queryName,
    //    QueryConsistencyLevel: enums.QUERY_CONSISTENCY_LEVEL_EVENTUAL,
    // })
}

Java

package your.application;

import io.temporal.client.*;
import io.temporal.serviceclient.WorkflowServiceStubs;
import your.application.workflows.YourWorkflow; // Assume YourWorkflow interface defines @QueryMethod String getGreeting();

public class Querier {
    public static void main(String[] args) {
        WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); // Example connection
        WorkflowClient client = WorkflowClient.newInstance(service);
        String workflowId = "workflow-id-123";

        // Create a stub for querying
        YourWorkflow workflow = client.newWorkflowStub(YourWorkflow.class, workflowId);

        // Send query (blocking)
        try {
            String result = workflow.getGreeting();
            System.out.println("Query result: " + result);
        } catch (WorkflowQueryException e) {
            System.err.println("Error querying workflow: " + e.getMessage());
        }
    }
}

.NET

using Temporalio.Client;
using System; // For Console
using System.Threading.Tasks;
using YourApplication.Workflows; // Assume YourWorkflow defines [WorkflowQuery] string GetGreeting();

public class WorkflowQuerier
{
    public static async Task QueryWorkflowAsync() // Example method
    {
        var client = await TemporalClient.ConnectAsync(new("localhost:7233"));
        var handle = client.GetWorkflowHandle<YourWorkflow>("workflow-id-123"); // Specify workflow type for lambda

        // Send query via lambda targeting the query method
        string result = await handle.QueryAsync((YourWorkflow wf) => wf.GetGreeting());
        Console.WriteLine($"Query result: {result}");
        // ... existing code ...
    }
}

PHP

<?php declare(strict_types=1);\\\\n\\\\nnamespace App\\\\\\\\Client;\\\\n\\\\nuse Temporal\\\\\\\\Client\\\\\\\\WorkflowClientInterface;\\\\nuse App\\\\\\\\Workflow\\\\\\\\YourWorkflowInterface; // Assume YourWorkflowInterface defines a query method getGreeting()\\\\n\\\\n/** @var WorkflowClientInterface $workflowClient */\\\\n\\\\n$workflowStub = $workflowClient->newWorkflowStub(YourWorkflowInterface::class, \\\\\"workflow-id-123\\\\\");\\\\n
// Send the query (blocking)
$result = $workflowStub->getGreeting();

echo \\\\\"Query result: \\\\\" . $result . \\\\\"\\\\n\\\\\";

?>

Terminate Workflows

Terminating a Workflow Execution stops it immediately and forcefully. It will not execute any further logic or cleanup activities. Use termination with caution, typically for administrative purposes or when a Workflow is known to be in an unrecoverable state. Signals or Cancellations are generally preferred for graceful shutdown.

# Pseudocode for termination
client = ...
workflow_handle = client.get_workflow_handle("workflow-id-to-terminate")

try:
  await workflow_handle.terminate(reason="Admin cleanup")
  print("Workflow terminated.")
except Exception as e:
  print(f"Failed to terminate workflow: {e}") # e.g., workflow already completed

Cancel Workflows

Cancellation is a cooperative mechanism. Requesting cancellation sends a cancellation request to the Workflow Execution. The Workflow code must explicitly check for cancellation requests (usually via SDK mechanisms like Cancellation Scopes or checking context) and decide how to handle them (for example, perform cleanup Activities, then exit gracefully). If the Workflow ignores the request, it continues running.

# Pseudocode for cancellation request
client = ...
workflow_handle = client.get_workflow_handle("workflow-id-to-cancel")

try:
  await workflow_handle.cancel()
  print("Cancellation requested.")
  # Note: Workflow might take time to actually handle the cancellation
except Exception as e:
  print(f"Failed to request cancellation: {e}")

Update Workflows

Updates provide a way to synchronously mutate the state of a running Workflow Execution and receive a result. Unlike Signals (which are fire-and-forget), the Client sending an Update waits for confirmation that the Update has been received and validated by the Workflow, and can optionally wait for the Update handler logic within the Workflow to complete and return a value. Updates require a running Worker to process.

Workflows define Update handlers (and optional validators) to process these requests. Updates are useful for operations that need to modify Workflow state and confirm the result of that modification, such as incrementing a counter or changing a status, while ensuring the operation doesn't proceed if validation fails.

Sending an Update

Use the Client to send an Update:

  • Identify the target Workflow Execution (by Workflow ID).
  • Specify the Update name (defined in the Workflow).
  • Provide the Update arguments.
  • Wait for the Update to be accepted/rejected, and optionally wait for the result.

TypeScript

import { Client } from '@temporalio/client';
import { setLanguage } from './workflows'; // Assuming Update definition

async function updateClient() {
    const client = new Client();
    const handle = client.workflow.getHandle('workflow-id-123');

    // Execute update and wait for completion
    const previousLanguage = await handle.executeUpdate(setLanguage, {
        args: [Language.SPANISH], 
    });
    console.log(`Update completed, previous language: ${previousLanguage}`);

    // Or, start update and get handle to wait for acceptance
    // const updateHandle = await handle.startUpdate(setLanguage, {
    //    args: [Language.FRENCH],
    //    waitForStage: WorkflowUpdateStage.ACCEPTED,
    // });
    // console.log(`Update ${updateHandle.updateId} accepted.`);
    // const result = await updateHandle.result(); // Wait for completion later
}

Python

import asyncio
from temporalio.client import Client
# Assume workflow defines @workflow.update method 'set_language'

async def update_workflow_client():
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle("workflow-id-123")

    # Execute update and wait for completion
    previous_language = await handle.execute_update("set_language", "spanish")
    print(f"Update completed, previous language: {previous_language}")

    # Or, start update and get handle to wait for acceptance
    # update_handle = await handle.start_update("set_language", "french", wait_for_acceptance=True)
    # print(f"Update {update_handle.id} accepted.")
    # result = await update_handle.result() # Wait for completion later

Go

package main

import (
"context"
"log"
"time"
"go.temporal.io/sdk/client"
"your_module/app" // Assuming app defines SetLanguageUpdate string constant
)

func main() {
    c, _ := client.Dial(client.Options{})
    defer c.Close()

    workflowID := "workflow-id-123"
    
    // Execute update and wait for completion
    var previousLang string
    updateValue, err := c.UpdateWorkflow(context.Background(), workflowID, "", app.SetLanguageUpdate, "spanish")
    if err != nil {
        log.Fatalln("Unable to execute update", err)
    }
    err = updateValue.Get(&previousLang)
    if err != nil {
        log.Fatalln("Unable to get update result", err)
    }
    log.Println("Update completed, previous language:", previousLang)

    // Or, start update and get handle to wait for acceptance
    // updateHandle, err := c.UpdateWorkflow(context.Background(), client.UpdateWorkflowOptions{
    //	WorkflowID:   workflowID,
    //	UpdateName:   app.SetLanguageUpdate,
    //	WaitForStage: client.WorkflowUpdateStageAccepted,
    //	Args:         []interface{}{"french"},
    // })
    // if err != nil { log.Fatalln("Unable to start update", err) }
    // log.Println("Update accepted:", updateHandle.UpdateID())
    // var result string
    // err = updateHandle.Get(context.Background(), &result) // Wait for completion later
}

Java

package your.application;

import io.temporal.client.*;
import io.temporal.serviceclient.WorkflowServiceStubs;
import your.application.workflows.YourWorkflow; // Assume defines setLanguage(Language lang)
import your.application.workflows.Language; 

public class Updater {
    public static void main(String[] args) {
        WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); 
        WorkflowClient client = WorkflowClient.newInstance(service);
        String workflowId = "workflow-id-123";
        YourWorkflow workflow = client.newWorkflowStub(YourWorkflow.class, workflowId);

        // Execute update and wait for completion
        try {
            Language previousLanguage = workflow.setLanguage(Language.SPANISH);
            System.out.println("Update completed, previous language: " + previousLanguage);
        } catch (WorkflowUpdateException e) {
            System.err.println("Update failed: " + e.getMessage());
        }

        // Or, start update and get handle to wait for acceptance
        // WorkflowUpdateHandle handle =
        //    WorkflowStub.fromTyped(workflow)
        //        .startUpdate(
        //            "setLanguage", WorkflowUpdateStage.ACCEPTED, Language.class, Language.FRENCH);
        // System.out.println("Update " + handle.getId() + " accepted.");
        // Language result = handle.getResultAsync().get(); // Wait for completion later
    }
}

.NET

using Temporalio.Client;
using System;
using System.Threading.Tasks;
using YourApplication.Workflows; // Assume YourWorkflow defines Task SetCurrentLanguageAsync(Language lang)

public class WorkflowUpdater
{
    public static async Task UpdateWorkflowAsync()
    {
        var client = await TemporalClient.ConnectAsync(new("localhost:7233"));
        var handle = client.GetWorkflowHandle("workflow-id-123"); 

        // Execute update and wait for completion
        var previousLanguage = await handle.ExecuteUpdateAsync(
            wf => wf.SetCurrentLanguageAsync(Language.Spanish));
        Console.WriteLine($"Update completed, previous language: {previousLanguage}");

        // Or, start update and get handle to wait for acceptance
        // var updateHandle = await handle.StartUpdateAsync(
        //    wf => wf.SetCurrentLanguageAsync(Language.French),
        //    new(waitForStage: WorkflowUpdateStage.Accepted));
        // Console.WriteLine($"Update {updateHandle.Id} accepted.");
        // var result = await updateHandle.GetResultAsync(); // Wait for completion later
    }
}

PHP

<?php declare(strict_types=1);
namespace App\Client;

use Temporal\Client\WorkflowClientInterface;
use App\Workflow\YourWorkflowInterface; // Assume defines setLanguage(string $lang): string

/** @var WorkflowClientInterface $workflowClient */
$workflowStub = $workflowClient->newRunningWorkflowStub(YourWorkflowInterface::class, "workflow-id-123");

// Execute update and wait for completion
try {
    $previousLanguage = $workflowStub->setLanguage('Spanish');
    echo "Update completed, previous language: " . $previousLanguage . "\n";
} catch (\Throwable $e) {
    echo "Update failed: " . $e->getMessage() . "\n";
}

// Or, start update and get handle to wait for acceptance
// use Temporal\Client\Update\LifecycleStage;
// $handle = $workflowStub->startUpdate('setLanguage', 'French'); // Default stage is ACCEPTED
// echo "Update " . $handle->getId() . " accepted.\n";
// $result = $handle->getResult(); // Wait for completion later
?>

Update-with-Start

Similar to Signal-with-Start, Update-with-Start allows a Client to send an Update to a Workflow, starting the Workflow Execution if it's not already running. If the Workflow exists, the Update is processed normally. If it doesn't exist, a new Workflow Execution is started with the given ID, and the Update is delivered and processed *before* the main Workflow logic begins execution.

Note: Update-with-Start is in Public Preview (as of Server v1.26+) and requires specific SDK Client methods.

TypeScript

import { Client, WorkflowIdConflictPolicy } from '@temporalio/client';
import { yourWorkflow, setLanguage } from './workflows'; // Assuming Update definition

async function updateWithStartClient() {
    const client = new Client();
    const workflowId = 'uws-workflow-' + Date.now();

    // Execute update-with-start and wait for completion
    const handle = await client.workflow.executeUpdateWithStart(
        yourWorkflow, // Workflow function
        {
            taskQueue: 'my-task-queue',
            workflowId: workflowId,
            workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,
            args: [/* initial workflow args */],
        },
        setLanguage, // Update definition
        [Language.ENGLISH] // Update args
    );
    console.log(`UpdateWithStart completed for ${handle.workflowId}, result: ${await handle.result()}`);

    // Or, start update-with-start and get handle
    // const updateHandle = await client.workflow.startUpdateWithStart(
    //     yourWorkflow, { /* options */ }, setLanguage, [Language.FRENCH]
    // );
    // const result = await updateHandle.result();
}

Python

Use execute_update_with_start_workflow or start_update_with_start_workflow on the Client.

import asyncio
from temporalio.client import Client, WorkflowUpdateFailedError
from temporalio.common import WorkflowIDConflictPolicy
from temporalio.client import WithStartWorkflowOperation
# Assuming YourWorkflow with run() method and update handler 'add_item'
# from your_workflow import YourWorkflow, ShoppingCartItem

async def update_with_start_workflow_py():
    client = await Client.connect("localhost:7233")
    workflow_id = "uws-workflow-py"
    task_queue = "my-task-queue"
    item_id = "product-123"
    quantity = 1
    
    # Define the operation to start the workflow if it doesn't exist
    start_op = WithStartWorkflowOperation(
        "YourWorkflow.run", # Reference to the workflow run method (or string name)
        # Initial args for the workflow if started
        id=workflow_id,
        task_queue=task_queue,
        id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
    )

    try:
        # Execute update-with-start and wait for completion
        update_result = await client.execute_update_with_start_workflow(
            "YourWorkflow.add_item", # Reference to update handler (or string name)
            ShoppingCartItem(sku=item_id, quantity=quantity), # Update args
            start_workflow_operation=start_op,
        )
        print(f"UpdateWithStart completed, result: {update_result}")

        # Or, start update-with-start and get handle
        # update_handle = await client.start_update_with_start_workflow(
        #     "YourWorkflow.add_item", 
        #     ShoppingCartItem(sku=item_id, quantity=quantity), 
        #     start_workflow_operation=start_op,
        #     wait_for_stage=client.WorkflowUpdateStage.ACCEPTED
        # )
        # print(f"UpdateWithStart {update_handle.id} accepted.")
        # final_result = await update_handle.result()

    except WorkflowUpdateFailedError as e:
        print(f"UpdateWithStart failed: {e}")
    except Exception as e:
        print(f"An error occurred: {e}")

# Placeholder class definitions for the example
class ShoppingCartItem:
    def __init__(self, sku, quantity):
        self.sku = sku
        self.quantity = quantity

# Example usage:
# asyncio.run(update_with_start_workflow_py())

Go

package main

import (
"context"
"log"
"time"
"go.temporal.io/sdk/client"
"go.temporal.io/api/enums/v1"
"your_module/app" // Assume app defines SetLanguageUpdate & MyWorkflow
)

func main() {
    c, _ := client.Dial(client.Options{})
    defer c.Close()

    workflowID := "uws-workflow-go"
    taskQueue := "my-task-queue"

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    startOpts := client.StartWorkflowOptions{
        ID:                       workflowID,
        TaskQueue:                taskQueue,
        WorkflowIDConflictPolicy: enumsv1.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING,
    }
    updateOpts := client.UpdateWorkflowOptions{
        UpdateName:   app.SetLanguageUpdate,
        WaitForStage: client.WorkflowUpdateStageCompleted, // Wait for completion
        Args:         []interface{}{"spanish"},
    }
    
    startWorkflowOp := client.NewWithStartWorkflowOperation(startOpts, app.MyWorkflow /*, initial args... */)

    updateHandle, err := c.UpdateWithStartWorkflow(ctx, startWorkflowOp, updateOpts)
    if err != nil {
        log.Fatalln("UpdateWithStartWorkflow failed:", err)
    }

    var previousLang string
    err = updateHandle.Get(ctx, &previousLang) // Get result from handle
    if err != nil {
        log.Fatalln("Failed to get UpdateWithStart result:", err)
    }
    log.Println("UpdateWithStart completed, previous language:", previousLang)
}

Java

package your.application;

import io.temporal.client.*;
import io.temporal.serviceclient.WorkflowServiceStubs;
import your.application.workflows.YourWorkflow;
import your.application.workflows.Language;
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
import java.time.Duration;

public class UpdateWithStarter {
    public static void main(String[] args) {
        WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
        WorkflowClient client = WorkflowClient.newInstance(service);
        String workflowId = "uws-workflow-java";
        String taskQueue = "my-task-queue";

        WorkflowOptions startOptions = WorkflowOptions.newBuilder()
            .setTaskQueue(taskQueue)
            .setWorkflowId(workflowId)
             // Required: Specify how to handle ID conflicts
            .setWorkflowIdReusePolicy(WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY) // Or other appropriate policy
            .build();
        
        // Create a stub just for specifying the target workflow type and options
        YourWorkflow workflowStub = client.newWorkflowStub(YourWorkflow.class, startOptions);

        UpdateOptions updateOptions = UpdateOptions.newBuilder()
            .setUpdateName("setLanguage")
            .setResultClass(Language.class)
            .setWaitForStage(WorkflowUpdateStage.COMPLETED) // Wait for completion
            .setFirstExecutionRunId(null) // Cannot specify RunID for UWS
            .build();
            
        // Define the start operation (workflow method and initial args)
        WithStartWorkflowOperation startOperation =
             new WithStartWorkflowOperation<>(workflowStub::execute /*, initial args... */);

        try {
            // Execute Update-with-Start and wait for result
            Language previousLanguage = client.executeUpdateWithStart(
                workflowStub::setLanguage, // Update method reference
                updateOptions, 
                startOperation,
                Language.SPANISH // Update arguments
            );
            System.out.println("UpdateWithStart completed, previous language: " + previousLanguage);

            // Or use startUpdateWithStart to get a handle
            // WorkflowUpdateHandle handle = client.startUpdateWithStart(
            //     workflowStub::setLanguage, updateOptions, startOperation, Language.FRENCH
            // );
            // Language result = handle.getResultAsync().get();

        } catch (Exception e) {
             System.err.println("UpdateWithStart failed: " + e);
        }
    }
}

.NET

using Temporalio.Client;
using Temporalio.Enums;
using System;
using System.Threading.Tasks;
using YourApplication.Workflows;

public class UpdateWithStartClient
{
    public static async Task UpdateWithStartAsync()
    {
        var client = await TemporalClient.ConnectAsync(new("localhost:7233"));
        string workflowId = "uws-workflow-dotnet";
        string taskQueue = "my-task-queue";

        var startArgs = new StartWorkflowArgs { /* initial workflow args */ };

        // Define the start operation (workflow run method and args)
        var startOperation = new WithStartWorkflowOperation(
            wf => wf.RunAsync(startArgs) // Reference to the WorkflowRun method
        );

        // Execute update-with-start and wait for completion
        var previousLanguage = await client.ExecuteUpdateWithStartAsync(
            workflowId, 
            taskQueue,
            (YourWorkflow wf) => wf.SetCurrentLanguageAsync(Language.Spanish), // Update lambda
            startOperation, // Contains workflow lambda and args
            new() { 
                WorkflowIdConflictPolicy = WorkflowIdConflictPolicy.UseExisting, 
                // Update options can be added here inside new WorkflowUpdateOptions()
            } 
        );
        Console.WriteLine($"UpdateWithStart completed, previous language: {previousLanguage}");

        // Or, start update-with-start and get handle
        // var updateHandle = await client.StartUpdateWithStartAsync(
        //    workflowId, taskQueue,
        //    (YourWorkflow wf) => wf.SetCurrentLanguageAsync(Language.French),
        //    startOperation,
        //    new() { WorkflowIdConflictPolicy = WorkflowIdConflictPolicy.UseExisting }
        // );
        // var result = await updateHandle.GetResultAsync();
    }
}
public class StartWorkflowArgs { /* Define args if needed */ }

PHP

PHP SDK does not currently have a dedicated Update-with-Start API. A similar pattern to the Python example (check existence first) would be needed.

<?php
// Conceptual Example - Check existence first
use Temporal\Client\WorkflowClientInterface;
use Temporal\Client\WorkflowStubInterface;
use Temporal\Exception\Client\WorkflowNotFoundException;

/** @var WorkflowClientInterface $workflowClient */
$workflowId = 'uws-workflow-php';
$taskQueue = 'my-task-queue';
$updateArg = 'Spanish';
$startArgs = [/* initial args */];

try {
    // Try to get a stub for a running workflow
    $workflowStub = $workflowClient->newRunningWorkflowStub(YourWorkflowInterface::class, $workflowId);
    echo "Workflow {$workflowId} exists. Sending update.\n";
    $result = $workflowStub->setLanguage($updateArg);
    echo "Update result: {$result}\n";
} catch (WorkflowNotFoundException $e) {
    echo "Workflow {$workflowId} not found. Starting and updating.\n";
    // Note: This doesn't guarantee atomicity like true Update-with-Start
    $workflowStub = $workflowClient->newWorkflowStub(
        YourWorkflowInterface::class,
        WorkflowOptions::new()
            ->withTaskQueue($taskQueue)
            ->withWorkflowId($workflowId)
            // Use appropriate ID reuse policy
    );
    $run = $workflowClient->start($workflowStub, ...$startArgs);
    // Potentially wait briefly for workflow to initialize
    sleep(1); 
    $result = $workflowStub->setLanguage($updateArg);
    echo "Started workflow " . $run->getExecution()->id . ", update result: {$result}\n";
}
?>

Advanced Concepts

Activity Heartbeating

For long-running Activities, Heartbeating allows the Activity to report its progress and liveness back to the Temporal Cluster. This serves several purposes:

  • Liveness Detection: If the Cluster doesn't receive a heartbeat within the configured HeartbeatTimeout, it assumes the Activity Worker has crashed and reschedules the Activity (respecting the Retry Policy).
  • Progress Checkpointing: Activities can include details (payload) with the heartbeat, effectively checkpointing their progress. If the Activity is retried (on the same or different Worker), it can retrieve the last checkpointed details from the Activity context and resume from that point, avoiding redundant work.
  • Cancellation Check: The heartbeat mechanism also allows the Activity to check if a cancellation has been requested for the Workflow or the Activity itself. The Activity can then perform cleanup and exit gracefully.

Heartbeating is crucial for Activities that might run longer than the StartToCloseTimeout or for tasks where reporting progress is beneficial. It's implemented by calling a specific SDK function (for example, activity.RecordHeartbeat(ctx, details...)) periodically within the Activity code.

# Pseudocode for Activity with Heartbeating
@activity.defn
async def long_running_task(input_data):
    progress = load_progress_from_details(activity.info().heartbeat_details)
    if progress is None:
        progress = 0

    total_items = len(input_data)
    for i in range(progress, total_items):
        # Check for cancellation before/during work
        activity.heartbeat(i) # Report progress (item 'i' is about to be processed)

        # Process item i...
        await asyncio.sleep(1) # Simulate work

        # Optional: Heartbeat again after processing item, if fine-grained needed
        # activity.heartbeat(i + 1)

    return "All items processed"

def load_progress_from_details(details):
    # Logic to interpret details sent with previous heartbeat
    if details: return details # Assuming details itself is the progress marker
    return None

Note: Build ID based Worker Versioning is a newer, often preferred approach for managing incompatible Worker code deployments, especially across multiple Task Queues or complex rollouts. Patching is useful for targeted, small changes within a single Workflow definition.

Complete Activities Asynchronously

In some scenarios, the process that completes an Activity might be different from the one that started it. For example, an Activity might trigger an external system (via API call) and rely on a webhook callback to signal completion.

To handle this:

  1. The Activity obtains its unique TaskToken from the Activity Context. The Task Token identifies this specific Activity execution attempt.
  2. The Activity returns control to the Worker *without* completing, typically by throwing/returning a special indicator (like activity.DoNotCompleteOnReturn or a specific exception type).
  3. The Activity passes the TaskToken to the external system or process that will eventually signal completion.
  4. When the external process finishes, it uses a Temporal Client method (like CompleteActivityById or CompleteActivity, providing the Task Token) to report the result or failure back to the Temporal Cluster.

The Worker that originally executed the Activity does not need to be running for the completion call to succeed. The Temporal Cluster uses the Task Token to correlate the completion call with the correct Workflow Execution.

# Pseudocode for Activity triggering async completion

@activity.defn
async def trigger_external_job(job_params):
    info = activity.info()
    task_token_str = info.task_token.decode('utf-8') # Get task token as string

    # Call external system, passing task_token_str as a callback identifier
    external_api.start_job(job_params, callback_token=task_token_str)

    # Tell the Worker not to complete this Activity yet
    activity.do_not_complete_on_return()


# --- Elsewhere, in a separate process (e.g., webhook handler) ---

def handle_external_job_completion(callback_token, result, error):
    client = connect_to_temporal() # Connect with a Temporal client
    task_token_bytes = callback_token.encode('utf-8')

    try:
        if error:
            client.report_activity_failure(task_token_bytes, error)
        else:
            client.report_activity_completion(task_token_bytes, result)
    except Exception as e:
        # Handle potential errors reporting completion (e.g., activity timed out already)
        log.error(f"Failed to report activity completion/failure for token {callback_token}: {e}")

Activity Cancellation

Activities can be cancelled in two main ways:

  1. Workflow Cancellation: If the Workflow Execution that invoked the Activity is cancelled, the Temporal Cluster propagates the cancellation request to running Activities (depending on the Workflow's cancellation handling logic and SDK Cancellation Scopes).
  2. Activity Timeouts: If an Activity exceeds its StartToCloseTimeout or ScheduleToCloseTimeout, it is effectively cancelled by the timeout mechanism, and a failure is reported to the Workflow (which may trigger retries).

Activities that need to perform cleanup when cancelled (for example, releasing a lock, rolling back a transaction) should check for cancellation requests. This is often done:

  • Implicitly during activity.RecordHeartbeat calls.
  • Explicitly using SDK context cancellation checks (for example, ctx.Err() in Go, checking Activity.Current.CancellationToken.IsCancellationRequested in .NET, checking activity.is_cancelled() in Python).

Upon detecting cancellation, the Activity should perform its cleanup and then typically throw a specific cancelled exception type provided by the SDK.

Workflow Versioning (Patching)

Since Workflow code must be deterministic, deploying updated Workflow code that changes the sequence of commands (like adding a new Activity call, removing a Timer) can break replays of existing, in-flight Workflow Executions that were started with the old code.

The Workflow.getVersion API (or equivalent in each SDK) allows you to safely deploy changes to Workflow definitions without breaking compatibility.

  1. Wrap the new or modified logic within a getVersion block, assigning it a unique change ID (often an integer or descriptive string).
  2. Specify a default version (typically associated with the original code path) and the new version (associated with the changed code path).
  3. When a Worker executes this code during a replay: If the history already contains a marker for this change ID, the SDK automatically executes the code path corresponding to the recorded version.
  4. When a Worker executes this code for the *first time* (not a replay for this specific part), it executes the code associated with the *new* version you specified and records that version marker in the history.

This ensures that existing executions continue deterministically along their original path, while new executions (or executions reaching this point for the first time) use the updated logic.

# Pseudocode for using getVersion

# --- Original Code ---
# result = await workflow.execute_activity(activity_A, ...)

# --- Updated Code ---
# We want to optionally call activity_B before activity_A

# Change ID identifies this specific patch/change
patch_change_id = "add-activity-B-call-v1"

# Default version (original behavior) is -1 (or some SDK-specific default)
# New version is 1 (or the next sequential number for this change ID)
version = workflow.get_version(patch_change_id, default=-1, max_supported=1)

if version == 1:
    # New code path: Call B first
    await workflow.execute_activity(activity_B, ...)

# Code common to both versions (or the original path if version == -1)
result = await workflow.execute_activity(activity_A, ...)

After all in-flight Workflows using the old code path (default version) have completed, you can remove the getVersion call and the old code path, simplifying the Workflow definition.

Note: Build ID based Worker Versioning is a newer, often preferred approach for managing incompatible Worker code deployments, especially across multiple Task Queues or complex rollouts. Patching is useful for targeted, small changes within a single Workflow definition.

Execute Child Workflows

Workflows can start other Workflows, known as Child Workflows. This allows for modular design, breaking down complex business processes into smaller, reusable, and independently testable units.

Key characteristics of Child Workflows:

  • Independent History: Each Child Workflow has its own separate event history.
  • Parent Relationship: The parent Workflow can track the status of the child and receive its result upon completion.
  • Cancellation Propagation: Cancellation can be configured to propagate from parent to child (or not).
  • Retries and Timeouts: Child Workflows have their own timeouts and retry policies, independent of the parent.
  • Task Queues: Child Workflows can run on the same or different Task Queues as the parent.

Starting a Child Workflow is similar to starting a Workflow via the Client, but it's done from within the parent Workflow's code using specific SDK functions (for example, workflow.ExecuteChildWorkflow). Options control the parent-child relationship behavior (for example, what happens if the parent completes before the child).

TypeScript

import { executeChild, proxyActivities, ParentClosePolicy, workflowInfo } from '@temporalio/workflow'; // Import workflowInfo
import { childWorkflow } from './child'; // Assume childWorkflow definition exists

export async function parentWorkflow(name: string): Promise<string> {
    // Start child and wait for result
    const result = await executeChild(childWorkflow, {
        args: [name],
        workflowId: `child-${workflowInfo().workflowId}`, // Ensure unique child ID using workflowInfo
        // Default: ParentClosePolicy.TERMINATE (terminates child if parent closes)
        // Use ABANDON if child should continue after parent finishes
        parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON,
    });
    return `Parent received: ${result}`;
}

Python

from temporalio import workflow
from temporalio.common import ParentClosePolicy
# Assume child_workflow is defined in another file/module
with workflow.unsafe.imports_passed_through():
    from child_workflows import ChildWorkflow

@workflow.defn
class ParentWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        result = await workflow.execute_child_workflow(
            ChildWorkflow.run,
            f"Input for {name}",
            id=f"child-{workflow.info().workflow_id}",
            parent_close_policy=ParentClosePolicy.ABANDON,
        )
        return f"Parent received: {result}"
                

Go

package app // Assuming parent workflow is in app package

import (
"fmt" // For string formatting
"go.temporal.io/sdk/workflow"
"go.temporal.io/api/enums/v1"
// Assume app defines ChildWorkflow function
"your_module/app"
)

func ParentWorkflow(ctx workflow.Context, name string) (string, error) {
    cwo := workflow.ChildWorkflowOptions{
        // Generate a unique ID, e.g., combining parent ID and a unique part
        WorkflowID: fmt.Sprintf("child-%s-%s", workflow.GetInfo(ctx).WorkflowExecution.ID, workflow.GetRandomUUID(ctx)),
        ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
    }
    ctx = workflow.WithChildOptions(ctx, cwo)

    var result string
    // Assuming ChildWorkflow is defined in the same package or imported correctly
    future := workflow.ExecuteChildWorkflow(ctx, app.ChildWorkflow, name)
    if err := future.Get(ctx, &result); err != nil {
        // Handle child workflow error (e.g., log, return error)
        workflow.GetLogger(ctx).Error("Child workflow failed", "Error", err)
        return "", err
    }
    return "Parent received: " + result, nil
}

// Assume ChildWorkflow definition exists, e.g.:
// func ChildWorkflow(ctx workflow.Context, inputName string) (string, error) {
//     workflow.GetLogger(ctx).Info("Child workflow started", "Input", inputName)
//     return "Result from child for " + inputName, nil
// }
                

Java

package your.application.workflows; // Use your actual package name

import io.temporal.workflow.*;
import io.temporal.api.enums.v1.ParentClosePolicy;
import java.util.UUID; // For UUIDs
// Assume ChildWorkflow interface exists in the same package or imported

public class ParentWorkflowImpl implements ParentWorkflow { // Assuming ParentWorkflow interface
    @Override
    public String execute(String name) {
        ChildWorkflowOptions options = ChildWorkflowOptions.newBuilder()
            .setWorkflowId("child-" + Workflow.getInfo().getWorkflowId() + "-" + UUID.randomUUID().toString()) // Ensure unique ID
            .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
            .build();

        ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options);

        // Start child and wait for result
        String result = child.execute("Input for " + name);
        return "Parent received: " + result;

        // To start asynchronously:
        // Promise promise = Async.function(child::execute, "Input for " + name);
        // Use promise.get() later
    }
}

.NET

using Temporalio.Workflows;
using Temporalio.Common; // For ParentClosePolicy
using System; // For Guid
using System.Threading.Tasks;
using YourApplication.Workflows; // Assuming ChildWorkflow class exists in the same namespace or imported

[Workflow]
public class ParentWorkflow
{
    [WorkflowRun]
    public async Task<string> RunAsync(string name)
    {
        string result = await Workflow.ExecuteChildWorkflowAsync(
            (ChildWorkflow wf) => wf.RunChildAsync($"Input for {name}"), // Ensure RunChildAsync exists on ChildWorkflow
            new ChildWorkflowOptions
            {
                WorkflowId = $"child-{Workflow.Info.WorkflowId}-{Guid.NewGuid()}", // Ensure unique ID
                ParentClosePolicy = ParentClosePolicy.Abandon,
            }
        );
        return $"Parent received: {result}";
    }
}

PHP

<?php declare(strict_types=1);\\\n\\\nnamespace App\\\\\\\\Workflow;\\\\n\\\\nuse Temporal\\\\\\\\Workflow\\\\\\\\ParentWorkflowInterface;\\\\nuse Temporal\\\\\\\\Workflow\\\\\\\\ParentWorkflowMethod;\\\\nuse Temporal\\\\\\\\Activity\\\\\\\\ActivityOptions;\\\\nuse Temporal\\\\\\\\Common\\\\\\\\RetryOptions;\\\\nuse Temporal\\\\\\\\Activity\\\\\\\\ActivityMethod;\\\\nuse Temporal\\\\\\\\Activity\\\\\\\\ActivityInterface;\\\\nuse App\\\\\\\\ChildWorkflows\\\\\\\\ChildWorkflow;\\\\nuse Ramsey\\\\Uuid\\\\Uuid; // Example: Use ramsey/uuid\\\\n\\\\n// Define ParentWorkflow interface and implementation\\\\ninterface ParentWorkflowInterface extends ParentWorkflowInterface<?> {\\\n    @ParentWorkflowMethod(name = "ParentWorkflow")\\\n    void execute(String name);\n}\\\n\\\\nclass ParentWorkflowImpl implements ParentWorkflowInterface {\\\n    @Override\\\n    @ParentWorkflowMethod(name = "ParentWorkflow")\\\n    public void execute(String name) {\\\n        // Start child workflow\\\n        ChildWorkflow.start(\\\"child-workflow\\\", [\\\"name\\\" => $name]);\\\\\n        // You can also use a different workflow ID if needed\\\\\n    }\\\\\n}\\\n?>

Continue-As-New

Workflow Executions have limits on their event history size and duration. For long-running processes or Workflows that accumulate a large history (for example, loops processing many items), `Continue-As-New` allows a Workflow to effectively restart itself with a fresh history, carrying over necessary state.

When a Workflow decides to `Continue-As-New`:

  1. It specifies the same Workflow Type (or potentially a different one).
  2. It provides the input arguments for the *new* execution. This is how state is carried over.
  3. The current execution is marked as `ContinuedAsNew`.
  4. A new Workflow Execution starts immediately with the provided arguments and a clean history, but logically continues the business process. The Run ID changes, but the Workflow ID can be kept the same.

This is the standard pattern for implementing infinite loops or processes that need to run indefinitely without hitting history limits. It's also useful for periodic cleanup of large state within a Workflow.

# Pseudocode for Continue-As-New loop

@workflow.defn
class PerpetualWorkflow:
    @workflow.run
    async def run(self, counter: int, other_state: dict = None): # Example with state
        if other_state is None:
            other_state = {} # Initialize state if first run

        workflow.logger.info(f"Executing run {counter} with state: {other_state}")

        # Example condition to ContinueAsNew (e.g., every 1000 iterations)
        if counter > 0 and counter % 1000 == 0:
            workflow.logger.info(f"Processed {counter} items, continuing as new...")
            # Carry over the necessary state to the new execution
            # Only include state needed for the *next* run
            workflow.continue_as_new(counter, other_state) # Restarts run() with new args
            # Note: Code after continue_as_new does not execute in the current run

        # --- Regular Workflow Logic ---
        workflow.logger.info(f"Processing item {counter}")
        # Do work... update other_state if needed
        # await workflow.execute_activity(process_item, counter, other_state, ...)
        other_state['last_processed'] = counter
        await workflow.sleep(timedelta(seconds=1)) # Simulate work or delay

        # "Loop" by calling continue_as_new with updated state for the next iteration
        workflow.continue_as_new(counter + 1, other_state)

# Client side starting the workflow for the first time:
    async def run(self, counter: int):
        if counter >= 1000: # Example condition to ContinueAsNew
            print(f"Processed {counter} items, continuing as new...")
            # Carry over the counter state to the new execution
            workflow.continue_as_new(counter) # Restarts run() with counter
        else:
            print(f"Processing item {counter}")
            # Do work...
            await workflow.execute_activity(process_item, counter, ...)

            # Schedule next iteration after a delay
            await workflow.sleep(timedelta(seconds=10))
            # "Loop" by calling continue_as_new with updated state
            workflow.continue_as_new(counter + 1)

Side Effects

While Workflows must be deterministic, sometimes you need to execute a short, non-deterministic piece of code within the Workflow logic itself (without calling an Activity). A common example is generating a UUID or getting a random number using a non-SDK function (which would violate determinism if called directly).

Workflow.SideEffect (or equivalent) allows this.

  1. You provide a function/lambda containing the non-deterministic code to SideEffect.
  2. The first time this code path is executed, the function is run, and its return value is recorded in the Workflow history.
  3. During replays, the function is *not* re-executed. Instead, the recorded value from the history is returned directly, preserving determinism.

Side Effects should only be used for short, simple operations where creating a dedicated Activity would be overkill. They are not suitable for I/O or long-running computations. For example, generating a unique ID or calling a potentially flaky function that you don't want to retry as a full Activity.

# Pseudocode for Side Effect

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self):
        # Generate a random ID using a potentially non-deterministic library function
        # This lambda is only executed once per execution path.
        random_id = await workflow.side_effect(lambda: generate_random_id_maybe_impure())

        # Use the random_id, which is now deterministic for replays
        await workflow.execute_activity(use_id_activity, random_id, ...)

        return f"Processed with ID: {random_id}"

# Note: For truly random values needed within workflow logic,
# prefer the SDK's deterministic random functions like workflow.random().
# SideEffect is more for integrating short non-deterministic external code.

Local Activities

Local Activities are an optimization for short-lived Activities that don't require the full overhead of Task Queue polling, scheduling, and retries managed by the Temporal Cluster. They execute directly within the Worker's Workflow Task processing thread.

Characteristics:

  • Execution: Run in the same process as the Workflow code that called them.
  • Latency: Much lower invocation latency compared to standard Activities (sub-millisecond).
  • Retries: Limited, server-side retries are not applicable. Retries must be handled by the Workflow code itself or rely on the Workflow Task retry mechanism. A short, configurable local retry policy might be available in the SDK.
  • Timeouts: Subject to shorter, locally configured timeouts, often tied to the Workflow Task timeout.
  • Use Cases: Ideal for simple functions like data validation, transformation, short calculations, or accessing read-only resources local to the Worker, where the overhead of a standard Activity is undesirable. For example, validating input against a schema loaded in Worker memory.
  • Determinism: Like standard Activities, Local Activities themselves do not need to be deterministic, but their *invocation* from the Workflow must be.

Executing a Local Activity uses a specific SDK function (for example, workflow.ExecuteLocalActivity), often requiring different, shorter timeout configurations than standard Activities.

Mutations

The Workflow.upsertMemo, Workflow.upsertSearchAttributes APIs allow Workflows to update their own Memo and Search Attributes during execution. These updates are recorded in the event history.

Workflow.deprecatePatch is used in conjunction with Workflow.getVersion for managing code cleanup. Once you are sure no running Workflows are using an old code path identified by a specific change ID (patch ID), you can call `deprecatePatch(patch_change_id)`. This signals to the system (and potentially linting tools) that this patch ID is obsolete and its corresponding `getVersion` block can eventually be removed from the code.

Testing

Testing Temporal applications is crucial for ensuring correctness and reliability. Temporal provides several mechanisms and recommendations for testing different aspects of your application, from individual Activities and Workflows in isolation to their integrated behavior.

Approaches to Testing

Key testing strategies include:

  • Unit Testing: Testing individual Workflow or Activity functions/methods in isolation, mocking their dependencies (like Activity calls within a Workflow, or external services called by an Activity). This focuses on the logic within the component itself.
  • Workflow Replay Testing: Verifying that Workflow code remains deterministic after changes. The SDK uses saved Workflow histories to replay the Workflow logic and ensure it produces the exact same commands. This is vital for safely deploying updates to Workflow definitions.
  • Integration Testing (Test Environment): Testing the interaction between Workflows, Activities, and Workers using a test environment provided by the SDK. This environment simulates the Temporal Cluster, allowing you to run Workflows and Activities locally, testing their interaction and behavior without needing a full Temporal Cluster. This is the most common way to test the end-to-end logic of your Temporal application.
  • End-to-End Testing: Testing the complete application flow, including client invocations, Worker execution, and interactions with real external services (though often mocked or stubbed), potentially against a real Temporal Cluster (like the development server or a staging cluster).

Workflow Replay Testing

Workflow replay testing ensures that changes to Workflow code do not break determinism for existing, in-flight Workflow Executions. Temporal SDKs typically provide tools or mechanisms to facilitate this. The process generally involves:

  1. Obtaining historical event data (Workflow History) for completed Workflow Executions (often saved as JSON files). You can download these using the Temporal CLI or SDK utilities.
  2. Running a test suite that feeds these histories into the updated Workflow code via the SDK's replay functionality.
  3. The replay mechanism executes the Workflow logic against the history. If the updated code produces a different sequence of commands (like trying to schedule a different Activity, using a different timer duration, etc.) than what is recorded in the history, the replay fails, indicating a non-deterministic change.

Regularly running replay tests against a corpus of relevant histories is a best practice before deploying Workflow code changes.

Integration Testing with the Test Environment

Most Temporal SDKs provide a test environment or framework (e.g., `TestWorkflowEnvironment` in Java/Go, `WorkflowEnvironment.start_time_skipping` in Python, `TestWorkflowRuntime` in .NET, test utilities in TypeScript) that simulates the Temporal Cluster in-process or locally. This allows you to:

  • Register Workflow and Activity implementations.
  • Execute Workflows and Activities directly within your tests.
  • Mock Activities called by Workflows.
  • Control time within the environment (time skipping) to quickly test timeouts, timers, and long-running processes without waiting in real time.
  • Assert Workflow completion status, results, or errors.
  • Verify Activity invocations and parameters.

This is the primary method for testing the integrated logic of your Workflows and Activities.