Adding Workflows
How to add Temporal workflows and activities with determinism constraints.
Temporal workflows provide durable, fault-tolerant execution for long-running operations. This guide covers how to add new workflows and activities to the worker.
Workflow vs Activity
| Concern | Workflows | Activities |
|---|---|---|
| Purpose | Orchestration logic | Side effects |
| Determinism | Required | Not required |
| I/O | Forbidden | Allowed |
| Replay safe | Yes | N/A |
| Location | apps/worker/src/workflows.ts | apps/worker/src/activities.ts |
Adding an Activity
Activities are normal async functions registered with the worker. They handle all side effects, such as database queries, API calls, and file I/O:
```ts
// apps/worker/src/activities.ts
export const activities = {
  processTask: async (taskId: string) => {
    // Database queries and external calls are fine here
    const result = await db.query(...)
    // userId is included because the workflow reads result.userId
    // (assumes the query result exposes it)
    return { alreadyProcessed: false, taskId, userId: result.userId }
  },
  sendNotification: async (userId: string, message: string) => {
    await notificationService.send(userId, message)
  }
}
```

Activities should be idempotent when possible. If an activity is retried, for example after a worker crash, it should produce the same result or detect the duplicate.
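The "detect the duplicate" behavior can be sketched as follows. This is a minimal illustration, not the worker's actual implementation: `processedTasks` is a hypothetical in-memory stand-in for a durable store, and `processTaskIdempotently` is an illustrative name.

```ts
// Hypothetical stand-in for a durable record of completed tasks.
// A real activity would keep this in the database, ideally behind a
// unique constraint so that check-and-record is atomic.
const processedTasks = new Set<string>()

interface ProcessResult {
  alreadyProcessed: boolean
  taskId: string
}

async function processTaskIdempotently(taskId: string): Promise<ProcessResult> {
  // A retried call finds the earlier record and skips the side effect
  if (processedTasks.has(taskId)) {
    return { alreadyProcessed: true, taskId }
  }
  // ...the real side effect would run here...
  processedTasks.add(taskId)
  return { alreadyProcessed: false, taskId }
}
```

Calling the function twice with the same `taskId` performs the work once; the second call only reports the duplicate.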
Adding a Workflow
Workflows orchestrate activities and must be fully deterministic:
```ts
// apps/worker/src/workflows.ts
import { proxyActivities } from '@temporalio/workflow'
import type { activities } from './activities.js'

const { processTask, sendNotification } = proxyActivities<typeof activities>({
  startToCloseTimeout: '30s'
})

export async function taskProcessingWorkflow(taskId: string): Promise<void> {
  const result = await processTask(taskId)
  if (!result.alreadyProcessed) {
    await sendNotification(result.userId, `Task ${taskId} processed`)
  }
}
```

Determinism Rules
The following are strictly forbidden inside workflow code:
```ts
// FORBIDDEN in workflows:
Date.now()                   // Use workflow.now()
new Date()                   // Use workflow.now()
Math.random()                // Use deterministic alternatives
setTimeout(fn, ms)           // Use workflow.sleep(ms)
setInterval(fn, ms)          // Not applicable
clearTimeout(id)             // Not applicable
clearInterval(id)            // Not applicable
import { db } from '...'     // No infrastructure imports
import { fetch } from '...'  // No I/O imports
```

These constraints exist because Temporal rebuilds workflow state by replaying workflow code against its event history. A non-deterministic operation can produce a different result on replay than the one recorded in history, and the workflow then fails.
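The replay problem can be illustrated with a toy model. The sketch below is not how Temporal is implemented internally; `WorkflowCommand`, `matchesHistory`, and both workflow functions are hypothetical names, and the random source is injected so that the example itself is reproducible.

```ts
type WorkflowCommand = string

// Toy workflow that branches on a fresh random draw every time it runs
function nonDeterministicWorkflow(random: () => number): WorkflowCommand[] {
  return random() < 0.5 ? ['sendEmail'] : ['sendSms']
}

// Toy workflow that branches on a value read back from recorded history
function deterministicWorkflow(recordedValue: number): WorkflowCommand[] {
  return recordedValue < 0.5 ? ['sendEmail'] : ['sendSms']
}

// Replay is valid only if it produces the same commands as the first run
function matchesHistory(recorded: WorkflowCommand[], replayed: WorkflowCommand[]): boolean {
  return recorded.length === replayed.length && recorded.every((c, i) => c === replayed[i])
}

// First run draws 0.9, replay after a crash draws 0.1: the branches differ
const firstRunBad = nonDeterministicWorkflow(() => 0.9)
const replayBad = nonDeterministicWorkflow(() => 0.1)
const replayDiverged = !matchesHistory(firstRunBad, replayBad)

// The value 0.9 was recorded once (for example as an activity result),
// so the first run and the replay both see it and take the same branch
const recordedRandom = 0.9
const firstRunGood = deterministicWorkflow(recordedRandom)
const replayGood = deterministicWorkflow(recordedRandom)
const replayConsistent = matchesHistory(firstRunGood, replayGood)
```

Here `replayDiverged` and `replayConsistent` are both `true`: only the variant that reads its random value from recorded history reproduces the original commands.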
Safe Alternatives
| Forbidden | Use Instead |
|---|---|
| Date.now() | Temporal's workflow.now() |
| Math.random() | Pass random values as activity results |
| setTimeout | workflow.sleep('5s') |
| Side effects | Move to activities |
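Two of the substitutions above, a durable sleep instead of `setTimeout` and an activity result instead of `Math.random()`, might be combined as in the sketch below. To keep it runnable outside a worker, the Temporal-provided pieces are passed in through a hypothetical `WorkflowDeps` interface. In a real workflow file, `sleep` would be the export from `@temporalio/workflow`, and `pickJitterMs` (an assumed activity name, not part of this codebase) would come from `proxyActivities`.

```ts
// Stand-ins for what workflow code normally receives from Temporal:
// `sleep` is the durable timer, `pickJitterMs` is a hypothetical activity
// that generates the random value on the activity side.
interface WorkflowDeps {
  sleep: (ms: number) => Promise<void>
  pickJitterMs: () => Promise<number>
}

// Waits a base delay plus jitter without calling setTimeout or Math.random
async function waitWithJitter(deps: WorkflowDeps, baseMs: number): Promise<number> {
  // The activity result is recorded in history, so replay sees the same value
  const jitterMs = await deps.pickJitterMs()
  const totalMs = baseMs + jitterMs
  // Durable timer in place of setTimeout
  await deps.sleep(totalMs)
  return totalMs
}
```

Injecting the dependencies is only a convenience for this example; it also lets the orchestration logic be exercised with fakes in a plain unit test.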
Registering with the Worker
Activities are passed to the worker at creation time:
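```ts
import { Worker } from '@temporalio/worker'

// connection, env, workflowSourcePath, and activities come from the worker setup
const worker = await Worker.create({
  connection,
  namespace: env.TEMPORAL_NAMESPACE,
  taskQueue: env.TEMPORAL_TASK_QUEUE,
  workflowsPath: workflowSourcePath,
  activities
})
```

Workflows exported from the file at workflowsPath are discovered automatically, so a new workflow needs no separate registration step.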
Integration Testing
Test activities directly by calling them as functions. The worker integration suite validates idempotent behavior:
```ts
// First call processes the task
const first = await activities.processTask(taskId)
expect(first.alreadyProcessed).toBe(false)

// Second call detects the duplicate
const second = await activities.processTask(taskId)
expect(second.alreadyProcessed).toBe(true)
```