A Temporal Application is a collection of Workflow Definitions, Activity Definitions, and Worker Processes
that execute your business logic. It relies on a Temporal Service (either Temporal Cloud or a self-hosted
cluster) to manage the state and execution of Workflows reliably.
Workflows
A Workflow is the core abstraction in Temporal, representing a durable, reliable, and potentially
long-running function execution. Workflows orchestrate the execution of Activities to achieve a business
goal.
Key Properties:
Durability: Workflow state is persisted by the Temporal Service, allowing execution to
survive process failures and resume automatically.
Reliability: The Temporal Platform guarantees that Workflow code executes exactly once to
completion, handling retries and failures transparently.
Deterministic Constraints: Workflow code must be deterministic. This means it must
produce the same output given the same input, without relying on external system state or side effects
(except through Activities).
Workflows are defined as functions or methods in your chosen SDK language. They interact with the Temporal
system via SDK APIs to schedule Activities, manage timers, wait for external events (Signals), and more.
Activities
An Activity represents a single, well-defined unit of work within a Workflow, typically involving
interaction with the outside world (for example, database calls, API requests, sending emails).
Key Properties:
Non-deterministic Code: Unlike Workflows, Activity code can interact freely with external
systems and perform non-deterministic operations.
Managed Execution: Activities are executed by Workers. The Temporal Platform manages
their retries, timeouts, and tracking based on configured policies.
Idempotent Design: Activities should ideally be designed to be idempotent. This means
executing them multiple times with the same input yields the same result, which is crucial for handling
potential retries safely.
Activities are defined as functions or methods and are invoked asynchronously from Workflows.
Workers
A Worker is a process that hosts Workflow and Activity implementations. Workers poll Task Queues on the
Temporal Service, receive Tasks (representing Workflow or Activity executions), execute the corresponding
code, and report results back.
Responsibilities:
Execute Workflow logic based on its history.
Execute Activity Tasks, interacting with external systems as needed.
Communicate with the Temporal Service to fetch Tasks and report progress/results.
You can run multiple Worker processes for scalability and fault tolerance. Workers are stateless regarding
Workflow executions (state is maintained by the Cluster), but they manage Activity execution state locally
(for example, if an Activity needs to checkpoint progress).
Task Queues
A Task Queue is a lightweight, dynamic queue managed by the Temporal Service. When a Workflow Execution is
started or an Activity is scheduled, a Task is placed onto a specific Task Queue.
Purpose:
Routing: Workers listen on specific Task Queues. This directs Tasks to the appropriate
set of Workers capable of handling them.
Load Balancing: The Temporal Service distributes Tasks from a queue among the available
Workers listening on it.
Prioritization (Enterprise): Task Queues can be configured for task prioritization in
Temporal Cluster versions supporting it.
Task Queues are fundamental for decoupling Workflow/Activity scheduling from Worker execution.
Setup and Installation
Temporal CLI and Development Server
The Temporal CLI is essential for interacting with a Temporal Service (including Temporal
Cloud) and includes a lightweight development server for local testing. This server runs as a single process
with an in-memory database (or optional file persistence) and includes the Temporal Web UI.
CLI Installation
The CLI is available for macOS, Linux, and Windows.
macOS (Homebrew):brew install temporal
Linux (Homebrew):brew install temporal
Manual Download (All Platforms): Visit
temporal.io/download, select your platform/architecture,
download the archive, extract it, and add the temporal (or temporal.exe) binary
to your system's PATH.
Development Server Execution
Start the development server using the following command:
temporal server start-dev
This command starts the server (listening on localhost:7233 by default), creates the default
default Namespace, and starts the Web UI (accessible at
http://localhost:8233).
SDK Installation
Install the Temporal SDK for your preferred language:
Workflows orchestrate Activities and manage durable state. They execute deterministically, ensuring
resilience against failures.
Workflow Definition
Define Workflows using interfaces and implementations (Java, Go, PHP) or functions/classes (Python,
TypeScript, .NET). Workflow logic must be deterministic.
TypeScript
Define Workflow logic within an async function.
import * as wf from '@temporalio/workflow';
import * as activities from './activities'; // Assuming activities.ts defines activities
// Define Activities using an interface or proxy object.
const { composeGreeting } = wf.proxyActivities<typeof activities>({
startToCloseTimeout: '1 minute',
});
/** A Workflow Definition */
export async function exampleWorkflow(name: string): Promise<string> {
const greeting = await composeGreeting('Hello', name);
return greeting;
}
Python
Define Workflows within classes.
from temporalio import workflow
from datetime import timedelta
import activities # Assume activities.py defines compose_greeting
# Import activity stubs (created via @activity.defn)
# Use this approach for type safety if stubs generated
# from activities import GreetingActivities
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
# Execute an Activity
return await workflow.execute_activity(
compose_greeting,
("Hello", name),
start_to_close_timeout=timedelta(minutes=1),
)
Define a Workflow interface annotated with @WorkflowInterface and implement it.
package your.application; // Use your actual package name
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.workflow.Workflow; // Import Workflow class
import java.time.Duration;
import your.application.activities.YourActivities; // Assume YourActivities interface exists in activities subpackage
@WorkflowInterface
public interface YourWorkflow {
@WorkflowMethod
String execute(String name);
}
public class YourWorkflowImpl implements YourWorkflow {
private final YourActivities activities;
public YourWorkflowImpl() {
// Configure Activity options
ActivityOptions options = ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofMinutes(1))
// Optional: Configure retry options
.setRetryOptions(RetryOptions.newBuilder()
.setMaximumAttempts(3)
.build())
.build();
// Create an Activity stub
this.activities = Workflow.newActivityStub(YourActivities.class, options);
}
@Override
public String execute(String name) {
// Execute an Activity by calling its method
return activities.doWork("Hello " + name);
}
}
.NET
Define Workflows in classes, marking the entry point with [WorkflowRun].
using Temporalio.Workflows;
using Temporalio.Activities; // For Activity scheduling options if needed
using System;
using System.Threading.Tasks;
using YourApplication.Activities; // Assume YourApplication.Activities.MyActivities class exists
[Workflow]
public class YourWorkflow
{
[WorkflowRun]
public async Task<string> RunAsync(string name)
{
// Execute an Activity
return await Workflow.ExecuteActivityAsync<string>(
(MyActivities act) => act.ComposeGreetingAsync("Hello", name),
new ActivityOptions { StartToCloseTimeout = TimeSpan.FromMinutes(1) }
);
}
}
PHP
Define a Workflow interface annotated with #[WorkflowInterface] and implement it.
<?php
namespace App\Workflow; // Use appropriate namespace
use Temporal\Workflow\WorkflowInterface;
use Temporal\Workflow\WorkflowMethod;
use Temporal\Activity\ActivityOptions;
use Temporal\Workflow;
use App\Activity\YourActivityInterface; // Assumes YourActivityInterface exists
#[WorkflowInterface]
interface YourWorkflowInterface
{
#[WorkflowMethod]
public function execute(string $name): \\Generator;
}
class YourWorkflow implements YourWorkflowInterface
{
public function execute(string $name): \\Generator
{
Workflow::getLogger()->info('Workflow started', ['name' => $name]);
// Workflow logic...
// ... execute activities, timers, etc.
return 'Workflow completed for ' . $name; // yield return for async
}
}
?>
Workflow Registration
Workers need to know which Workflow implementations correspond to the Workflow Types requested by
Workflows. This is done by registering Workflow implementations with the Worker.
Note: This is typically done automatically by the SDK when you start a Workflow, but
it's good to understand the concept for manual registration.
# Pseudocode for Workflow registration
# --- Original Code ---
# result = await workflow.execute_activity(activity_A, ...)
# --- Updated Code ---
# We want to optionally call activity_B before activity_A
# Change ID identifies this specific patch/change
patch_change_id = "add-activity-B-call-v1"
# Default version (original behavior) is -1 (or some SDK-specific default)
# New version is 1 (or the next sequential number for this change ID)
version = workflow.get_version(patch_change_id, default=-1, max_supported=1)
if version == 1:
# New code path: Call B first
await workflow.execute_activity(activity_B, ...)
# Code common to both versions (or the original path if version == -1)
result = await workflow.execute_activity(activity_A, ...)
Workflow Constraints (Determinism)
Workflow code must be deterministic. This means that given the same input and history, the code must
always produce the same sequence of commands (like starting Activities or Timers). This allows the
Temporal Cluster to replay the Workflow history and recover its state after failures.
To maintain determinism:
No mutable global state: Avoid relying on or modifying variables outside the
Workflow's scope that could change between replays.
No direct external calls: Do not make network calls, file system access, or interact
with external systems directly from Workflow code. Use Activities for all side effects.
Deterministic randomness: If you need random numbers, use the SDK's deterministic
random functions (like Workflow.newRandom() in Java/Go, workflow.random() in Python/TS).
Deterministic time: Use the SDK's time functions (like Workflow.currentTimeMillis()
in Java, Workflow.now() in TS, workflow.now() in Python) instead of system time.
Deterministic iteration order: Avoid iterating over maps or dictionaries directly if the
order is not guaranteed. Convert them to sorted lists first if iteration order matters.
Avoid threading/goroutines (directly): Do not spawn threads or goroutines directly within
Workflow code. Use the SDK's mechanisms for concurrency if needed (like Promise.all in TS,
Asyncio primitives with workflow.asyncio in Python, Workflow.go in Go, Async.function in Java).
Breaking these rules can lead to non-deterministic errors, which typically surface during replays or
failovers and can be hard to debug. The SDK provides APIs (like Side Effects, `Workflow.getVersion`)
to handle non-deterministic logic when absolutely necessary.
Workflow Versioning (Patching)
Since Workflow code must be deterministic, deploying updated Workflow code that changes the sequence of
commands (like adding a new Activity call, removing a Timer) can break replays of existing, in-flight
Workflow Executions that were started with the old code.
The Workflow.getVersion API (or equivalent in each SDK) allows you to safely deploy changes to
Workflow definitions without breaking compatibility.
Wrap the new or modified logic within a getVersion block, assigning it a unique change ID (often an
integer or descriptive string).
Specify a default version (typically associated with the original code path) and the new version
(associated with the changed code path).
When a Worker executes this code during a replay: If the history already contains a marker for this
change ID, the SDK automatically executes the code path corresponding to the recorded version.
When a Worker executes this code for the *first time* (not a replay for this specific part), it
executes the code associated with the *new* version you specified and records that version marker in the
history.
This ensures that existing executions continue deterministically along their original path, while new
executions (or executions reaching this point for the first time) use the updated logic.
# Pseudocode for using getVersion
# --- Original Code ---
# result = await workflow.execute_activity(activity_A, ...)
# --- Updated Code ---
# We want to optionally call activity_B before activity_A
# Change ID identifies this specific patch/change
patch_change_id = "add-activity-B-call-v1"
# Default version (original behavior) is -1 (or some SDK-specific default)
# New version is 1 (or the next sequential number for this change ID)
version = workflow.get_version(patch_change_id, default=-1, max_supported=1)
if version == 1:
# New code path: Call B first
await workflow.execute_activity(activity_B, ...)
# Code common to both versions (or the original path if version == -1)
result = await workflow.execute_activity(activity_A, ...)
After all in-flight Workflows using the old code path (default version) have completed, you can remove
the getVersion call and the old code path, simplifying the Workflow definition.
Note: Build ID based Worker Versioning is a newer, often preferred approach for managing incompatible Worker code deployments, especially across multiple Task Queues or complex rollouts. Patching is useful for targeted, small changes within a single Workflow definition.
Execute Child Workflows
Workflows can start other Workflows, known as Child Workflows. This allows for modular design, breaking
down complex business processes into smaller, reusable, and independently testable units.
Key characteristics of Child Workflows:
Independent History: Each Child Workflow has its own separate event history.
Parent Relationship: The parent Workflow can track the status of the child and receive
its result upon completion.
Cancellation Propagation: Cancellation can be configured to propagate from parent to
child (or not).
Retries and Timeouts: Child Workflows have their own timeouts and retry policies,
independent of the parent.
Task Queues: Child Workflows can run on the same or different Task Queues as the parent.
Starting a Child Workflow is similar to starting a Workflow via the Client, but it's done from within the
parent Workflow's code using specific SDK functions (for example, workflow.ExecuteChildWorkflow). Options
control the parent-child relationship behavior (for example, what happens if the parent completes before the
child).
TypeScript
import { executeChild, proxyActivities, ParentClosePolicy, workflowInfo } from '@temporalio/workflow'; // Import workflowInfo
import { childWorkflow } from './child'; // Assume childWorkflow definition exists
export async function parentWorkflow(name: string): Promise<string> {
// Start child and wait for result
const result = await executeChild(childWorkflow, {
args: [name],
workflowId: `child-${workflowInfo().workflowId}`, // Ensure unique child ID using workflowInfo
// Default: ParentClosePolicy.TERMINATE (terminates child if parent closes)
// Use ABANDON if child should continue after parent finishes
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON,
});
return `Parent received: ${result}`;
}
Python
from temporalio import workflow
from temporalio.common import ParentClosePolicy
# Assume child_workflow is defined in another file/module
with workflow.unsafe.imports_passed_through():
from child_workflows import ChildWorkflow
@workflow.defn
class ParentWorkflow:
@workflow.run
async def run(self, name: str) -> str:
result = await workflow.execute_child_workflow(
ChildWorkflow.run,
f"Input for {name}",
id=f"child-{workflow.info().workflow_id}",
parent_close_policy=ParentClosePolicy.ABANDON,
)
return f"Parent received: {result}"
Go
package app // Assuming parent workflow is in app package
import (
"fmt" // For string formatting
"go.temporal.io/sdk/workflow"
"go.temporal.io/api/enums/v1"
// Assume app defines ChildWorkflow function
"your_module/app"
)
func ParentWorkflow(ctx workflow.Context, name string) (string, error) {
cwo := workflow.ChildWorkflowOptions{
// Generate a unique ID, e.g., combining parent ID and a unique part
WorkflowID: fmt.Sprintf("child-%s-%s", workflow.GetInfo(ctx).WorkflowExecution.ID, workflow.GetRandomUUID(ctx)),
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
}
ctx = workflow.WithChildOptions(ctx, cwo)
var result string
// Assuming ChildWorkflow is defined in the same package or imported correctly
future := workflow.ExecuteChildWorkflow(ctx, app.ChildWorkflow, name)
if err := future.Get(ctx, &result); err != nil {
// Handle child workflow error (e.g., log, return error)
workflow.GetLogger(ctx).Error("Child workflow failed", "Error", err)
return "", err
}
return "Parent received: " + result, nil
}
// Assume ChildWorkflow definition exists, e.g.:
// func ChildWorkflow(ctx workflow.Context, inputName string) (string, error) {
// workflow.GetLogger(ctx).Info("Child workflow started", "Input", inputName)
// return "Result from child for " + inputName, nil
// }
Java
package your.application.workflows; // Use your actual package name
import io.temporal.workflow.*;
import io.temporal.api.enums.v1.ParentClosePolicy;
import java.util.UUID; // For UUIDs
// Assume ChildWorkflow interface exists in the same package or imported
public class ParentWorkflowImpl implements ParentWorkflow { // Assuming ParentWorkflow interface
@Override
public String execute(String name) {
ChildWorkflowOptions options = ChildWorkflowOptions.newBuilder()
.setWorkflowId("child-" + Workflow.getInfo().getWorkflowId() + "-" + UUID.randomUUID().toString()) // Ensure unique ID
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
.build();
ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options);
// Start child and wait for result
String result = child.execute("Input for " + name);
return "Parent received: " + result;
// To start asynchronously:
// Promise promise = Async.function(child::execute, "Input for " + name);
// Use promise.get() later
}
}
.NET
using Temporalio.Workflows;
using Temporalio.Common; // For ParentClosePolicy
using System; // For Guid
using System.Threading.Tasks;
using YourApplication.Workflows; // Assuming ChildWorkflow class exists in the same namespace or imported
[Workflow]
public class ParentWorkflow
{
[WorkflowRun]
public async Task<string> RunAsync(string name)
{
string result = await Workflow.ExecuteChildWorkflowAsync(
(ChildWorkflow wf) => wf.RunChildAsync($"Input for {name}"), // Ensure RunChildAsync exists on ChildWorkflow
new ChildWorkflowOptions
{
WorkflowId = $"child-{Workflow.Info.WorkflowId}-{Guid.NewGuid()}", // Ensure unique ID
ParentClosePolicy = ParentClosePolicy.Abandon,
}
);
return $"Parent received: {result}";
}
}
PHP
<?php declare(strict_types=1);\\\n\\\nnamespace App\\\\\\\\Workflow;\\\\n\\\\nuse Temporal\\\\\\\\Workflow\\\\\\\\ParentWorkflowInterface;\\\\nuse Temporal\\\\\\\\Workflow\\\\\\\\ParentWorkflowMethod;\\\\nuse Temporal\\\\\\\\Activity\\\\\\\\ActivityOptions;\\\\nuse Temporal\\\\\\\\Common\\\\\\\\RetryOptions;\\\\nuse Temporal\\\\\\\\Activity\\\\\\\\ActivityMethod;\\\\nuse Temporal\\\\\\\\Activity\\\\\\\\ActivityInterface;\\\\nuse App\\\\\\\\ChildWorkflows\\\\\\\\ChildWorkflow;\\\\nuse Ramsey\\\\Uuid\\\\Uuid; // Example: Use ramsey/uuid\\\\n\\\\n// Define ParentWorkflow interface and implementation\\\\ninterface ParentWorkflowInterface extends ParentWorkflowInterface<?> {\\\n @ParentWorkflowMethod(name = "ParentWorkflow")\\\n void execute(String name);\n}\\\n\\\\nclass ParentWorkflowImpl implements ParentWorkflowInterface {\\\n @Override\\\n @ParentWorkflowMethod(name = "ParentWorkflow")\\\n public void execute(String name) {\\\n // Start child workflow\\\n ChildWorkflow.start(\\\"child-workflow\\\", [\\\"name\\\" => $name]);\\\\\n // You can also use a different workflow ID if needed\\\\\n }\\\\\n}\\\n?>
Continue-As-New
Workflow Executions have limits on their event history size and duration. For long-running processes
or Workflows that accumulate a large history (for example, loops processing many items), `Continue-As-New` allows
a Workflow to effectively restart itself with a fresh history, carrying over necessary state.
When a Workflow decides to `Continue-As-New`:
It specifies the same Workflow Type (or potentially a different one).
It provides the input arguments for the *new* execution. This is how state is carried over.
The current execution is marked as `ContinuedAsNew`.
A new Workflow Execution starts immediately with the provided arguments and a clean history, but
logically continues the business process. The Run ID changes, but the Workflow ID can be kept the same.
This is the standard pattern for implementing infinite loops or processes that need to run indefinitely
without hitting history limits. It's also useful for periodic cleanup of large state within a Workflow.
# Pseudocode for Continue-As-New loop
@workflow.defn
class PerpetualWorkflow:
@workflow.run
async def run(self, counter: int, other_state: dict = None): # Example with state
if other_state is None:
other_state = {} # Initialize state if first run
workflow.logger.info(f"Executing run {counter} with state: {other_state}")
# Example condition to ContinueAsNew (e.g., every 1000 iterations)
if counter > 0 and counter % 1000 == 0:
workflow.logger.info(f"Processed {counter} items, continuing as new...")
# Carry over the necessary state to the new execution
# Only include state needed for the *next* run
workflow.continue_as_new(counter, other_state) # Restarts run() with new args
# Note: Code after continue_as_new does not execute in the current run
# --- Regular Workflow Logic ---
workflow.logger.info(f"Processing item {counter}")
# Do work... update other_state if needed
# await workflow.execute_activity(process_item, counter, other_state, ...)
other_state['last_processed'] = counter
await workflow.sleep(timedelta(seconds=1)) # Simulate work or delay
# "Loop" by calling continue_as_new with updated state for the next iteration
workflow.continue_as_new(counter + 1, other_state)
# Client side starting the workflow for the first time:
async def run(self, counter: int):
if counter >= 1000: # Example condition to ContinueAsNew
print(f"Processed {counter} items, continuing as new...")
# Carry over the counter state to the new execution
workflow.continue_as_new(counter) # Restarts run() with counter
else:
print(f"Processing item {counter}")
# Do work...
await workflow.execute_activity(process_item, counter, ...)
# Schedule next iteration after a delay
await workflow.sleep(timedelta(seconds=10))
# "Loop" by calling continue_as_new with updated state
workflow.continue_as_new(counter + 1)
Side Effects
While Workflows must be deterministic, sometimes you need to execute a short, non-deterministic piece of
code within the Workflow logic itself (without calling an Activity). A common example is generating a UUID
or getting a random number using a non-SDK function (which would violate determinism if called directly).
Workflow.SideEffect (or equivalent) allows this.
You provide a function/lambda containing the non-deterministic code to SideEffect.
The first time this code path is executed, the function is run, and its return value is recorded in the
Workflow history.
During replays, the function is *not* re-executed. Instead, the recorded value from the history is
returned directly, preserving determinism.
Side Effects should only be used for short, simple operations where creating a dedicated Activity would be
overkill. They are not suitable for I/O or long-running computations. For example, generating a unique ID or
calling a potentially flaky function that you don't want to retry as a full Activity.
# Pseudocode for Side Effect
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self):
# Generate a random ID using a potentially non-deterministic library function
# This lambda is only executed once per execution path.
random_id = await workflow.side_effect(lambda: generate_random_id_maybe_impure())
# Use the random_id, which is now deterministic for replays
await workflow.execute_activity(use_id_activity, random_id, ...)
return f"Processed with ID: {random_id}"
# Note: For truly random values needed within workflow logic,
# prefer the SDK's deterministic random functions like workflow.random().
# SideEffect is more for integrating short non-deterministic external code.
Local Activities
Local Activities are an optimization for short-lived Activities that don't require the full overhead
of Task Queue polling, scheduling, and retries managed by the Temporal Cluster. They execute directly
within the Worker's Workflow Task processing thread.
Characteristics:
Execution: Run in the same process as the Workflow code that called them.
Latency: Much lower invocation latency compared to standard Activities (sub-millisecond).
Retries: Limited, server-side retries are not applicable. Retries must be handled by the
Workflow code itself or rely on the Workflow Task retry mechanism. A short, configurable local retry policy might be available in the SDK.
Timeouts: Subject to shorter, locally configured timeouts, often tied to the Workflow
Task timeout.
Use Cases: Ideal for simple functions like data validation, transformation, short calculations,
or accessing read-only resources local to the Worker, where the overhead of a standard Activity is
undesirable. For example, validating input against a schema loaded in Worker memory.
Determinism: Like standard Activities, Local Activities themselves do not need to be
deterministic, but their *invocation* from the Workflow must be.
Executing a Local Activity uses a specific SDK function (for example, workflow.ExecuteLocalActivity), often
requiring different, shorter timeout configurations than standard Activities.
Mutations
The Workflow.upsertMemo, Workflow.upsertSearchAttributes APIs allow Workflows to update their own Memo and
Search Attributes during execution. These updates are recorded in the event history.
Workflow.deprecatePatch is used in conjunction with Workflow.getVersion for managing code cleanup. Once you are
sure no running Workflows are using an old code path identified by a specific change ID (patch ID), you
can call `deprecatePatch(patch_change_id)`. This signals to the system (and potentially linting tools)
that this patch ID is obsolete and its corresponding `getVersion` block can eventually be removed from the
code.
Activity Development
Activities encapsulate the actual business logic, performing tasks like calling external services,
interacting with databases, or processing data. They are executed by Workers and can be retried
automatically.
Activity Definition
Define Activities using interfaces (Java, Go, PHP), functions (Python, TypeScript), or methods within
classes (.NET).
TypeScript
Define Activities as plain async functions. You can optionally group them in an object or class for
organization. Activity arguments and return types must be serializable by the configured Data
Converter (JSON by default). Use Context.current().info to access Activity information like Task Token.
import { Context } from '@temporalio/activity';
import { log } from './logger'; // Assuming a configured logger
export async function composeGreeting(greeting: string, name: string): Promise<string> {
log.info('Activity input', { greeting, name });
const { activityId, workflowExecution } = Context.current().info;
log.info('Running in activity', { activityId, workflowId: workflowExecution.workflowId });
// Simulate external call or work
await new Promise((resolve) => setTimeout(resolve, 500));
return \`\${greeting}, \${name}!\`;
}
// Optional: Group activities
export const activities = { composeGreeting };
Python
Define Activities using the @activity.defn decorator on functions. Activity arguments and return
types must be serializable (Defaults to JSON). Use activity.info() to access Activity
information.
from temporalio import activity
import time
@activity.defn
def compose_greeting(greeting: str, name: str) -> str:
activity.logger.info(f"Composing greeting for {name}")
# Activity logic...
time.sleep(0.5) # Simulate I/O
return f"{greeting}, {name}!"
@activity.defn(name="DifferentActivityName")
async def do_async_work(value: int) -> int:
activity.logger.info(f"Doing async work with {value}")
# Async activity logic...
await asyncio.sleep(1)
return value * 10
Go
Define Activities as functions or methods on a struct. The function signature must include
context.Context as the first argument. Arguments and return values must be serializable (JSON by
default). Use activity.GetInfo(ctx) to access information.
package app
import (
"context"
"fmt"
"time"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
)
// Simple function as Activity
func ComposeGreeting(ctx context.Context, greeting string, name string) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Activity invoked", "greeting", greeting, "name", name)
info := activity.GetInfo(ctx)
logger.Info("Running activity", "ActivityID", info.ActivityID, "WorkflowID", info.WorkflowExecution.ID)
// Simulate work
time.Sleep(500 * time.Millisecond)
result := fmt.Sprintf("%s, %s!", greeting, name)
return result, nil
}
// --- Alternatively: Activities as struct methods ---
type MyActivities struct {
// Dependencies can be injected here (e.g., DB connection pool)
}
func (a *MyActivities) SendNotification(ctx context.Context, userID string, message string) error {
logger := activity.GetLogger(ctx)
logger.Info("Sending notification", "UserID", userID, "Message", message)
// ... logic to send notification ...
return nil
}
Java
Define an Activity interface annotated with @ActivityInterface. Each method in the
interface defines an Activity Type. Implement this interface.
package your.application.activities; // Use your actual package name
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
@ActivityInterface
public interface YourActivities {
// Default Activity name is "DoWork"
String doWork(String input);
// Explicitly named Activity
@ActivityMethod(name = "ProcessPayment")
boolean processPaymentActivity(String orderId, double amount);
}
// Implementation
public class YourActivitiesImpl implements YourActivities {
@Override
public String doWork(String input) {
// Activity logic...
return "Processed: " + input;
}
@Override
public boolean processPaymentActivity(String orderId, double amount) {
// Payment processing logic...
System.out.printf("Processing payment for order %s, amount %.2f%n", orderId, amount);
return true;
}
}
.NET
Define Activity methods within any class, marking them with the [Activity] attribute.
Methods can be static or instance methods.
using Temporalio.Activities;
using System.Threading.Tasks;
public class MyActivities
{
[Activity]
public async Task<string> ComposeGreetingAsync(string greeting, string name)
{
// Activity logic...
Activity.Logger.LogInformation("Running activity with greeting {Greeting} and name {Name}", greeting, name);
await Task.Delay(500); // Simulate work
return $"{greeting}, {name}!";
}
[Activity("StaticActivity")] // Custom name
public static string DoStaticWork(int value)
{
return $"Static result: {value * 2}";
}
}
PHP
Define an Activity interface annotated with #[ActivityInterface]. Each method defines an
Activity Type. Implement this interface.
<?php
namespace App\Activity;
use Temporal\Activity\ActivityInterface;
use Temporal\Activity\ActivityMethod;
#[ActivityInterface(prefix: "App.Activities.")] // Optional prefix for all activity names
interface YourActivityInterface
{
public function composeGreeting(string $greeting, string $name): string;
#[ActivityMethod("ExecutePayment")] // Custom name overrides prefix
public function executePaymentActivity(string $transactionId): bool;
}
// Implementation
use Temporal\Activity;
class YourActivities implements YourActivityInterface
{
public function composeGreeting(string $greeting, string $name): string
{
Activity::getLogger()->info("Composing greeting", ['greeting' => $greeting, 'name' => $name]);
// Activity logic...
return $greeting . ', ' . $name . '!';
}
public function executePaymentActivity(string $transactionId): bool
{
Activity::getLogger()->info("Executing payment", ['transactionId' => $transactionId]);
// Payment logic...
return true;
}
}
?>
Activity Context and Information
Activities can access metadata about their current execution, such as the Workflow ID, Activity ID, Task
Token (needed for asynchronous completion), and configured timeouts. This is typically done via a
context object or static methods provided by the SDK. Logging within Activities should ideally use the
SDK's logger, which automatically includes this contextual information.
Example: Accessing Activity Info (Conceptual - specific methods vary by SDK as shown in definition examples)
Workers need to know which Activity implementations correspond to the Activity Types requested by
Workflows. This is done by registering Activity implementations with the Worker.
TypeScript
Pass Activity functions (often grouped in an object) to Worker.create.
import { Worker } from '@temporalio/worker';
import * as activities from './activities'; // Assuming activities = { composeGreeting, ... }
async function run() {
const worker = await Worker.create({
// ... connection, taskQueue, workflowsPath ...
activities, // Register all exported functions from activities module
});
await worker.run();
}
Python
Provide Activity functions decorated with @activity.defn to the Worker constructor.
from temporalio.worker import Worker
from temporalio.client import Client
# Assume activities.py defines compose_greeting, etc.
from activities import compose_greeting
async def main():
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[GreetingWorkflow], # Assume GreetingWorkflow is defined
activities=[compose_greeting], # Register activity functions
)
await worker.run()
Go
Use worker.RegisterActivity or worker.RegisterActivityWithOptions.
import (
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
// Assume app package defines ComposeGreeting and YourWorkflow
"your_module/app"
)
func main() {
c, _ := client.Dial(client.Options{})
defer c.Close()
w := worker.New(c, "my-task-queue", worker.Options{})
// Register Activity function
w.RegisterActivity(app.ComposeGreeting)
// Or register methods from a struct instance
// activityInstance := &app.MyActivities{}
// w.RegisterActivity(activityInstance)
w.RegisterWorkflow(app.YourWorkflow)
err := w.Run(worker.InterruptCh())
// Handle error
}
Java
Register Activity implementation instances using worker.registerActivitiesImplementations.
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.Worker;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
// Assume YourActivitiesImpl and YourWorkflowImpl exist
public class WorkerMain {
public static void main(String[] args) {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);
WorkerFactory factory = WorkerFactory.newInstance(client);
Worker worker = factory.newWorker("my-task-queue");
// Register Workflow implementation type
worker.registerWorkflowImplementationTypes(YourWorkflowImpl.class);
// Register Activity implementation instance(s)
worker.registerActivitiesImplementations(new YourActivitiesImpl());
factory.start();
}
}
.NET
Add Activity classes to the Worker options via AddActivity.
using Temporalio.Client;
using Temporalio.Worker;
// Assume MyActivities and YourWorkflow exist
var client = await TemporalClient.ConnectAsync(new("localhost:7233"));
var workerOptions = new TemporalWorkerOptions("my-task-queue")
.AddWorkflow<YourWorkflow>()
.AddActivity(MyActivities.ComposeGreetingAsync) // Register specific static method
.AddActivity(MyActivities.DoStaticWork)
.AddActivities(new MyActivities()); // Register instance methods if needed
var worker = new TemporalWorker(client, workerOptions);
try {
await worker.ExecuteAsync(CancellationToken.None); // Use cancellation token
} catch (Exception ex) {
// Handle exception
}
PHP
Register Activity implementation instances via registerActivityImplementations.
<?php
use Temporal\\\\WorkerFactory;\nuse App\\\\YourWorkflow; // Assuming Workflow implementation\nuse App\\\\YourActivity; // Assuming Activity implementation\nuse Psr\\\\Log\\\\AbstractLogger; // For example logger\n\n// Simple logger implementation for the example\nclass SimpleLogger extends AbstractLogger {\n public function log($level, $message, array $context = []): void { // Add void return type\n echo \"LOG: $message\" . PHP_EOL; // Use PHP_EOL for line endings\n }\n}\n\n$factory = WorkerFactory::create();\n$worker = $factory->newWorker(\'my-task-queue\');\n\n// Register Workflow implementation class\n$worker->registerWorkflowTypes(YourWorkflow::class);\n\n// Register Activity implementation instance(s)\n// Dependencies (like logger) can be injected here\n$worker->registerActivityImplementations(new YourActivity(new SimpleLogger()));\n\n$factory->run();
?>
Activity Heartbeating
For long-running Activities, Heartbeating allows the Activity to report its progress and liveness back to
the Temporal Cluster. This serves several purposes:
Liveness Detection: If the Cluster doesn't receive a heartbeat within the configured
HeartbeatTimeout, it assumes the Activity Worker has crashed and reschedules the Activity (respecting
the Retry Policy).
Progress Checkpointing: Activities can include details (payload) with the heartbeat,
effectively checkpointing their progress. If the Activity is retried (on the same or different
Worker), it can retrieve the last checkpointed details from the Activity context and resume from that
point, avoiding redundant work.
Cancellation Check: The heartbeat mechanism also allows the Activity to check if a
cancellation has been requested for the Workflow or the Activity itself. The Activity can then perform
cleanup and exit gracefully.
Heartbeating is crucial for Activities that might run longer than the StartToCloseTimeout or for tasks where
reporting progress is beneficial. It's implemented by calling a specific SDK function (for example, activity.RecordHeartbeat(ctx, details...)) periodically within the Activity code.
# Pseudocode for Activity with Heartbeating
@activity.defn
async def long_running_task(input_data):
progress = load_progress_from_details(activity.info().heartbeat_details)
if progress is None:
progress = 0
total_items = len(input_data)
for i in range(progress, total_items):
# Check for cancellation before/during work
activity.heartbeat(i) # Report progress (item 'i' is about to be processed)
# Process item i...
await asyncio.sleep(1) # Simulate work
# Optional: Heartbeat again after processing item, if fine-grained needed
# activity.heartbeat(i + 1)
return "All items processed"
def load_progress_from_details(details):
# Logic to interpret details sent with previous heartbeat
if details: return details # Assuming details itself is the progress marker
return None
Note: Build ID based Worker Versioning is a newer, often preferred approach for managing incompatible Worker code deployments, especially across multiple Task Queues or complex rollouts. Patching is useful for targeted, small changes within a single Workflow definition.
Complete Activities Asynchronously
In some scenarios, the process that completes an Activity might be different from the one that started it.
For example, an Activity might trigger an external system (via API call) and rely on a webhook callback to
signal completion.
To handle this:
The Activity obtains its unique TaskToken from the Activity Context. The Task Token identifies this
specific Activity execution attempt.
The Activity returns control to the Worker *without* completing, typically by throwing/returning a
special indicator (like activity.DoNotCompleteOnReturn or a specific exception type).
The Activity passes the TaskToken to the external system or process that will eventually signal
completion.
When the external process finishes, it uses a Temporal Client method (like CompleteActivityById or
CompleteActivity, providing the Task Token) to report the result or failure back to the Temporal
Cluster.
The Worker that originally executed the Activity does not need to be running for the completion call to
succeed. The Temporal Cluster uses the Task Token to correlate the completion call with the correct
Workflow Execution.
# Pseudocode for Activity triggering async completion
@activity.defn
async def trigger_external_job(job_params):
info = activity.info()
task_token_str = info.task_token.decode('utf-8') # Get task token as string
# Call external system, passing task_token_str as a callback identifier
external_api.start_job(job_params, callback_token=task_token_str)
# Tell the Worker not to complete this Activity yet
activity.do_not_complete_on_return()
# --- Elsewhere, in a separate process (e.g., webhook handler) ---
def handle_external_job_completion(callback_token, result, error):
client = connect_to_temporal() # Connect with a Temporal client
task_token_bytes = callback_token.encode('utf-8')
try:
if error:
client.report_activity_failure(task_token_bytes, error)
else:
client.report_activity_completion(task_token_bytes, result)
except Exception as e:
# Handle potential errors reporting completion (e.g., activity timed out already)
log.error(f"Failed to report activity completion/failure for token {callback_token}: {e}")
Activity Cancellation
Activities can be cancelled in two main ways:
Workflow Cancellation: If the Workflow Execution that invoked the Activity is cancelled,
the Temporal Cluster propagates the cancellation request to running Activities (depending on the
Workflow's cancellation handling logic and SDK Cancellation Scopes).
Activity Timeouts: If an Activity exceeds its StartToCloseTimeout or ScheduleToCloseTimeout,
it is effectively cancelled by the timeout mechanism, and a failure is reported to the Workflow (which
may trigger retries).
Activities that need to perform cleanup when cancelled (for example, releasing a lock, rolling back a transaction)
should check for cancellation requests. This is often done:
Implicitly during activity.RecordHeartbeat calls.
Explicitly using SDK context cancellation checks (for example, ctx.Err() in Go, checking Activity.Current.CancellationToken.IsCancellationRequested in .NET, checking activity.is_cancelled() in Python).
Upon detecting cancellation, the Activity should perform its cleanup and then typically throw a specific
cancelled exception type provided by the SDK.
Temporal Client
The Temporal Client is the interface used by your application code (that isn't part of a Workflow or
Activity) to interact with the Temporal Cluster. It allows you to start, signal, query, and manage
Workflow Executions.
Client Connection
Establish a connection to the Temporal Cluster, specifying the target address (host:port) of the Frontend
Service. Options usually include configuring TLS, identity, namespace, and connection timeouts. It's
recommended to create a single Client instance per application process and reuse it.
# Pseudocode for client connection
options = {
target_host: "your-temporal-frontend.example.com:7233",
namespace: "your-namespace", # Defaults typically to 'default'
# tls_options: { ... } # Optional TLS config
}
client = TemporalClient.connect(options)
# Use the client instance throughout your application...
# Close connection when application exits
client.close()
Start a Workflow Execution
Use the Client to start a new Workflow Execution. Key parameters include:
Workflow Type: The name or type identifying the Workflow definition registered with
Workers.
Task Queue: The Task Queue name that Workers processing this Workflow are listening on.
Workflow ID: A unique business-level identifier for this specific execution (for example, order ID,
user ID). Reusing a Workflow ID with a different run attempt results in a "Workflow execution already
started" error unless specific policies are set.
Arguments: The input arguments required by the Workflow function/method.
Options: Workflow Execution Timeout, Workflow Run Timeout, Retry Policies, Memo, Search Attributes, etc.
TypeScript
import { Client } from '@temporalio/client';
import { exampleWorkflow } from './workflows'; // Assuming workflow function
async function runClient() { // Renamed to avoid conflict if run() is used elsewhere
const client = new Client(); // Connects to localhost:7233 by default
const handle = await client.workflow.start(exampleWorkflow, {
taskQueue: 'my-task-queue',
args: ['Temporal'], // Arguments for exampleWorkflow
workflowId: 'workflow-' + Date.now(), // Unique ID
// Optional: workflowExecutionTimeout: '5 minutes',
});
console.log(`Started workflow ${handle.workflowId}`);
// const result = await handle.result(); // Wait for completion
}
Python
import asyncio
import uuid # For generating unique IDs
from temporalio.client import Client
from your_workflows import GreetingWorkflow # Assuming @workflow.defn class
async def start_workflow_client(): # Renamed for clarity
client = await Client.connect("localhost:7233")
handle = await client.start_workflow(
GreetingWorkflow.run, # Reference the @workflow.run method
"Temporal", # First arg for run method
id="workflow-" + str(uuid.uuid4()), # Use UUID for uniqueness
task_queue="my-task-queue",
# Optional: execution_timeout=timedelta(minutes=5)
)
print(f"Started workflow {handle.id}")
# result = await handle.result()
Go
package main // Assuming client code is in main package
import (
"context"
"log"
"time" // For Time
"github.com/pborman/uuid" // For UUIDs
"go.temporal.io/sdk/client"
"your_module/app" // Assuming app.YourWorkflow exists
)
func main() {
c, err := client.Dial(client.Options{})
if err != nil { log.Fatalln("Unable to create client", err) }
defer c.Close()
options := client.StartWorkflowOptions{
ID: "workflow-" + uuid.New(), // Use UUID for uniqueness
TaskQueue: "my-task-queue",
// WorkflowExecutionTimeout: 5 * time.Minute,
}
we, err := c.ExecuteWorkflow(context.Background(), options, app.YourWorkflow, "Temporal")
if err != nil { log.Fatalln("Unable to execute workflow", err) }
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
// var result string
// err = we.Get(context.Background(), &result) // Wait for completion
}
Java
package your.application; // Use your actual package name
import io.temporal.client.*;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.UUID; // For UUIDs
import your.application.workflows.YourWorkflow; // Assuming YourWorkflow interface exists
public class Starter {
public static void main(String[] args) {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue("my-task-queue")
.setWorkflowId("workflow-" + UUID.randomUUID().toString()) // Use UUID
// .setWorkflowExecutionTimeout(Duration.ofMinutes(5))
.build();
// Create a Workflow stub targeting the interface
YourWorkflow workflow = client.newWorkflowStub(YourWorkflow.class, options);
// Start execution asynchronously
WorkflowExecution execution = WorkflowClient.start(workflow::execute, "Temporal");
// The above is equivalent to:
// CompletableFuture result = WorkflowClient.execute(workflow::execute, "Temporal");
System.out.println("Started workflow " + execution.getWorkflowId());
// To wait for result (if using start):
// String result = workflow.execute("Temporal"); // This blocks until completion
// Or if using execute, use the CompletableFuture
}
}
.NET
using Temporalio.Client;
using System;
using System.Threading.Tasks;
using YourApplication.Workflows; // Assume YourApplication.Workflows.YourWorkflow exists
public class WorkflowStarter
{
public static async Task StartWorkflowAsync() // Example method
{
var client = await TemporalClient.ConnectAsync(new("localhost:7233"));
// Start using the Workflow class and run method signature
WorkflowHandle<YourWorkflow, string> handle = await client.StartWorkflowAsync(
(YourWorkflow wf) => wf.RunAsync("Temporal"), // Lambda targeting the run method
new(id: "workflow-" + Guid.NewGuid(), taskQueue: "my-task-queue") // Use Guid
{
// ExecutionTimeout = TimeSpan.FromMinutes(5)
});
Console.WriteLine($"Started workflow {handle.Id}");
// string result = await handle.GetResultAsync(); // Wait for completion
}
}
PHP
<?php declare(strict_types=1);\n\nnamespace App\\Client; // Use appropriate namespace\n\nuse Temporal\\Client\\WorkflowClientInterface;\nuse Temporal\\Client\\WorkflowOptions;\nuse App\\Workflow\\YourWorkflowInterface; // Assuming Workflow interface\nuse Ramsey\\Uuid\\Uuid; // Example: Use ramsey/uuid for UUIDs\n\n// Assume $workflowClient is initialized (e.g., via dependency injection)\n/** @var WorkflowClientInterface $workflowClient */\n\n$workflow = $workflowClient->newWorkflowStub(\n YourWorkflowInterface::class,\n WorkflowOptions::new()\n ->withTaskQueue(\'my-task-queue\')\n ->withWorkflowId(\'workflow-\' + Uuid::uuid4()->toString()) // Generate UUID\n // ->withWorkflowExecutionTimeout(CarbonInterval::minutes(5))\n);\n\n$run = $workflowClient->start($workflow, \'Temporal\'); // Pass args for execute method\n\necho \"Started workflow \" . $run->getExecution()->id . \"\\n\";\n\n// $result = $run->getResult(); // Wait for completion\n?>
Get the Result of a Workflow Execution
After starting a Workflow, you can obtain a handle or stub representing that specific execution. You can
use this handle to wait for the Workflow to complete and retrieve its return value or exception. This is
typically a blocking call.
# Pseudocode for getting result
client = ...
workflow_handle = client.start_workflow(...)
print(f"Waiting for workflow {workflow_handle.id} to complete...")
try:
# This blocks until the workflow finishes (completes, fails, times out, etc.)
result = workflow_handle.get_result(timeout=300) # Optional timeout for the wait itself
print(f"Workflow completed successfully with result: {result}")
except WorkflowFailedError as e:
print(f"Workflow failed: {e.cause}")
except WorkflowTimedOutError:
print("Workflow timed out")
# ... other potential exceptions like cancellation
Signal Workflows
Signals provide a way to send data asynchronously into a running Workflow Execution. Workflows define
Signal handlers to process these incoming signals. Signals are durable and guaranteed to be delivered
(at least once). They are often used for updates, notifications, or external events.
Use the Client to send a signal:
Identify the target Workflow Execution (by Workflow ID).
Specify the Signal name (defined in the Workflow).
Provide the Signal arguments.
You can signal a Workflow even if you don't know if it's running yet using `signalWithStart`, which will
start the Workflow if it doesn't exist and then immediately deliver the signal.
TypeScript
import { Client } from '@temporalio/client';
import { exampleWorkflow } from './workflows'; // Assuming workflow function with signals
// Assume workflow defines a signal handler named 'updateGreeting'
async function signalClient() { // Renamed for clarity
const client = new Client();
const handle = client.workflow.getHandle('workflow-id-123'); // Get handle by ID
// Send signal
await handle.signal('updateGreeting', 'Hi');
console.log('Signal sent');
// Signal with start: Starts if not running, otherwise signals.
// const handle = await client.workflow.signalWithStart(exampleWorkflow, {
// taskQueue: 'my-task-queue',
// workflowId: 'unique-workflow-id',
// args: ['Initial Name'],
// signal: 'updateGreeting',
// signalArgs: ['Hi'],
// });
}
package main
import (
"context"
"log"
"go.temporal.io/sdk/client"
// Assume your workflow defines a signal named "UpdateGreeting"
// import "your_module/app"
)
func main() {
c, _ := client.Dial(client.Options{})
defer c.Close()
workflowID := "workflow-id-123"
signalName := "UpdateGreeting" // Matches name in Workflow definition
signalArg := "Hi"
// Send signal
err := c.SignalWorkflow(context.Background(), workflowID, "", signalName, signalArg)
if err != nil { log.Println("Error signaling workflow", err) } else { log.Println("Signal sent") }
// Signal with start
// workflowRun, err := c.SignalWithStartWorkflow(context.Background(),
// "unique-workflow-id", "my-task-queue",
// signalName, signalArg, // Signal and args
// client.StartWorkflowOptions{...}, // Workflow options
// app.YourWorkflow, "Initial Name", // Workflow function and args
// )
}
Java
package your.application;
import io.temporal.client.*;
import io.temporal.serviceclient.WorkflowServiceStubs;
import your.application.workflows.YourWorkflow; // Assume YourWorkflow interface defines a @SignalMethod void updateGreeting(String greeting);
public class Signaler {
public static void main(String[] args) {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); // Example connection
WorkflowClient client = WorkflowClient.newInstance(service);
String workflowId = "workflow-id-123";
// Create a stub for signaling, only needs the Workflow ID
YourWorkflow workflow = client.newWorkflowStub(YourWorkflow.class, workflowId);
// Send signal (non-blocking)
workflow.updateGreeting("Hi");
System.out.println("Signal sent");
// Signal with start: Use WorkflowStub options
// WorkflowOptions startOptions = WorkflowOptions.newBuilder()
// .setTaskQueue("my-task-queue")
// .setWorkflowId("unique-workflow-id")
// .build();
// YourWorkflow signalWithStartStub = client.newWorkflowStub(YourWorkflow.class, startOptions);
// BatchRequest request = client.newSignalWithStartRequest();
// request.add(signalWithStartStub::execute, "Initial Name"); // Add start
// request.add(signalWithStartStub::updateGreeting, "Hi"); // Add signal
// client.signalWithStart(request);
}
}
.NET
using Temporalio.Client;
using System; // For Console
using System.Threading.Tasks;
using YourApplication.Workflows; // Assume YourWorkflow defines [WorkflowSignal] void UpdateGreeting(string greeting);
public class WorkflowSignaler
{
public static async Task SignalWorkflowAsync() // Example method
{
var client = await TemporalClient.ConnectAsync(new("localhost:7233"));
var handle = client.GetWorkflowHandle<YourWorkflow>("workflow-id-123"); // Specify workflow type if using lambda
// Send signal via lambda targeting the signal method
await handle.SignalAsync((YourWorkflow wf) => wf.UpdateGreeting("Hi"));
Console.WriteLine("Signal sent");
// ... existing code ...
}
}
PHP
<?php declare(strict_types=1);\\\\n\\\\nnamespace App\\\\\\\\Client;\\\\n\\\\nuse Temporal\\\\\\\\Client\\\\\\\\WorkflowClientInterface;\\\\nuse App\\\\\\\\Workflow\\\\\\\\YourWorkflowInterface; // Assume YourWorkflowInterface defines a signal method updateGreeting(string $greeting)\\\\n\\\\n/** @var WorkflowClientInterface $workflowClient */\\\\n\\\\n// Use newWorkflowStub to get a stub targeting a specific Workflow ID\\\\n$workflowStub = $workflowClient->newWorkflowStub(YourWorkflowInterface::class, \\\\\"workflow-id-123\\\\\");\\\\n// Send the signal (this example requires the workflow to be running)
$workflowStub->updateGreeting(\\\"Hi From PHP Client!\\\");
echo \\\\\"Signal sent to workflow \\\\\" . $workflowStub->getWorkflowId() . \\\\\"\\\\n\\\\\";
?>
Query Workflows
Queries provide a synchronous way to fetch state from a running Workflow Execution without affecting its
history. Workflows define Query handlers to respond to these requests. Queries are strongly consistent by
default, meaning they reflect the state of the Workflow just before the query is processed.
Use the Client to query a Workflow:
Identify the target Workflow Execution (by Workflow ID).
Specify the Query name (defined in the Workflow).
Provide any Query arguments.
Receive the Query result directly.
TypeScript
import { Client } from '@temporalio/client';
import { exampleWorkflow } from './workflows'; // Assuming workflow function with signals
// Assume workflow defines a query handler named 'getGreeting'
async function queryClient() { // Renamed for clarity
const client = new Client();
const handle = client.workflow.getHandle('workflow-id-123');
// Send query
const result = await handle.query('getGreeting');
console.log(`Query result: ${result}`);
}
using Temporalio.Client;
using System; // For Console
using System.Threading.Tasks;
using YourApplication.Workflows; // Assume YourWorkflow defines [WorkflowQuery] string GetGreeting();
public class WorkflowQuerier
{
public static async Task QueryWorkflowAsync() // Example method
{
var client = await TemporalClient.ConnectAsync(new("localhost:7233"));
var handle = client.GetWorkflowHandle<YourWorkflow>("workflow-id-123"); // Specify workflow type for lambda
// Send query via lambda targeting the query method
string result = await handle.QueryAsync((YourWorkflow wf) => wf.GetGreeting());
Console.WriteLine($"Query result: {result}");
// ... existing code ...
}
}
Terminating a Workflow Execution stops it immediately and forcefully. It will not execute any further
logic or cleanup activities. Use termination with caution, typically for administrative purposes or when
a Workflow is known to be in an unrecoverable state. Signals or Cancellations are generally preferred
for graceful shutdown.
# Pseudocode for termination
client = ...
workflow_handle = client.get_workflow_handle("workflow-id-to-terminate")
try:
await workflow_handle.terminate(reason="Admin cleanup")
print("Workflow terminated.")
except Exception as e:
print(f"Failed to terminate workflow: {e}") # e.g., workflow already completed
Cancel Workflows
Cancellation is a cooperative mechanism. Requesting cancellation sends a cancellation request to the
Workflow Execution. The Workflow code must explicitly check for cancellation requests (usually via SDK
mechanisms like Cancellation Scopes or checking context) and decide how to handle them (for example, perform
cleanup Activities, then exit gracefully). If the Workflow ignores the request, it continues running.
# Pseudocode for cancellation request
client = ...
workflow_handle = client.get_workflow_handle("workflow-id-to-cancel")
try:
await workflow_handle.cancel()
print("Cancellation requested.")
# Note: Workflow might take time to actually handle the cancellation
except Exception as e:
print(f"Failed to request cancellation: {e}")
Update Workflows
Updates provide a way to synchronously mutate the state of a running Workflow Execution and receive a result.
Unlike Signals (which are fire-and-forget), the Client sending an Update waits for confirmation that the
Update has been received and validated by the Workflow, and can optionally wait for the Update handler
logic within the Workflow to complete and return a value. Updates require a running Worker to process.
Workflows define Update handlers (and optional validators) to process these requests. Updates are useful for operations that need
to modify Workflow state and confirm the result of that modification, such as incrementing a counter or
changing a status, while ensuring the operation doesn't proceed if validation fails.
Sending an Update
Use the Client to send an Update:
Identify the target Workflow Execution (by Workflow ID).
Specify the Update name (defined in the Workflow).
Provide the Update arguments.
Wait for the Update to be accepted/rejected, and optionally wait for the result.
TypeScript
import { Client } from '@temporalio/client';
import { setLanguage } from './workflows'; // Assuming Update definition
async function updateClient() {
const client = new Client();
const handle = client.workflow.getHandle('workflow-id-123');
// Execute update and wait for completion
const previousLanguage = await handle.executeUpdate(setLanguage, {
args: [Language.SPANISH],
});
console.log(`Update completed, previous language: ${previousLanguage}`);
// Or, start update and get handle to wait for acceptance
// const updateHandle = await handle.startUpdate(setLanguage, {
// args: [Language.FRENCH],
// waitForStage: WorkflowUpdateStage.ACCEPTED,
// });
// console.log(`Update ${updateHandle.updateId} accepted.`);
// const result = await updateHandle.result(); // Wait for completion later
}
Python
import asyncio
from temporalio.client import Client
# Assume workflow defines @workflow.update method 'set_language'
async def update_workflow_client():
client = await Client.connect("localhost:7233")
handle = client.get_workflow_handle("workflow-id-123")
# Execute update and wait for completion
previous_language = await handle.execute_update("set_language", "spanish")
print(f"Update completed, previous language: {previous_language}")
# Or, start update and get handle to wait for acceptance
# update_handle = await handle.start_update("set_language", "french", wait_for_acceptance=True)
# print(f"Update {update_handle.id} accepted.")
# result = await update_handle.result() # Wait for completion later
Go
package main
import (
"context"
"log"
"time"
"go.temporal.io/sdk/client"
"your_module/app" // Assuming app defines SetLanguageUpdate string constant
)
func main() {
c, _ := client.Dial(client.Options{})
defer c.Close()
workflowID := "workflow-id-123"
// Execute update and wait for completion
var previousLang string
updateValue, err := c.UpdateWorkflow(context.Background(), workflowID, "", app.SetLanguageUpdate, "spanish")
if err != nil {
log.Fatalln("Unable to execute update", err)
}
err = updateValue.Get(&previousLang)
if err != nil {
log.Fatalln("Unable to get update result", err)
}
log.Println("Update completed, previous language:", previousLang)
// Or, start update and get handle to wait for acceptance
// updateHandle, err := c.UpdateWorkflow(context.Background(), client.UpdateWorkflowOptions{
// WorkflowID: workflowID,
// UpdateName: app.SetLanguageUpdate,
// WaitForStage: client.WorkflowUpdateStageAccepted,
// Args: []interface{}{"french"},
// })
// if err != nil { log.Fatalln("Unable to start update", err) }
// log.Println("Update accepted:", updateHandle.UpdateID())
// var result string
// err = updateHandle.Get(context.Background(), &result) // Wait for completion later
}
Java
package your.application;
import io.temporal.client.*;
import io.temporal.serviceclient.WorkflowServiceStubs;
import your.application.workflows.YourWorkflow; // Assume defines setLanguage(Language lang)
import your.application.workflows.Language;
public class Updater {
public static void main(String[] args) {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);
String workflowId = "workflow-id-123";
YourWorkflow workflow = client.newWorkflowStub(YourWorkflow.class, workflowId);
// Execute update and wait for completion
try {
Language previousLanguage = workflow.setLanguage(Language.SPANISH);
System.out.println("Update completed, previous language: " + previousLanguage);
} catch (WorkflowUpdateException e) {
System.err.println("Update failed: " + e.getMessage());
}
// Or, start update and get handle to wait for acceptance
// WorkflowUpdateHandle handle =
// WorkflowStub.fromTyped(workflow)
// .startUpdate(
// "setLanguage", WorkflowUpdateStage.ACCEPTED, Language.class, Language.FRENCH);
// System.out.println("Update " + handle.getId() + " accepted.");
// Language result = handle.getResultAsync().get(); // Wait for completion later
}
}
.NET
using Temporalio.Client;
using System;
using System.Threading.Tasks;
using YourApplication.Workflows; // Assume YourWorkflow defines Task SetCurrentLanguageAsync(Language lang)
public class WorkflowUpdater
{
public static async Task UpdateWorkflowAsync()
{
var client = await TemporalClient.ConnectAsync(new("localhost:7233"));
var handle = client.GetWorkflowHandle("workflow-id-123");
// Execute update and wait for completion
var previousLanguage = await handle.ExecuteUpdateAsync(
wf => wf.SetCurrentLanguageAsync(Language.Spanish));
Console.WriteLine($"Update completed, previous language: {previousLanguage}");
// Or, start update and get handle to wait for acceptance
// var updateHandle = await handle.StartUpdateAsync(
// wf => wf.SetCurrentLanguageAsync(Language.French),
// new(waitForStage: WorkflowUpdateStage.Accepted));
// Console.WriteLine($"Update {updateHandle.Id} accepted.");
// var result = await updateHandle.GetResultAsync(); // Wait for completion later
}
}
PHP
<?php declare(strict_types=1);
namespace App\Client;
use Temporal\Client\WorkflowClientInterface;
use App\Workflow\YourWorkflowInterface; // Assume defines setLanguage(string $lang): string
/** @var WorkflowClientInterface $workflowClient */
$workflowStub = $workflowClient->newRunningWorkflowStub(YourWorkflowInterface::class, "workflow-id-123");
// Execute update and wait for completion
try {
$previousLanguage = $workflowStub->setLanguage('Spanish');
echo "Update completed, previous language: " . $previousLanguage . "\n";
} catch (\Throwable $e) {
echo "Update failed: " . $e->getMessage() . "\n";
}
// Or, start update and get handle to wait for acceptance
// use Temporal\Client\Update\LifecycleStage;
// $handle = $workflowStub->startUpdate('setLanguage', 'French'); // Default stage is ACCEPTED
// echo "Update " . $handle->getId() . " accepted.\n";
// $result = $handle->getResult(); // Wait for completion later
?>
Update-with-Start
Similar to Signal-with-Start, Update-with-Start allows a Client to send an Update to a Workflow, starting the Workflow Execution if it's not already running.
If the Workflow exists, the Update is processed normally. If it doesn't exist, a new Workflow Execution is started with the given ID,
and the Update is delivered and processed *before* the main Workflow logic begins execution.
Note: Update-with-Start is in Public Preview (as of Server v1.26+) and requires specific SDK Client methods.
TypeScript
import { Client, WorkflowIdConflictPolicy } from '@temporalio/client';
import { yourWorkflow, setLanguage } from './workflows'; // Assuming Update definition
async function updateWithStartClient() {
const client = new Client();
const workflowId = 'uws-workflow-' + Date.now();
// Execute update-with-start and wait for completion
const handle = await client.workflow.executeUpdateWithStart(
yourWorkflow, // Workflow function
{
taskQueue: 'my-task-queue',
workflowId: workflowId,
workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,
args: [/* initial workflow args */],
},
setLanguage, // Update definition
[Language.ENGLISH] // Update args
);
console.log(`UpdateWithStart completed for ${handle.workflowId}, result: ${await handle.result()}`);
// Or, start update-with-start and get handle
// const updateHandle = await client.workflow.startUpdateWithStart(
// yourWorkflow, { /* options */ }, setLanguage, [Language.FRENCH]
// );
// const result = await updateHandle.result();
}
Python
Use execute_update_with_start_workflow or start_update_with_start_workflow on the Client.
import asyncio
from temporalio.client import Client, WorkflowUpdateFailedError
from temporalio.common import WorkflowIDConflictPolicy
from temporalio.client import WithStartWorkflowOperation
# Assuming YourWorkflow with run() method and update handler 'add_item'
# from your_workflow import YourWorkflow, ShoppingCartItem
async def update_with_start_workflow_py():
client = await Client.connect("localhost:7233")
workflow_id = "uws-workflow-py"
task_queue = "my-task-queue"
item_id = "product-123"
quantity = 1
# Define the operation to start the workflow if it doesn't exist
start_op = WithStartWorkflowOperation(
"YourWorkflow.run", # Reference to the workflow run method (or string name)
# Initial args for the workflow if started
id=workflow_id,
task_queue=task_queue,
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
)
try:
# Execute update-with-start and wait for completion
update_result = await client.execute_update_with_start_workflow(
"YourWorkflow.add_item", # Reference to update handler (or string name)
ShoppingCartItem(sku=item_id, quantity=quantity), # Update args
start_workflow_operation=start_op,
)
print(f"UpdateWithStart completed, result: {update_result}")
# Or, start update-with-start and get handle
# update_handle = await client.start_update_with_start_workflow(
# "YourWorkflow.add_item",
# ShoppingCartItem(sku=item_id, quantity=quantity),
# start_workflow_operation=start_op,
# wait_for_stage=client.WorkflowUpdateStage.ACCEPTED
# )
# print(f"UpdateWithStart {update_handle.id} accepted.")
# final_result = await update_handle.result()
except WorkflowUpdateFailedError as e:
print(f"UpdateWithStart failed: {e}")
except Exception as e:
print(f"An error occurred: {e}")
# Placeholder class definitions for the example
class ShoppingCartItem:
def __init__(self, sku, quantity):
self.sku = sku
self.quantity = quantity
# Example usage:
# asyncio.run(update_with_start_workflow_py())
package your.application;
import io.temporal.client.*;
import io.temporal.serviceclient.WorkflowServiceStubs;
import your.application.workflows.YourWorkflow;
import your.application.workflows.Language;
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
import java.time.Duration;
public class UpdateWithStarter {
public static void main(String[] args) {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);
String workflowId = "uws-workflow-java";
String taskQueue = "my-task-queue";
WorkflowOptions startOptions = WorkflowOptions.newBuilder()
.setTaskQueue(taskQueue)
.setWorkflowId(workflowId)
// Required: Specify how to handle ID conflicts
.setWorkflowIdReusePolicy(WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY) // Or other appropriate policy
.build();
// Create a stub just for specifying the target workflow type and options
YourWorkflow workflowStub = client.newWorkflowStub(YourWorkflow.class, startOptions);
UpdateOptions updateOptions = UpdateOptions.newBuilder()
.setUpdateName("setLanguage")
.setResultClass(Language.class)
.setWaitForStage(WorkflowUpdateStage.COMPLETED) // Wait for completion
.setFirstExecutionRunId(null) // Cannot specify RunID for UWS
.build();
// Define the start operation (workflow method and initial args)
WithStartWorkflowOperation startOperation =
new WithStartWorkflowOperation<>(workflowStub::execute /*, initial args... */);
try {
// Execute Update-with-Start and wait for result
Language previousLanguage = client.executeUpdateWithStart(
workflowStub::setLanguage, // Update method reference
updateOptions,
startOperation,
Language.SPANISH // Update arguments
);
System.out.println("UpdateWithStart completed, previous language: " + previousLanguage);
// Or use startUpdateWithStart to get a handle
// WorkflowUpdateHandle handle = client.startUpdateWithStart(
// workflowStub::setLanguage, updateOptions, startOperation, Language.FRENCH
// );
// Language result = handle.getResultAsync().get();
} catch (Exception e) {
System.err.println("UpdateWithStart failed: " + e);
}
}
}
.NET
using Temporalio.Client;
using Temporalio.Enums;
using System;
using System.Threading.Tasks;
using YourApplication.Workflows;
public class UpdateWithStartClient
{
public static async Task UpdateWithStartAsync()
{
var client = await TemporalClient.ConnectAsync(new("localhost:7233"));
string workflowId = "uws-workflow-dotnet";
string taskQueue = "my-task-queue";
var startArgs = new StartWorkflowArgs { /* initial workflow args */ };
// Define the start operation (workflow run method and args)
var startOperation = new WithStartWorkflowOperation(
wf => wf.RunAsync(startArgs) // Reference to the WorkflowRun method
);
// Execute update-with-start and wait for completion
var previousLanguage = await client.ExecuteUpdateWithStartAsync(
workflowId,
taskQueue,
(YourWorkflow wf) => wf.SetCurrentLanguageAsync(Language.Spanish), // Update lambda
startOperation, // Contains workflow lambda and args
new() {
WorkflowIdConflictPolicy = WorkflowIdConflictPolicy.UseExisting,
// Update options can be added here inside new WorkflowUpdateOptions()
}
);
Console.WriteLine($"UpdateWithStart completed, previous language: {previousLanguage}");
// Or, start update-with-start and get handle
// var updateHandle = await client.StartUpdateWithStartAsync(
// workflowId, taskQueue,
// (YourWorkflow wf) => wf.SetCurrentLanguageAsync(Language.French),
// startOperation,
// new() { WorkflowIdConflictPolicy = WorkflowIdConflictPolicy.UseExisting }
// );
// var result = await updateHandle.GetResultAsync();
}
}
public class StartWorkflowArgs { /* Define args if needed */ }
PHP
PHP SDK does not currently have a dedicated Update-with-Start API. A similar pattern to the Python example (check existence first) would be needed.
<?php
// Conceptual Example - Check existence first
use Temporal\Client\WorkflowClientInterface;
use Temporal\Client\WorkflowStubInterface;
use Temporal\Exception\Client\WorkflowNotFoundException;
/** @var WorkflowClientInterface $workflowClient */
$workflowId = 'uws-workflow-php';
$taskQueue = 'my-task-queue';
$updateArg = 'Spanish';
$startArgs = [/* initial args */];
try {
// Try to get a stub for a running workflow
$workflowStub = $workflowClient->newRunningWorkflowStub(YourWorkflowInterface::class, $workflowId);
echo "Workflow {$workflowId} exists. Sending update.\n";
$result = $workflowStub->setLanguage($updateArg);
echo "Update result: {$result}\n";
} catch (WorkflowNotFoundException $e) {
echo "Workflow {$workflowId} not found. Starting and updating.\n";
// Note: This doesn't guarantee atomicity like true Update-with-Start
$workflowStub = $workflowClient->newWorkflowStub(
YourWorkflowInterface::class,
WorkflowOptions::new()
->withTaskQueue($taskQueue)
->withWorkflowId($workflowId)
// Use appropriate ID reuse policy
);
$run = $workflowClient->start($workflowStub, ...$startArgs);
// Potentially wait briefly for workflow to initialize
sleep(1);
$result = $workflowStub->setLanguage($updateArg);
echo "Started workflow " . $run->getExecution()->id . ", update result: {$result}\n";
}
?>
Advanced Concepts
Activity Heartbeating
For long-running Activities, Heartbeating allows the Activity to report its progress and liveness back to
the Temporal Cluster. This serves several purposes:
Liveness Detection: If the Cluster doesn't receive a heartbeat within the configured
HeartbeatTimeout, it assumes the Activity Worker has crashed and reschedules the Activity (respecting
the Retry Policy).
Progress Checkpointing: Activities can include details (payload) with the heartbeat,
effectively checkpointing their progress. If the Activity is retried (on the same or different
Worker), it can retrieve the last checkpointed details from the Activity context and resume from that
point, avoiding redundant work.
Cancellation Check: The heartbeat mechanism also allows the Activity to check if a
cancellation has been requested for the Workflow or the Activity itself. The Activity can then perform
cleanup and exit gracefully.
Heartbeating is crucial for Activities that might run longer than the StartToCloseTimeout or for tasks where
reporting progress is beneficial. It's implemented by calling a specific SDK function (for example, activity.RecordHeartbeat(ctx, details...)) periodically within the Activity code.
# Pseudocode for Activity with Heartbeating
@activity.defn
async def long_running_task(input_data):
progress = load_progress_from_details(activity.info().heartbeat_details)
if progress is None:
progress = 0
total_items = len(input_data)
for i in range(progress, total_items):
# Check for cancellation before/during work
activity.heartbeat(i) # Report progress (item 'i' is about to be processed)
# Process item i...
await asyncio.sleep(1) # Simulate work
# Optional: Heartbeat again after processing item, if fine-grained needed
# activity.heartbeat(i + 1)
return "All items processed"
def load_progress_from_details(details):
# Logic to interpret details sent with previous heartbeat
if details: return details # Assuming details itself is the progress marker
return None
Note: Build ID based Worker Versioning is a newer, often preferred approach for managing incompatible Worker code deployments, especially across multiple Task Queues or complex rollouts. Patching is useful for targeted, small changes within a single Workflow definition.
Complete Activities Asynchronously
In some scenarios, the process that completes an Activity might be different from the one that started it.
For example, an Activity might trigger an external system (via API call) and rely on a webhook callback to
signal completion.
To handle this:
The Activity obtains its unique TaskToken from the Activity Context. The Task Token identifies this
specific Activity execution attempt.
The Activity returns control to the Worker *without* completing, typically by throwing/returning a
special indicator (like activity.DoNotCompleteOnReturn or a specific exception type).
The Activity passes the TaskToken to the external system or process that will eventually signal
completion.
When the external process finishes, it uses a Temporal Client method (like CompleteActivityById or
CompleteActivity, providing the Task Token) to report the result or failure back to the Temporal
Cluster.
The Worker that originally executed the Activity does not need to be running for the completion call to
succeed. The Temporal Cluster uses the Task Token to correlate the completion call with the correct
Workflow Execution.
# Pseudocode for Activity triggering async completion
@activity.defn
async def trigger_external_job(job_params):
info = activity.info()
task_token_str = info.task_token.decode('utf-8') # Get task token as string
# Call external system, passing task_token_str as a callback identifier
external_api.start_job(job_params, callback_token=task_token_str)
# Tell the Worker not to complete this Activity yet
activity.do_not_complete_on_return()
# --- Elsewhere, in a separate process (e.g., webhook handler) ---
def handle_external_job_completion(callback_token, result, error):
client = connect_to_temporal() # Connect with a Temporal client
task_token_bytes = callback_token.encode('utf-8')
try:
if error:
client.report_activity_failure(task_token_bytes, error)
else:
client.report_activity_completion(task_token_bytes, result)
except Exception as e:
# Handle potential errors reporting completion (e.g., activity timed out already)
log.error(f"Failed to report activity completion/failure for token {callback_token}: {e}")
Activity Cancellation
Activities can be cancelled in two main ways:
Workflow Cancellation: If the Workflow Execution that invoked the Activity is cancelled,
the Temporal Cluster propagates the cancellation request to running Activities (depending on the
Workflow's cancellation handling logic and SDK Cancellation Scopes).
Activity Timeouts: If an Activity exceeds its StartToCloseTimeout or ScheduleToCloseTimeout,
it is effectively cancelled by the timeout mechanism, and a failure is reported to the Workflow (which
may trigger retries).
Activities that need to perform cleanup when cancelled (for example, releasing a lock, rolling back a transaction)
should check for cancellation requests. This is often done:
Implicitly during activity.RecordHeartbeat calls.
Explicitly using SDK context cancellation checks (for example, ctx.Err() in Go, checking Activity.Current.CancellationToken.IsCancellationRequested in .NET, checking activity.is_cancelled() in Python).
Upon detecting cancellation, the Activity should perform its cleanup and then typically throw a specific
cancelled exception type provided by the SDK.
Workflow Versioning (Patching)
Since Workflow code must be deterministic, deploying updated Workflow code that changes the sequence of
commands (like adding a new Activity call, removing a Timer) can break replays of existing, in-flight
Workflow Executions that were started with the old code.
The Workflow.getVersion API (or equivalent in each SDK) allows you to safely deploy changes to
Workflow definitions without breaking compatibility.
Wrap the new or modified logic within a getVersion block, assigning it a unique change ID (often an
integer or descriptive string).
Specify a default version (typically associated with the original code path) and the new version
(associated with the changed code path).
When a Worker executes this code during a replay: If the history already contains a marker for this
change ID, the SDK automatically executes the code path corresponding to the recorded version.
When a Worker executes this code for the *first time* (not a replay for this specific part), it
executes the code associated with the *new* version you specified and records that version marker in the
history.
This ensures that existing executions continue deterministically along their original path, while new
executions (or executions reaching this point for the first time) use the updated logic.
# Pseudocode for using getVersion
# --- Original Code ---
# result = await workflow.execute_activity(activity_A, ...)
# --- Updated Code ---
# We want to optionally call activity_B before activity_A
# Change ID identifies this specific patch/change
patch_change_id = "add-activity-B-call-v1"
# Default version (original behavior) is -1 (or some SDK-specific default)
# New version is 1 (or the next sequential number for this change ID)
version = workflow.get_version(patch_change_id, default=-1, max_supported=1)
if version == 1:
# New code path: Call B first
await workflow.execute_activity(activity_B, ...)
# Code common to both versions (or the original path if version == -1)
result = await workflow.execute_activity(activity_A, ...)
After all in-flight Workflows using the old code path (default version) have completed, you can remove
the getVersion call and the old code path, simplifying the Workflow definition.
Note: Build ID based Worker Versioning is a newer, often preferred approach for managing incompatible Worker code deployments, especially across multiple Task Queues or complex rollouts. Patching is useful for targeted, small changes within a single Workflow definition.
Execute Child Workflows
Workflows can start other Workflows, known as Child Workflows. This allows for modular design, breaking
down complex business processes into smaller, reusable, and independently testable units.
Key characteristics of Child Workflows:
Independent History: Each Child Workflow has its own separate event history.
Parent Relationship: The parent Workflow can track the status of the child and receive
its result upon completion.
Cancellation Propagation: Cancellation can be configured to propagate from parent to
child (or not).
Retries and Timeouts: Child Workflows have their own timeouts and retry policies,
independent of the parent.
Task Queues: Child Workflows can run on the same or different Task Queues as the parent.
Starting a Child Workflow is similar to starting a Workflow via the Client, but it's done from within the
parent Workflow's code using specific SDK functions (for example, workflow.ExecuteChildWorkflow). Options
control the parent-child relationship behavior (for example, what happens if the parent completes before the
child).
TypeScript
import { executeChild, proxyActivities, ParentClosePolicy, workflowInfo } from '@temporalio/workflow'; // Import workflowInfo
import { childWorkflow } from './child'; // Assume childWorkflow definition exists
export async function parentWorkflow(name: string): Promise<string> {
// Start child and wait for result
const result = await executeChild(childWorkflow, {
args: [name],
workflowId: `child-${workflowInfo().workflowId}`, // Ensure unique child ID using workflowInfo
// Default: ParentClosePolicy.TERMINATE (terminates child if parent closes)
// Use ABANDON if child should continue after parent finishes
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON,
});
return `Parent received: ${result}`;
}
Python
from temporalio import workflow
from temporalio.common import ParentClosePolicy
# Assume child_workflow is defined in another file/module
with workflow.unsafe.imports_passed_through():
from child_workflows import ChildWorkflow
@workflow.defn
class ParentWorkflow:
@workflow.run
async def run(self, name: str) -> str:
result = await workflow.execute_child_workflow(
ChildWorkflow.run,
f"Input for {name}",
id=f"child-{workflow.info().workflow_id}",
parent_close_policy=ParentClosePolicy.ABANDON,
)
return f"Parent received: {result}"
Go
package app // Assuming parent workflow is in app package
import (
"fmt" // For string formatting
"go.temporal.io/sdk/workflow"
"go.temporal.io/api/enums/v1"
// Assume app defines ChildWorkflow function
"your_module/app"
)
func ParentWorkflow(ctx workflow.Context, name string) (string, error) {
cwo := workflow.ChildWorkflowOptions{
// Generate a unique ID, e.g., combining parent ID and a unique part
WorkflowID: fmt.Sprintf("child-%s-%s", workflow.GetInfo(ctx).WorkflowExecution.ID, workflow.GetRandomUUID(ctx)),
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
}
ctx = workflow.WithChildOptions(ctx, cwo)
var result string
// Assuming ChildWorkflow is defined in the same package or imported correctly
future := workflow.ExecuteChildWorkflow(ctx, app.ChildWorkflow, name)
if err := future.Get(ctx, &result); err != nil {
// Handle child workflow error (e.g., log, return error)
workflow.GetLogger(ctx).Error("Child workflow failed", "Error", err)
return "", err
}
return "Parent received: " + result, nil
}
// Assume ChildWorkflow definition exists, e.g.:
// func ChildWorkflow(ctx workflow.Context, inputName string) (string, error) {
// workflow.GetLogger(ctx).Info("Child workflow started", "Input", inputName)
// return "Result from child for " + inputName, nil
// }
Java
package your.application.workflows; // Use your actual package name
import io.temporal.workflow.*;
import io.temporal.api.enums.v1.ParentClosePolicy;
import java.util.UUID; // For UUIDs
// Assume ChildWorkflow interface exists in the same package or imported
public class ParentWorkflowImpl implements ParentWorkflow { // Assuming ParentWorkflow interface
@Override
public String execute(String name) {
ChildWorkflowOptions options = ChildWorkflowOptions.newBuilder()
.setWorkflowId("child-" + Workflow.getInfo().getWorkflowId() + "-" + UUID.randomUUID().toString()) // Ensure unique ID
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
.build();
ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options);
// Start child and wait for result
String result = child.execute("Input for " + name);
return "Parent received: " + result;
// To start asynchronously:
// Promise promise = Async.function(child::execute, "Input for " + name);
// Use promise.get() later
}
}
.NET
using Temporalio.Workflows;
using Temporalio.Common; // For ParentClosePolicy
using System; // For Guid
using System.Threading.Tasks;
using YourApplication.Workflows; // Assuming ChildWorkflow class exists in the same namespace or imported
[Workflow]
public class ParentWorkflow
{
[WorkflowRun]
public async Task<string> RunAsync(string name)
{
string result = await Workflow.ExecuteChildWorkflowAsync(
(ChildWorkflow wf) => wf.RunChildAsync($"Input for {name}"), // Ensure RunChildAsync exists on ChildWorkflow
new ChildWorkflowOptions
{
WorkflowId = $"child-{Workflow.Info.WorkflowId}-{Guid.NewGuid()}", // Ensure unique ID
ParentClosePolicy = ParentClosePolicy.Abandon,
}
);
return $"Parent received: {result}";
}
}
PHP
<?php declare(strict_types=1);\\\n\\\nnamespace App\\\\\\\\Workflow;\\\\n\\\\nuse Temporal\\\\\\\\Workflow\\\\\\\\ParentWorkflowInterface;\\\\nuse Temporal\\\\\\\\Workflow\\\\\\\\ParentWorkflowMethod;\\\\nuse Temporal\\\\\\\\Activity\\\\\\\\ActivityOptions;\\\\nuse Temporal\\\\\\\\Common\\\\\\\\RetryOptions;\\\\nuse Temporal\\\\\\\\Activity\\\\\\\\ActivityMethod;\\\\nuse Temporal\\\\\\\\Activity\\\\\\\\ActivityInterface;\\\\nuse App\\\\\\\\ChildWorkflows\\\\\\\\ChildWorkflow;\\\\nuse Ramsey\\\\Uuid\\\\Uuid; // Example: Use ramsey/uuid\\\\n\\\\n// Define ParentWorkflow interface and implementation\\\\ninterface ParentWorkflowInterface extends ParentWorkflowInterface<?> {\\\n @ParentWorkflowMethod(name = "ParentWorkflow")\\\n void execute(String name);\n}\\\n\\\\nclass ParentWorkflowImpl implements ParentWorkflowInterface {\\\n @Override\\\n @ParentWorkflowMethod(name = "ParentWorkflow")\\\n public void execute(String name) {\\\n // Start child workflow\\\n ChildWorkflow.start(\\\"child-workflow\\\", [\\\"name\\\" => $name]);\\\\\n // You can also use a different workflow ID if needed\\\\\n }\\\\\n}\\\n?>
Continue-As-New
Workflow Executions have limits on their event history size and duration. For long-running processes
or Workflows that accumulate a large history (for example, loops processing many items), `Continue-As-New` allows
a Workflow to effectively restart itself with a fresh history, carrying over necessary state.
When a Workflow decides to `Continue-As-New`:
It specifies the same Workflow Type (or potentially a different one).
It provides the input arguments for the *new* execution. This is how state is carried over.
The current execution is marked as `ContinuedAsNew`.
A new Workflow Execution starts immediately with the provided arguments and a clean history, but
logically continues the business process. The Run ID changes, but the Workflow ID can be kept the same.
This is the standard pattern for implementing infinite loops or processes that need to run indefinitely
without hitting history limits. It's also useful for periodic cleanup of large state within a Workflow.
# Pseudocode for Continue-As-New loop
@workflow.defn
class PerpetualWorkflow:
@workflow.run
async def run(self, counter: int, other_state: dict = None): # Example with state
if other_state is None:
other_state = {} # Initialize state if first run
workflow.logger.info(f"Executing run {counter} with state: {other_state}")
# Example condition to ContinueAsNew (e.g., every 1000 iterations)
if counter > 0 and counter % 1000 == 0:
workflow.logger.info(f"Processed {counter} items, continuing as new...")
# Carry over the necessary state to the new execution
# Only include state needed for the *next* run
workflow.continue_as_new(counter, other_state) # Restarts run() with new args
# Note: Code after continue_as_new does not execute in the current run
# --- Regular Workflow Logic ---
workflow.logger.info(f"Processing item {counter}")
# Do work... update other_state if needed
# await workflow.execute_activity(process_item, counter, other_state, ...)
other_state['last_processed'] = counter
await workflow.sleep(timedelta(seconds=1)) # Simulate work or delay
# "Loop" by calling continue_as_new with updated state for the next iteration
workflow.continue_as_new(counter + 1, other_state)
# Client side starting the workflow for the first time:
async def run(self, counter: int):
if counter >= 1000: # Example condition to ContinueAsNew
print(f"Processed {counter} items, continuing as new...")
# Carry over the counter state to the new execution
workflow.continue_as_new(counter) # Restarts run() with counter
else:
print(f"Processing item {counter}")
# Do work...
await workflow.execute_activity(process_item, counter, ...)
# Schedule next iteration after a delay
await workflow.sleep(timedelta(seconds=10))
# "Loop" by calling continue_as_new with updated state
workflow.continue_as_new(counter + 1)
Side Effects
While Workflows must be deterministic, sometimes you need to execute a short, non-deterministic piece of
code within the Workflow logic itself (without calling an Activity). A common example is generating a UUID
or getting a random number using a non-SDK function (which would violate determinism if called directly).
Workflow.SideEffect (or equivalent) allows this.
You provide a function/lambda containing the non-deterministic code to SideEffect.
The first time this code path is executed, the function is run, and its return value is recorded in the
Workflow history.
During replays, the function is *not* re-executed. Instead, the recorded value from the history is
returned directly, preserving determinism.
Side Effects should only be used for short, simple operations where creating a dedicated Activity would be
overkill. They are not suitable for I/O or long-running computations. For example, generating a unique ID or
calling a potentially flaky function that you don't want to retry as a full Activity.
# Pseudocode for Side Effect
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self):
# Generate a random ID using a potentially non-deterministic library function
# This lambda is only executed once per execution path.
random_id = await workflow.side_effect(lambda: generate_random_id_maybe_impure())
# Use the random_id, which is now deterministic for replays
await workflow.execute_activity(use_id_activity, random_id, ...)
return f"Processed with ID: {random_id}"
# Note: For truly random values needed within workflow logic,
# prefer the SDK's deterministic random functions like workflow.random().
# SideEffect is more for integrating short non-deterministic external code.
Local Activities
Local Activities are an optimization for short-lived Activities that don't require the full overhead
of Task Queue polling, scheduling, and retries managed by the Temporal Cluster. They execute directly
within the Worker's Workflow Task processing thread.
Characteristics:
Execution: Run in the same process as the Workflow code that called them.
Latency: Much lower invocation latency compared to standard Activities (sub-millisecond).
Retries: Limited, server-side retries are not applicable. Retries must be handled by the
Workflow code itself or rely on the Workflow Task retry mechanism. A short, configurable local retry policy might be available in the SDK.
Timeouts: Subject to shorter, locally configured timeouts, often tied to the Workflow
Task timeout.
Use Cases: Ideal for simple functions like data validation, transformation, short calculations,
or accessing read-only resources local to the Worker, where the overhead of a standard Activity is
undesirable. For example, validating input against a schema loaded in Worker memory.
Determinism: Like standard Activities, Local Activities themselves do not need to be
deterministic, but their *invocation* from the Workflow must be.
Executing a Local Activity uses a specific SDK function (for example, workflow.ExecuteLocalActivity), often
requiring different, shorter timeout configurations than standard Activities.
Mutations
The Workflow.upsertMemo, Workflow.upsertSearchAttributes APIs allow Workflows to update their own Memo and
Search Attributes during execution. These updates are recorded in the event history.
Workflow.deprecatePatch is used in conjunction with Workflow.getVersion for managing code cleanup. Once you are
sure no running Workflows are using an old code path identified by a specific change ID (patch ID), you
can call `deprecatePatch(patch_change_id)`. This signals to the system (and potentially linting tools)
that this patch ID is obsolete and its corresponding `getVersion` block can eventually be removed from the
code.
Testing
Testing Temporal applications is crucial for ensuring correctness and reliability. Temporal provides
several mechanisms and recommendations for testing different aspects of your application, from individual
Activities and Workflows in isolation to their integrated behavior.
Approaches to Testing
Key testing strategies include:
Unit Testing: Testing individual Workflow or Activity functions/methods in
isolation, mocking their dependencies (like Activity calls within a Workflow, or external services
called by an Activity). This focuses on the logic within the component itself.
Workflow Replay Testing: Verifying that Workflow code remains deterministic after
changes. The SDK uses saved Workflow histories to replay the Workflow logic and ensure it produces
the exact same commands. This is vital for safely deploying updates to Workflow definitions.
Integration Testing (Test Environment): Testing the interaction between Workflows,
Activities, and Workers using a test environment provided by the SDK. This environment simulates
the Temporal Cluster, allowing you to run Workflows and Activities locally, testing their
interaction and behavior without needing a full Temporal Cluster. This is the most common way to
test the end-to-end logic of your Temporal application.
End-to-End Testing: Testing the complete application flow, including client
invocations, Worker execution, and interactions with real external services (though often mocked
or stubbed), potentially against a real Temporal Cluster (like the development server or a staging
cluster).
Workflow Replay Testing
Workflow replay testing ensures that changes to Workflow code do not break determinism for existing,
in-flight Workflow Executions. Temporal SDKs typically provide tools or mechanisms to facilitate this.
The process generally involves:
Obtaining historical event data (Workflow History) for completed Workflow Executions (often saved
as JSON files). You can download these using the Temporal CLI or SDK utilities.
Running a test suite that feeds these histories into the updated Workflow code via the SDK's
replay functionality.
The replay mechanism executes the Workflow logic against the history. If the updated code produces
a different sequence of commands (like trying to schedule a different Activity, using a different
timer duration, etc.) than what is recorded in the history, the replay fails, indicating a
non-deterministic change.
Regularly running replay tests against a corpus of relevant histories is a best practice before
deploying Workflow code changes.
Integration Testing with the Test Environment
Most Temporal SDKs provide a test environment or framework (e.g., `TestWorkflowEnvironment` in Java/Go,
`WorkflowEnvironment.start_time_skipping` in Python, `TestWorkflowRuntime` in .NET, test utilities in
TypeScript) that simulates the Temporal Cluster in-process or locally. This allows you to:
Register Workflow and Activity implementations.
Execute Workflows and Activities directly within your tests.
Mock Activities called by Workflows.
Control time within the environment (time skipping) to quickly test timeouts, timers, and long-running
processes without waiting in real time.
Assert Workflow completion status, results, or errors.
Verify Activity invocations and parameters.
This is the primary method for testing the integrated logic of your Workflows and Activities.