1
2import { v4 as uuid } from 'uuid';
3import type {
4 SagaState, SagaStep, SagaStore, RetryPolicy,
5} from './types';
6import { SagaExecutionError, CompensationError } from './types';
7
/**
 * Runs a fixed, ordered list of saga steps against a shared context,
 * persisting progress to a {@link SagaStore} after every step.
 *
 * When a step fails (after its retry policy is exhausted) the steps recorded
 * in `completedSteps` are compensated in reverse order and the saga ends in
 * the `'failed'` status.
 *
 * NOTE(review): this class never changes `state.version`; presumably
 * `saveWithOptimisticLock` owns version bumping — confirm against the store.
 */
export class SagaOrchestrator<TContext> {
  /** Upper bound, in milliseconds, for a single compensation call. */
  private static readonly COMPENSATION_TIMEOUT_MS = 30000;

  /**
   * @param steps  Step definitions, executed in array order.
   * @param store  Persistence for saga state.
   * @param logger Destination for progress/diagnostic messages (defaults to `console`).
   */
  constructor(
    private readonly steps: SagaStep<TContext>[],
    private readonly store: SagaStore,
    private readonly logger: Logger = console,
  ) {}

  /**
   * Creates a new saga record, saves it, and runs every step.
   *
   * @param sagaType Label stored on the saga state and included in the start log.
   * @param context  Value handed to every step's `execute` / `compensate`.
   * @returns The generated saga id, once all steps have completed.
   * @throws SagaExecutionError when a step fails and compensation succeeds.
   * @throws CompensationError when a step fails and at least one compensation
   *         also fails (this takes precedence over `SagaExecutionError`).
   */
  async execute(sagaType: string, context: TContext): Promise<string> {
    // One clock reading so createdAt and updatedAt agree on a fresh saga.
    const now = new Date();
    const state: SagaState<TContext> = {
      id: uuid(),
      sagaType,
      status: 'running',
      currentStep: 0,
      context,
      completedSteps: [],
      version: 1,
      createdAt: now,
      updatedAt: new Date(now.getTime()),
    };

    await this.store.save(state);
    this.logger.info('Saga started', { sagaId: state.id, sagaType, steps: this.steps.length });

    await this.executeSteps(state);
    return state.id;
  }

  /**
   * Continues a previously persisted saga from its stored `currentStep`.
   *
   * Only sagas whose status is `'running'` are accepted.
   * NOTE(review): a saga persisted as `'compensating'` is rejected here, so an
   * interrupted compensation cannot be continued through this method.
   *
   * @param sagaId Id of the saga to load from the store.
   * @throws Error when the saga does not exist or is not `'running'`.
   * @throws SagaExecutionError | CompensationError as for {@link execute}.
   */
  async resume(sagaId: string): Promise<void> {
    const state = await this.store.get<TContext>(sagaId);
    if (!state) throw new Error(`Saga ${sagaId} not found`);
    if (state.status !== 'running') {
      throw new Error(`Saga ${sagaId} is ${state.status}, cannot resume`);
    }

    this.logger.info('Saga resumed', { sagaId, step: state.currentStep });
    await this.executeSteps(state);
  }

  /**
   * Executes steps from `state.currentStep` onward, saving after each one.
   * On the first failure it marks the saga `'compensating'`, runs
   * compensation, and throws.
   */
  private async executeSteps(state: SagaState<TContext>): Promise<void> {
    for (let i = state.currentStep; i < this.steps.length; i++) {
      const step = this.steps[i];

      this.logger.info('Executing step', { sagaId: state.id, step: step.name, index: i });

      try {
        await this.executeWithRetry(step, state.context);

        state.currentStep = i + 1;
        state.completedSteps.push(step.name);
        // NOTE(review): a store failure here lands in the catch below and is
        // handled exactly like a step failure (the step was already pushed to
        // completedSteps, so it will be compensated).
        await this.persist(state);
      } catch (error) {
        // Anything can be thrown in JS; normalise so `.message` is always a string.
        const cause = SagaOrchestrator.toError(error);
        this.logger.error('Step failed', { sagaId: state.id, step: step.name, error: cause });

        state.status = 'compensating';
        state.error = cause.message;
        await this.persist(state);

        // May throw CompensationError, in which case the error below is never raised.
        await this.compensate(state);

        throw new SagaExecutionError(
          `Saga failed at step ${step.name}`,
          state.id,
          step.name,
          cause,
        );
      }
    }

    state.status = 'completed';
    await this.persist(state);
    this.logger.info('Saga completed', { sagaId: state.id });
  }

  /**
   * Runs one step, retrying according to its `retryPolicy` with exponential
   * backoff capped at `maxBackoffMs`. Without a policy the step runs once.
   *
   * A `maxAttempts` below 1 (or NaN) is treated as 1 so the step is always
   * attempted at least once and a real error is always thrown on failure.
   *
   * When `step.timeout` is truthy each attempt is raced against that many
   * milliseconds; the timeout only rejects the wait — it does not cancel the
   * step's own work.
   *
   * @throws The last attempt's error, or immediately the first error whose
   *         `name` is not listed in `retryableErrors` (when that list is set).
   */
  private async executeWithRetry(step: SagaStep<TContext>, context: TContext): Promise<void> {
    const policy: RetryPolicy = step.retryPolicy ?? {
      maxAttempts: 1,
      initialBackoffMs: 100,
      multiplier: 2,
      maxBackoffMs: 5000,
    };
    const maxAttempts = policy.maxAttempts >= 1 ? policy.maxAttempts : 1;

    let lastError: Error | undefined;

    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      if (attempt > 0) {
        // Delay before retry k (1-based) is initial * multiplier^(k-1), capped.
        const backoff = Math.min(
          policy.initialBackoffMs * Math.pow(policy.multiplier, attempt - 1),
          policy.maxBackoffMs,
        );
        await sleep(backoff);
      }

      try {
        if (step.timeout) {
          await withTimeout(step.execute(context), step.timeout);
        } else {
          await step.execute(context);
        }
        return;
      } catch (error) {
        const failure = SagaOrchestrator.toError(error);
        lastError = failure;

        // An explicit allow-list means every other error is fatal right away.
        if (policy.retryableErrors && !policy.retryableErrors.includes(failure.name)) {
          throw failure;
        }

        this.logger.warn('Step attempt failed', {
          step: step.name,
          attempt: attempt + 1,
          maxAttempts,
          error: failure.message,
        });
      }
    }

    // Unreachable without an error because at least one attempt always runs;
    // the fallback only exists to avoid ever throwing a non-Error value.
    throw lastError ?? new Error(`Step ${step.name} failed without reporting an error`);
  }

  /**
   * Compensates every step listed in `state.completedSteps`, newest first.
   * Individual compensation failures are collected rather than aborting the
   * sweep. The saga is always saved as `'failed'` afterwards; failed step
   * names are appended to `state.error`.
   *
   * @throws CompensationError when one or more compensations failed.
   */
  private async compensate(state: SagaState<TContext>): Promise<void> {
    const failures: Array<{ step: string; error: Error }> = [];

    for (let i = state.completedSteps.length - 1; i >= 0; i--) {
      const stepName = state.completedSteps[i];
      const step = this.steps.find((s) => s.name === stepName);
      if (!step) {
        // The persisted state names a step this orchestrator does not define;
        // it cannot be undone here, so make the gap visible instead of silent.
        this.logger.warn('Compensation skipped: step definition not found', {
          sagaId: state.id,
          step: stepName,
        });
        continue;
      }

      this.logger.info('Compensating step', { sagaId: state.id, step: stepName });

      try {
        await withTimeout(
          step.compensate(state.context),
          SagaOrchestrator.COMPENSATION_TIMEOUT_MS,
        );
      } catch (error) {
        const cause = SagaOrchestrator.toError(error);
        this.logger.error('Compensation failed', { sagaId: state.id, step: stepName, error: cause });
        failures.push({ step: stepName, error: cause });
      }
    }

    state.status = 'failed';
    if (failures.length > 0) {
      const summary = `compensation failures: ${failures.map((f) => f.step).join(', ')}`;
      state.error = state.error ? `${state.error}; ${summary}` : summary;
    }
    await this.persist(state);

    if (failures.length > 0) {
      throw new CompensationError(
        `Compensation failed for ${failures.length} steps`,
        state.id,
        failures,
      );
    }
  }

  /** Stamps `updatedAt` and writes the state through the optimistic-lock save. */
  private async persist(state: SagaState<TContext>): Promise<void> {
    state.updatedAt = new Date();
    await this.store.saveWithOptimisticLock(state);
  }

  /**
   * Converts an arbitrary thrown value into an `Error`. Real errors pass
   * through untouched; error-like objects keep their string `name` and
   * `message` so name-based retry matching still works.
   */
  private static toError(value: unknown): Error {
    if (value instanceof Error) return value;

    const wrapped = new Error(String(value));
    if (typeof value === 'object' && value !== null) {
      const { name, message } = value as { name?: unknown; message?: unknown };
      if (typeof name === 'string') wrapped.name = name;
      if (typeof message === 'string') wrapped.message = message;
    }
    return wrapped;
  }
}
164
/**
 * Returns a promise that resolves (with no value) once `ms` milliseconds
 * have elapsed on the timer queue.
 *
 * @param ms Delay in milliseconds, passed straight to `setTimeout`.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((wake) => {
    setTimeout(() => {
      wake();
    }, ms);
  });
}
168
/**
 * Settles with the outcome of `promise`, unless `ms` milliseconds pass first,
 * in which case it rejects with a "Step timed out" error.
 *
 * The timer is cancelled as soon as `promise` settles, so a fast operation
 * does not leave a pending timer holding the event loop open. The timeout
 * only stops the waiting; the work behind `promise` is not cancelled.
 *
 * @param promise Operation to wait for.
 * @param ms      Maximum time to wait, in milliseconds.
 * @returns A promise mirroring `promise` when it settles in time.
 * @throws Error `Step timed out after {ms}ms` when the deadline passes first.
 */
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`Step timed out after ${ms}ms`)),
      ms,
    );

    promise.then(
      (value) => {
        clearTimeout(timer);
        resolve(value);
      },
      (reason) => {
        clearTimeout(timer);
        reject(reason);
      },
    );
  });
}
177
/**
 * Minimal logging contract used by the saga orchestrator in this file; the
 * global `console` is used as its default implementation.
 *
 * Each method takes a human-readable message and an optional bag of
 * structured metadata (for example saga id and step name).
 */
interface Logger {
  /** Routine progress events. */
  info(message: string, meta?: Record<string, unknown>): void;
  /** Recoverable problems, such as a failed attempt that may be retried. */
  warn(message: string, meta?: Record<string, unknown>): void;
  /** Failures of a step or of a compensation. */
  error(message: string, meta?: Record<string, unknown>): void;
}
183