Essential techniques and rules for authoring production-ready Temporal workflows and activities. This guide covers non-obvious patterns, common pitfalls, and best practices that engineers should know but might forget.
Workflows must be deterministic—they must produce the same sequence of commands when replayed from history. Violations cause non-determinism errors and workflow failures.
func MyWorkflow(ctx workflow.Context) error {
// ❌ NEVER DO THIS
if time.Now().Hour() < 12 {
// ...
}
id := uuid.New() // ❌ Non-deterministic
return nil
}
func MyWorkflow(ctx workflow.Context) error {
// ✅ Use workflow.Now()
now := workflow.Now(ctx)
if now.Hour() < 12 {
// ...
}
// ✅ Use workflow.UUID()
id := workflow.UUID()
// ✅ Use workflow.Random() for random numbers
random := workflow.Random(ctx)
return nil
}
public class MyWorkflow implements WorkflowInterface {
public void execute() {
// ❌ NEVER DO THIS
if (LocalDateTime.now().getHour() < 12) {
// ...
}
String id = UUID.randomUUID().toString(); // ❌ Non-deterministic
}
}
public class MyWorkflow implements WorkflowInterface {
public void execute() {
// ✅ Use Workflow.currentTimeMillis()
long now = Workflow.currentTimeMillis();
if (Instant.ofEpochMilli(now).atZone(ZoneId.systemDefault())
.getHour() < 12) {
// ...
}
// ✅ Use Workflow.randomUUID()
String id = Workflow.randomUUID().toString();
// ✅ Use Workflow.newRandom() for random numbers
Random random = Workflow.newRandom();
}
}
from datetime import datetime
import uuid
import random
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
# ❌ NEVER DO THIS
if datetime.now().hour < 12:
pass
id = str(uuid.uuid4()) # ❌ Non-deterministic
num = random.randint(1, 100) # ❌ Non-deterministic
from temporalio import workflow
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
# ✅ Use workflow.now()
now = workflow.now()
if now.hour < 12:
pass
# ✅ Use workflow.uuid4()
id = str(workflow.uuid4())
# ✅ Use workflow.random() for random numbers
num = workflow.random().randint(1, 100)
export async function myWorkflow(): Promise {
// ❌ NEVER DO THIS
if (new Date().getHours() < 12) {
// ...
}
const id = crypto.randomUUID(); // ❌ Non-deterministic
const num = Math.random(); // ❌ Non-deterministic
}
import * as wf from '@temporalio/workflow';
export async function myWorkflow(): Promise {
// ✅ Use wf.now()
const now = wf.now();
if (now.getHours() < 12) {
// ...
}
// ✅ Use wf.uuid4()
const id = wf.uuid4();
// ✅ Use wf.random() for random numbers
const num = wf.random().nextInt(1, 100);
}
public class MyWorkflow : WorkflowBase
{
public override Task RunAsync()
{
// ❌ NEVER DO THIS
if (DateTime.Now.Hour < 12)
{
// ...
}
var id = Guid.NewGuid(); // ❌ Non-deterministic
var num = Random.Shared.Next(); // ❌ Non-deterministic
return Task.CompletedTask;
}
}
using Temporalio.Workflows;
public class MyWorkflow : WorkflowBase
{
public override Task RunAsync()
{
// ✅ Use Workflow.UtcNow
var now = Workflow.UtcNow;
if (now.Hour < 12)
{
// ...
}
// ✅ Use Workflow.NewGuid()
var id = Workflow.NewGuid();
// ✅ Use Workflow.Random for random numbers
var num = Workflow.Random.Next(1, 100);
return Task.CompletedTask;
}
}
Heartbeating is required for cancellation detection and strongly recommended for any activity longer than a few seconds.
Activities must explicitly call heartbeat functions. The SDK does not automatically heartbeat.
import (
"context"
"time"
"go.temporal.io/sdk/activity"
)
func LongRunningActivity(ctx context.Context, input string) (string, error) {
for i := 0; i < 100; i++ {
// ✅ Heartbeat with progress details
activity.RecordHeartbeat(ctx, i, "processing item")
// Do work
time.Sleep(1 * time.Second)
// ✅ Check for cancellation
if ctx.Err() != nil {
return "", ctx.Err()
}
}
return "done", nil
}
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;
public class Activities {
public String longRunningActivity(String input) {
ActivityExecutionContext context = Activity.getExecutionContext();
for (int i = 0; i < 100; i++) {
try {
// ✅ Heartbeat with progress details
context.heartbeat(i, "processing item");
// Do work
Thread.sleep(1000);
} catch (ActivityCompletionException e) {
// ✅ Cancellation detected via heartbeat
throw e;
}
}
return "done";
}
}
from temporalio import activity
import asyncio
@activity.defn
async def long_running_activity(input: str) -> str:
for i in range(100):
try:
# ✅ Heartbeat with progress details
activity.heartbeat(i, "processing item")
# Do work
await asyncio.sleep(1)
except CancelledError:
# ✅ Cancellation detected
raise
return "done"
import { Context, CancelledFailure } from '@temporalio/activity';
export async function longRunningActivity(input: string): Promise {
const ctx = Context.current();
for (let i = 0; i < 100; i++) {
// ✅ Heartbeat with progress details
ctx.heartbeat(i, 'processing item');
// ✅ Check for cancellation
if (ctx.cancellationSignal.aborted) {
throw new CancelledFailure('Activity cancelled');
}
// Do work
await sleep(1000);
}
return 'done';
}
using Temporalio.Activities;
public class Activities
{
[Activity]
public async Task LongRunningActivityAsync(string input)
{
var context = ActivityExecutionContext.Current;
for (int i = 0; i < 100; i++)
{
// ✅ Heartbeat with progress details
context.Heartbeat(i, "processing item");
// ✅ Check for cancellation
context.CancellationToken.ThrowIfCancellationRequested();
// Do work
await Task.Delay(1000, context.CancellationToken);
}
return "done";
}
}
Use heartbeat details to checkpoint progress and resume on retry.
func ProcessItemsActivity(ctx context.Context, items []string) error {
var lastProcessed int
// ✅ Check for previous progress
if activity.HasHeartbeatDetails(ctx) {
if err := activity.GetHeartbeatDetails(ctx, &lastProcessed); err == nil {
// Resume from checkpoint
items = items[lastProcessed:]
}
}
for i, item := range items {
// Process item
processItem(item)
// ✅ Save progress
activity.RecordHeartbeat(ctx, lastProcessed+i+1)
}
return nil
}
public void processItemsActivity(List items) {
ActivityExecutionContext context = Activity.getExecutionContext();
int lastProcessed = 0;
// ✅ Check for previous progress
if (context.getHeartbeatDetails().isPresent()) {
lastProcessed = context.getHeartbeatDetails().get();
items = items.subList(lastProcessed, items.size());
}
for (int i = 0; i < items.size(); i++) {
// Process item
processItem(items.get(i));
// ✅ Save progress
context.heartbeat(lastProcessed + i + 1);
}
}
@activity.defn
async def process_items_activity(items: list[str]) -> None:
last_processed = 0
# ✅ Check for previous progress
heartbeat_details = activity.info().heartbeat_details
if heartbeat_details:
last_processed = heartbeat_details[0]
items = items[last_processed:]
for i, item in enumerate(items):
# Process item
process_item(item)
# ✅ Save progress
activity.heartbeat(last_processed + i + 1)
export async function processItemsActivity(items: string[]): Promise {
const ctx = Context.current();
let lastProcessed = 0;
// ✅ Check for previous progress
const heartbeatDetails = ctx.info.heartbeatDetails;
if (heartbeatDetails && heartbeatDetails.length > 0) {
lastProcessed = heartbeatDetails[0] as number;
items = items.slice(lastProcessed);
}
for (let i = 0; i < items.length; i++) {
// Process item
processItem(items[i]);
// ✅ Save progress
ctx.heartbeat(lastProcessed + i + 1);
}
}
[Activity]
public async Task ProcessItemsActivityAsync(List items)
{
var context = ActivityExecutionContext.Current;
int lastProcessed = 0;
// ✅ Check for previous progress
if (context.HeartbeatDetails.HasValue)
{
lastProcessed = context.HeartbeatDetails.Value.GetValue();
items = items.Skip(lastProcessed).ToList();
}
for (int i = 0; i < items.Count; i++)
{
// Process item
ProcessItem(items[i]);
// ✅ Save progress
context.Heartbeat(lastProcessed + i + 1);
}
}
func MyActivity(ctx context.Context) error {
for {
select {
case <-time.After(1 * time.Second):
activity.RecordHeartbeat(ctx)
case <-ctx.Done():
// ❌ WRONG: Returning nil means "failed", not "cancelled"
return nil
}
}
}
import "go.temporal.io/sdk/temporal"
func MyActivity(ctx context.Context) error {
for {
select {
case <-time.After(1 * time.Second):
activity.RecordHeartbeat(ctx)
case <-ctx.Done():
// ✅ CORRECT: Return the error to mark as cancelled
return temporal.NewCanceledError("activity cancelled")
}
}
}
public String myActivity() {
ActivityExecutionContext context = Activity.getExecutionContext();
try {
for (int i = 0; i < 100; i++) {
context.heartbeat(i);
Thread.sleep(1000);
}
} catch (ActivityCompletionException e) {
// ❌ WRONG: Swallowing means "failed", not "cancelled"
return "done anyway";
}
return "done";
}
public String myActivity() {
ActivityExecutionContext context = Activity.getExecutionContext();
try {
for (int i = 0; i < 100; i++) {
context.heartbeat(i);
Thread.sleep(1000);
}
} catch (ActivityCompletionException e) {
// ✅ CORRECT: Re-throw to mark as cancelled
throw e;
}
return "done";
}
@activity.defn
async def my_activity() -> str:
try:
for i in range(100):
activity.heartbeat(i)
await asyncio.sleep(1)
except CancelledError:
# ❌ WRONG: Swallowing means "failed", not "cancelled"
return "done anyway"
return "done"
from temporalio.exceptions import CancelledError
@activity.defn
async def my_activity() -> str:
try:
for i in range(100):
activity.heartbeat(i)
await asyncio.sleep(1)
except CancelledError:
# ✅ CORRECT: Re-raise to mark as cancelled
raise
return "done"
export async function myActivity(): Promise {
const ctx = Context.current();
try {
for (let i = 0; i < 100; i++) {
ctx.heartbeat(i);
await sleep(1000);
}
} catch (err) {
if (err instanceof CancelledFailure) {
// ❌ WRONG: Swallowing means "failed", not "cancelled"
return 'done anyway';
}
throw err;
}
return 'done';
}
import { CancelledFailure } from '@temporalio/activity';
export async function myActivity(): Promise {
const ctx = Context.current();
try {
for (let i = 0; i < 100; i++) {
ctx.heartbeat(i);
await sleep(1000);
}
} catch (err) {
if (err instanceof CancelledFailure) {
// ✅ CORRECT: Re-throw to mark as cancelled
throw err;
}
throw err;
}
return 'done';
}
[Activity]
public async Task MyActivityAsync()
{
var context = ActivityExecutionContext.Current;
try
{
for (int i = 0; i < 100; i++)
{
context.Heartbeat(i);
await Task.Delay(1000);
}
}
catch (OperationCanceledException)
{
// ❌ WRONG: Swallowing means "failed", not "cancelled"
return "done anyway";
}
return "done";
}
[Activity]
public async Task MyActivityAsync()
{
var context = ActivityExecutionContext.Current;
try
{
for (int i = 0; i < 100; i++)
{
context.Heartbeat(i);
await Task.Delay(1000, context.CancellationToken);
}
}
catch (OperationCanceledException)
{
// ✅ CORRECT: Re-throw to mark as cancelled
throw;
}
return "done";
}
Proper timeout configuration is critical for production reliability. Each timeout serves a different purpose.
| Timeout Type | Scope | What It Controls |
|---|---|---|
| ScheduleToClose | Activity | Total time from scheduling to completion (includes retries) |
| StartToClose | Activity | Time for a single attempt execution |
| Heartbeat | Activity | Maximum time between heartbeats before timeout |
| Execution | Workflow | Total workflow execution time |
| Run | Workflow | Time for a single workflow run (before ContinueAsNew) |
| Task | Workflow | Time for worker to process a workflow task |
HeartbeatTimeout < StartToCloseTimeout < ScheduleToCloseTimeout
ao := workflow.ActivityOptions{
// ✅ ScheduleToClose: Total time including retries
ScheduleToCloseTimeout: 10 * time.Minute,
// ✅ StartToClose: Time for single attempt
StartToCloseTimeout: 2 * time.Minute,
// ✅ Heartbeat: Max time between heartbeats
HeartbeatTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 1 * time.Minute,
MaximumAttempts: 5,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
ActivityOptions options = ActivityOptions.newBuilder()
// ✅ ScheduleToClose: Total time including retries
.setScheduleToCloseTimeout(Duration.ofMinutes(10))
// ✅ StartToClose: Time for single attempt
.setStartToCloseTimeout(Duration.ofMinutes(2))
// ✅ Heartbeat: Max time between heartbeats
.setHeartbeatTimeout(Duration.ofSeconds(30))
.setRetryOptions(RetryOptions.newBuilder()
.setInitialInterval(Duration.ofSeconds(1))
.setBackoffCoefficient(2.0)
.setMaximumInterval(Duration.ofMinutes(1))
.setMaximumAttempts(5)
.build())
.build();
from temporalio import workflow
from temporalio.common import RetryPolicy
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
# ✅ Configure timeouts
await workflow.execute_activity(
my_activity,
arg="value",
start_to_close_timeout=timedelta(minutes=2),
schedule_to_close_timeout=timedelta(minutes=10),
heartbeat_timeout=timedelta(seconds=30),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
backoff_coefficient=2.0,
maximum_interval=timedelta(minutes=1),
maximum_attempts=5,
),
)
import { proxyActivities, sleep } from '@temporalio/workflow';
import type * as activities from './activities';
const { myActivity } = proxyActivities({
// ✅ ScheduleToClose: Total time including retries
scheduleToCloseTimeout: '10m',
// ✅ StartToClose: Time for single attempt
startToCloseTimeout: '2m',
// ✅ Heartbeat: Max time between heartbeats
heartbeatTimeout: '30s',
retry: {
initialInterval: '1s',
backoffCoefficient: 2,
maximumInterval: '1m',
maximumAttempts: 5,
},
});
var options = new ActivityOptions
{
// ✅ ScheduleToClose: Total time including retries
ScheduleToCloseTimeout = TimeSpan.FromMinutes(10),
// ✅ StartToClose: Time for single attempt
StartToCloseTimeout = TimeSpan.FromMinutes(2),
// ✅ Heartbeat: Max time between heartbeats
HeartbeatTimeout = TimeSpan.FromSeconds(30),
RetryPolicy = new RetryPolicy
{
InitialInterval = TimeSpan.FromSeconds(1),
BackoffCoefficient = 2.0,
MaximumInterval = TimeSpan.FromMinutes(1),
MaximumAttempts = 5,
},
};
Error handling differs significantly between workflows and activities. Understanding this is crucial.
Activities can return any error. The workflow receives it and decides how to handle it.
func MyWorkflow(ctx workflow.Context) error {
err := workflow.ExecuteActivity(ctx, MyActivity).Get(ctx, nil)
if err != nil {
// ❌ WRONG: This fails the workflow task, not the workflow
return err
}
return nil
}
import "go.temporal.io/sdk/temporal"
func MyWorkflow(ctx workflow.Context) error {
err := workflow.ExecuteActivity(ctx, MyActivity).Get(ctx, nil)
if err != nil {
// ✅ CORRECT: Fail the workflow execution
return temporal.NewApplicationError(
"activity failed",
"ActivityError",
err,
)
}
return nil
}
public class MyWorkflow implements WorkflowInterface {
public void execute() {
try {
String result = Workflow.executeActivity(
Activities::myActivity,
String.class
).get();
} catch (Exception e) {
// ❌ WRONG: This fails the workflow task, not the workflow
throw new RuntimeException(e);
}
}
}
import io.temporal.failure.ApplicationFailure;
public class MyWorkflow implements WorkflowInterface {
public void execute() {
try {
String result = Workflow.executeActivity(
Activities::myActivity,
String.class
).get();
} catch (Exception e) {
// ✅ CORRECT: Fail the workflow execution
throw ApplicationFailure.newNonRetryableFailure(
"activity failed",
"ActivityError",
e
);
}
}
}
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
try:
result = await workflow.execute_activity(my_activity)
except Exception as e:
# ❌ WRONG: This fails the workflow task, not the workflow
raise
from temporalio.exceptions import ApplicationError
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
try:
result = await workflow.execute_activity(my_activity)
except Exception as e:
# ✅ CORRECT: Fail the workflow execution
raise ApplicationError(
"activity failed",
"ActivityError",
non_retryable=True
) from e
export async function myWorkflow(): Promise {
try {
const result = await myActivity();
} catch (err) {
// ❌ WRONG: This fails the workflow task, not the workflow
throw err;
}
}
import { ApplicationFailure } from '@temporalio/workflow';
export async function myWorkflow(): Promise {
try {
const result = await myActivity();
} catch (err) {
// ✅ CORRECT: Fail the workflow execution
throw ApplicationFailure.nonRetryable(
'activity failed',
'ActivityError',
err
);
}
}
public class MyWorkflow : WorkflowBase
{
public override async Task RunAsync()
{
try
{
var result = await Workflow.ExecuteActivityAsync(
(Activities a) => a.MyActivityAsync()
);
}
catch (Exception e)
{
// ❌ WRONG: This fails the workflow task, not the workflow
throw;
}
}
}
using Temporalio.Exceptions;
public class MyWorkflow : WorkflowBase
{
public override async Task RunAsync()
{
try
{
var result = await Workflow.ExecuteActivityAsync(
(Activities a) => a.MyActivityAsync()
);
}
catch (Exception e)
{
// ✅ CORRECT: Fail the workflow execution
throw new ApplicationFailureException(
"activity failed",
"ActivityError",
e,
nonRetryable: true
);
}
}
}
When deploying workflow code changes, use versioning to prevent non-determinism errors for running workflows.
GetVersion (or equivalent) when making changes that affect the command sequence, such as adding/removing activities, changing activity order, or modifying workflow logic.
func MyWorkflow(ctx workflow.Context) error {
// ✅ Get version for code changes
version := workflow.GetVersion(ctx, "change-id", workflow.DefaultVersion, 2)
if version == workflow.DefaultVersion {
// Old code path
err := workflow.ExecuteActivity(ctx, OldActivity).Get(ctx, nil)
return err
} else {
// New code path
err := workflow.ExecuteActivity(ctx, NewActivity).Get(ctx, nil)
return err
}
}
public class MyWorkflow implements WorkflowInterface {
public void execute() {
// ✅ Get version for code changes
int version = Workflow.getVersion("change-id",
Workflow.DEFAULT_VERSION, 2);
if (version == Workflow.DEFAULT_VERSION) {
// Old code path
Workflow.executeActivity(Activities::oldActivity, String.class)
.get();
} else {
// New code path
Workflow.executeActivity(Activities::newActivity, String.class)
.get();
}
}
}
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
# ✅ Get version for code changes
version = workflow.get_version("change-id", 1, 2)
if version == 1:
# Old code path
await workflow.execute_activity(old_activity)
else:
# New code path
await workflow.execute_activity(new_activity)
import { patched } from '@temporalio/workflow';
export async function myWorkflow(): Promise {
// ✅ Use patched() for code changes
if (patched('change-id')) {
// New code path
await newActivity();
} else {
// Old code path
await oldActivity();
}
}
public class MyWorkflow : WorkflowBase
{
public override async Task RunAsync()
{
// ✅ Get version for code changes
var version = await Workflow.GetVersionAsync("change-id", 1, 2);
if (version == 1)
{
// Old code path
await Workflow.ExecuteActivityAsync(
(Activities a) => a.OldActivityAsync()
);
}
else
{
// New code path
await Workflow.ExecuteActivityAsync(
(Activities a) => a.NewActivityAsync()
);
}
}
}
These changes are safe and don't require versioning:
Use ContinueAsNew for long-running or perpetual workflows to prevent history size limits and performance issues.
func PerpetualWorkflow(ctx workflow.Context, state WorkflowState) error {
// Process some work
for i := 0; i < 1000; i++ {
err := workflow.ExecuteActivity(ctx, ProcessItem, i).Get(ctx, nil)
if err != nil {
return err
}
}
// ✅ Update state
state.ProcessedCount += 1000
// ✅ Continue as new with updated state
return workflow.NewContinueAsNewError(ctx, PerpetualWorkflow, state)
}
public class PerpetualWorkflow implements WorkflowInterface {
public void execute(WorkflowState state) {
// Process some work
for (int i = 0; i < 1000; i++) {
Workflow.executeActivity(
Activities::processItem,
i,
String.class
).get();
}
// ✅ Update state
state.setProcessedCount(state.getProcessedCount() + 1000);
// ✅ Continue as new with updated state
Workflow.continueAsNew(state);
}
}
@workflow.defn
class PerpetualWorkflow:
@workflow.run
async def run(self, state: WorkflowState) -> None:
# Process some work
for i in range(1000):
await workflow.execute_activity(process_item, i)
# ✅ Update state
state.processed_count += 1000
# ✅ Continue as new with updated state
workflow.continue_as_new(state)
export async function perpetualWorkflow(
state: WorkflowState
): Promise {
// Process some work
for (let i = 0; i < 1000; i++) {
await processItem(i);
}
// ✅ Update state
state.processedCount += 1000;
// ✅ Continue as new with updated state
await continueAsNew(state);
}
public class PerpetualWorkflow : WorkflowBase
{
public override async Task RunAsync(WorkflowState state)
{
// Process some work
for (int i = 0; i < 1000; i++)
{
await Workflow.ExecuteActivityAsync(
(Activities a) => a.ProcessItemAsync(i)
);
}
// ✅ Update state
state.ProcessedCount += 1000;
// ✅ Continue as new with updated state
Workflow.ContinueAsNew(state);
}
}
Activities are retried on failure. They must be idempotent or use idempotency keys to prevent duplicate operations.
func ProcessPaymentActivity(ctx context.Context, payment Payment) error {
// ✅ Use idempotency key from activity info
activityInfo := activity.GetInfo(ctx)
idempotencyKey := activityInfo.ActivityID
// ✅ Check if already processed
if isPaymentProcessed(idempotencyKey) {
return nil // Already done
}
// Process payment
err := chargeCard(payment)
if err != nil {
return err
}
// ✅ Mark as processed
markPaymentProcessed(idempotencyKey)
return nil
}
public void processPaymentActivity(Payment payment) {
// ✅ Use idempotency key from activity info
ActivityExecutionContext context = Activity.getExecutionContext();
String idempotencyKey = context.getInfo().getActivityId();
// ✅ Check if already processed
if (isPaymentProcessed(idempotencyKey)) {
return; // Already done
}
// Process payment
chargeCard(payment);
// ✅ Mark as processed
markPaymentProcessed(idempotencyKey);
}
@activity.defn
async def process_payment_activity(payment: Payment) -> None:
# ✅ Use idempotency key from activity info
activity_info = activity.info()
idempotency_key = activity_info.activity_id
# ✅ Check if already processed
if is_payment_processed(idempotency_key):
return # Already done
# Process payment
charge_card(payment)
# ✅ Mark as processed
mark_payment_processed(idempotency_key)
export async function processPaymentActivity(
payment: Payment
): Promise {
// ✅ Use idempotency key from activity info
const ctx = Context.current();
const idempotencyKey = ctx.info.activityId;
// ✅ Check if already processed
if (isPaymentProcessed(idempotencyKey)) {
return; // Already done
}
// Process payment
chargeCard(payment);
// ✅ Mark as processed
markPaymentProcessed(idempotencyKey);
}
[Activity]
public async Task ProcessPaymentActivityAsync(Payment payment)
{
// ✅ Use idempotency key from activity info
var context = ActivityExecutionContext.Current;
var idempotencyKey = context.Info.ActivityId;
// ✅ Check if already processed
if (IsPaymentProcessed(idempotencyKey))
{
return; // Already done
}
// Process payment
ChargeCard(payment);
// ✅ Mark as processed
MarkPaymentProcessed(idempotencyKey);
}
public override async Task RunAsync()
{
// ❌ NEVER DO THIS
await Task.Run(() => DoWork());
// ❌ NEVER DO THIS
await SomeAsyncMethod().ConfigureAwait(false);
}
public override async Task RunAsync()
{
// ✅ Use Workflow.RunTaskAsync
await Workflow.RunTaskAsync(() => DoWork());
// ✅ Use ConfigureAwait(true) or omit it
await SomeAsyncMethod().ConfigureAwait(true);
}
Activity options are inherited from the workflow context. Set them once at the workflow level or override per-activity.
| Feature | Regular Activity | Local Activity |
|---|---|---|
| Execution | Separate worker task | Same workflow worker |
| Latency | Higher (task queue round-trip) | Lower (in-process) |
| Retries | Server-managed | Workflow-managed |
| Use Case | Long-running, external calls | Short, fast operations |
Always test workflow replay to catch non-determinism issues before production.
Use time-skipping test environments to test timeout scenarios without waiting.
Verify that activities properly handle cancellation and mark themselves as cancelled (not failed).