Complete Guide to CQRS & Event Sourcing with TypeScript

A comprehensive guide to implementing CQRS & Event Sourcing using TypeScript, covering architecture, code examples, and production-ready patterns.

Muneer Puthiya Purayil 16 min read

TypeScript's structural type system, union types, and first-class async support make it particularly well-suited for CQRS and Event Sourcing implementations. Discriminated unions provide compile-time safety for event handling, and the Node.js ecosystem offers excellent options for event stores and message brokers. This guide covers building a production-grade CQRS/ES system in TypeScript from the ground up.

Core Type Definitions

TypeScript's type system shines when modeling domain events. Discriminated unions ensure exhaustive event handling at compile time.

typescript
// Base event metadata
interface EventMetadata {
  correlationId: string;
  causationId: string;
  userId: string;
  timestamp: string;
}

// Discriminated union of all order events
type OrderEvent =
  | OrderPlaced
  | OrderConfirmed
  | OrderCancelled
  | OrderShipped;

interface OrderPlaced {
  type: 'OrderPlaced';
  version: 1;
  aggregateId: string;
  data: {
    customerId: string;
    lineItems: LineItem[];
    totalAmountCents: number;
    currency: string;
    shippingAddress: Address;
  };
  metadata: EventMetadata;
}

interface OrderConfirmed {
  type: 'OrderConfirmed';
  version: 1;
  aggregateId: string;
  data: {
    confirmedBy: string;
  };
  metadata: EventMetadata;
}

interface OrderCancelled {
  type: 'OrderCancelled';
  version: 1;
  aggregateId: string;
  data: {
    reason: string;
    cancelledBy: string;
  };
  metadata: EventMetadata;
}

interface OrderShipped {
  type: 'OrderShipped';
  version: 1;
  aggregateId: string;
  data: {
    trackingNumber: string;
    carrier: string;
    estimatedDelivery: string;
  };
  metadata: EventMetadata;
}

interface LineItem {
  productId: string;
  quantity: number;
  unitPriceCents: number;
}

interface Address {
  street: string;
  city: string;
  state: string;
  country: string;
  postalCode: string;
}

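The discriminated union on the type field lets TypeScript narrow each event inside a switch statement. Under strict compiler settings, when the handling function declares its return type (or ends with a check that the remaining value is never), a missing case becomes a compile-time error rather than a runtime surprise.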
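
To make the compile-time check explicit, a common idiom is a `never`-typed helper in the `default` branch. The sketch below uses a trimmed-down two-member union so it stays self-contained; `DemoEvent`, `describeEvent`, and `assertNever` are illustrative names and not part of the types defined above.

```typescript
// Illustrative two-member union; the real OrderEvent union has four members.
type DemoEvent =
  | { type: 'OrderPlaced'; data: { totalAmountCents: number } }
  | { type: 'OrderCancelled'; data: { reason: string } };

// Hypothetical helper: it only type-checks when `value` has been narrowed to `never`.
function assertNever(value: never): never {
  throw new Error(`Unhandled event: ${JSON.stringify(value)}`);
}

function describeEvent(event: DemoEvent): string {
  switch (event.type) {
    case 'OrderPlaced':
      return `placed for ${event.data.totalAmountCents} cents`;
    case 'OrderCancelled':
      return `cancelled: ${event.data.reason}`;
    default:
      // If a member is added to DemoEvent without a case above,
      // `event` is no longer `never` here and this line stops compiling.
      return assertNever(event);
  }
}
```

The helper also throws at runtime, which helps when an unknown event type arrives from storage and was never seen by the compiler.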

Event Store Implementation

A PostgreSQL-backed event store with optimistic concurrency control and JSONB storage for event payloads.

typescript
import { Pool } from 'pg';

interface StoredEvent {
  globalPosition: number;
  aggregateId: string;
  aggregateType: string;
  eventType: string;
  version: number;
  payload: unknown;
  metadata: EventMetadata;
  createdAt: Date;
}

class PostgresEventStore {
  constructor(private pool: Pool) {}

  async append(
    aggregateId: string,
    aggregateType: string,
    expectedVersion: number,
    events: OrderEvent[]
  ): Promise<void> {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');

      // Optimistic concurrency check. On its own this read does not stop two
      // concurrent writers; it relies on a UNIQUE (aggregate_id, version)
      // constraint (assumed here) to reject the losing insert.
      const { rows } = await client.query(
        `SELECT COALESCE(MAX(version), 0) AS current_version
         FROM events WHERE aggregate_id = $1`,
        [aggregateId]
      );

      // pg returns BIGINT values as strings by default, so normalize first
      const currentVersion = Number(rows[0].current_version);
      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, found ${currentVersion}`
        );
      }

      for (let i = 0; i < events.length; i++) {
        const event = events[i];
        await client.query(
          `INSERT INTO events (aggregate_id, aggregate_type, event_type, version, payload, metadata, created_at)
           VALUES ($1, $2, $3, $4, $5, $6, NOW())`,
          [
            aggregateId,
            aggregateType,
            event.type,
            expectedVersion + i + 1,
            JSON.stringify(event.data),
            JSON.stringify(event.metadata),
          ]
        );
      }

      // Notify listeners via PostgreSQL NOTIFY (delivered only on COMMIT)
      await client.query(
        `SELECT pg_notify('domain_events', $1)`,
        [JSON.stringify({ aggregateId, count: events.length })]
      );

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      // SQLSTATE 23505 = unique_violation: another writer appended this version first
      if ((error as { code?: string }).code === '23505') {
        throw new ConcurrencyError(
          `Concurrent append detected for aggregate ${aggregateId}`
        );
      }
      throw error;
    } finally {
      client.release();
    }
  }

  async loadEvents(
    aggregateId: string,
    afterVersion: number = 0
  ): Promise<StoredEvent[]> {
    const { rows } = await this.pool.query(
      `SELECT global_position, aggregate_id, aggregate_type, event_type,
              version, payload, metadata, created_at
       FROM events
       WHERE aggregate_id = $1 AND version > $2
       ORDER BY version ASC`,
      [aggregateId, afterVersion]
    );
    return rows.map((row) => this.mapRow(row));
  }

  async readAllEvents(
    afterPosition: number = 0,
    batchSize: number = 100
  ): Promise<StoredEvent[]> {
    const { rows } = await this.pool.query(
      `SELECT global_position, aggregate_id, aggregate_type, event_type,
              version, payload, metadata, created_at
       FROM events
       WHERE global_position > $1
       ORDER BY global_position ASC
       LIMIT $2`,
      [afterPosition, batchSize]
    );
    return rows.map((row) => this.mapRow(row));
  }

  private mapRow(row: any): StoredEvent {
    return {
      // Number() covers BIGINT/BIGSERIAL columns, which pg returns as strings
      globalPosition: Number(row.global_position),
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      eventType: row.event_type,
      version: Number(row.version),
      payload: row.payload,
      metadata: row.metadata,
      createdAt: row.created_at,
    };
  }
}

class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}
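
The queries above imply a table shape that the listing never spells out. A minimal sketch of a compatible schema follows, kept as a TypeScript constant so it can be run from a migration script. The column names are taken from the queries; the column types, the `EVENTS_SCHEMA_SQL` name, and the constraint are assumptions. The `UNIQUE (aggregate_id, version)` constraint is what makes the version check safe under concurrent writers: two transactions that read the same `MAX(version)` cannot both insert the next version.

```typescript
// Assumed schema for the `events` table used by PostgresEventStore.
// Column names come from the queries above; types and constraints are assumptions.
const EVENTS_SCHEMA_SQL = `
CREATE TABLE IF NOT EXISTS events (
  global_position BIGSERIAL   PRIMARY KEY,
  aggregate_id    TEXT        NOT NULL,
  aggregate_type  TEXT        NOT NULL,
  event_type      TEXT        NOT NULL,
  version         INTEGER     NOT NULL,
  payload         JSONB       NOT NULL,
  metadata        JSONB       NOT NULL,
  created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  -- Rejects a second writer that tries to append the same version.
  UNIQUE (aggregate_id, version)
);
`;
```

With `pg`, this could be applied once at startup via `await pool.query(EVENTS_SCHEMA_SQL)`. Note that `pg` returns BIGINT and BIGSERIAL columns as strings by default, so `global_position` needs converting before any numeric comparison.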

Aggregate Implementation

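The aggregate encapsulates domain logic using the evolve pattern: a pure function that takes the current state and an event and returns the next state. Command methods validate invariants first, then emit events that are applied through that same function.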

typescript
type OrderStatus = 'draft' | 'placed' | 'confirmed' | 'cancelled' | 'shipped';

interface OrderState {
  id: string;
  status: OrderStatus;
  customerId: string | null;
  lineItems: LineItem[];
  totalAmountCents: number;
  currency: string | null;
  version: number;
}

const initialState = (id: string): OrderState => ({
  id,
  status: 'draft',
  customerId: null,
  lineItems: [],
  totalAmountCents: 0,
  currency: null,
  version: 0,
});

// Pure state evolution function
function evolve(state: OrderState, event: OrderEvent): OrderState {
  switch (event.type) {
    case 'OrderPlaced':
      return {
        ...state,
        status: 'placed',
        customerId: event.data.customerId,
        lineItems: event.data.lineItems,
        totalAmountCents: event.data.totalAmountCents,
        currency: event.data.currency,
        version: state.version + 1,
      };
    case 'OrderConfirmed':
      return { ...state, status: 'confirmed', version: state.version + 1 };
    case 'OrderCancelled':
      return { ...state, status: 'cancelled', version: state.version + 1 };
    case 'OrderShipped':
      return { ...state, status: 'shipped', version: state.version + 1 };
    default: {
      // Exhaustiveness guard: stops compiling if a new OrderEvent member is
      // unhandled, and throws if storage yields an unknown event type.
      const unhandled: never = event;
      throw new Error(`Unhandled event: ${JSON.stringify(unhandled)}`);
    }
  }
}

class OrderAggregate {
  private state: OrderState;
  private uncommittedEvents: OrderEvent[] = [];

  constructor(id: string) {
    this.state = initialState(id);
  }

  static rehydrate(id: string, events: StoredEvent[]): OrderAggregate {
    const aggregate = new OrderAggregate(id);
    for (const stored of events) {
      const event = aggregate.deserializeEvent(stored);
      aggregate.state = evolve(aggregate.state, event);
    }
    return aggregate;
  }

  place(
    customerId: string,
    lineItems: LineItem[],
    currency: string,
    metadata: EventMetadata
  ): void {
    if (this.state.status !== 'draft') {
      throw new Error(`Cannot place order in status: ${this.state.status}`);
    }
    if (lineItems.length === 0) {
      throw new Error('Order must have at least one item');
    }

    const totalAmountCents = lineItems.reduce(
      (sum, item) => sum + item.unitPriceCents * item.quantity,
      0
    );

    this.emit({
      type: 'OrderPlaced',
      version: 1,
      aggregateId: this.state.id,
      data: {
        customerId,
        lineItems,
        totalAmountCents,
        currency,
        // Placeholder: this method takes no address, so an empty one is recorded
        shippingAddress: { street: '', city: '', state: '', country: '', postalCode: '' },
      },
      metadata,
    });
  }

  confirm(confirmedBy: string, metadata: EventMetadata): void {
    if (this.state.status !== 'placed') {
      throw new Error(`Cannot confirm order in status: ${this.state.status}`);
    }
    this.emit({
      type: 'OrderConfirmed',
      version: 1,
      aggregateId: this.state.id,
      data: { confirmedBy },
      metadata,
    });
  }

  cancel(reason: string, cancelledBy: string, metadata: EventMetadata): void {
    if (this.state.status === 'cancelled') {
      throw new Error('Order already cancelled');
    }
    if (this.state.status === 'shipped') {
      throw new Error('Cannot cancel shipped order');
    }
    this.emit({
      type: 'OrderCancelled',
      version: 1,
      aggregateId: this.state.id,
      data: { reason, cancelledBy },
      metadata,
    });
  }

  // Applies the event to in-memory state and queues it for persistence
  private emit(event: OrderEvent): void {
    this.state = evolve(this.state, event);
    this.uncommittedEvents.push(event);
  }

  getUncommittedEvents(): OrderEvent[] {
    return [...this.uncommittedEvents];
  }

  getVersion(): number {
    return this.state.version;
  }

  getState(): Readonly<OrderState> {
    return this.state;
  }

  private deserializeEvent(stored: StoredEvent): OrderEvent {
    // Unchecked cast: assumes stored payloads match the current event shapes.
    // `version` here is the event schema version (always 1 in this guide),
    // not stored.version, which is the position in the aggregate's stream.
    return {
      type: stored.eventType,
      version: 1,
      aggregateId: stored.aggregateId,
      data: stored.payload,
      metadata: stored.metadata,
    } as OrderEvent;
  }
}
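
Because `evolve` is pure, rehydration amounts to a left fold over the event history. The sketch below shows that idea in isolation with a deliberately reduced state and event union; `MiniState`, `MiniEvent`, `miniEvolve`, and `replay` are illustrative names rather than part of the aggregate above.

```typescript
type MiniStatus = 'draft' | 'placed' | 'confirmed';

interface MiniState {
  status: MiniStatus;
  totalAmountCents: number;
  version: number;
}

type MiniEvent =
  | { type: 'OrderPlaced'; data: { totalAmountCents: number } }
  | { type: 'OrderConfirmed' };

const miniInitial: MiniState = { status: 'draft', totalAmountCents: 0, version: 0 };

// Pure: returns a new state object and never mutates its input.
function miniEvolve(state: MiniState, event: MiniEvent): MiniState {
  switch (event.type) {
    case 'OrderPlaced':
      return {
        ...state,
        status: 'placed',
        totalAmountCents: event.data.totalAmountCents,
        version: state.version + 1,
      };
    case 'OrderConfirmed':
      return { ...state, status: 'confirmed', version: state.version + 1 };
  }
}

// Rehydration as a fold: current state = reduce(evolve) over the history.
function replay(pastEvents: MiniEvent[]): MiniState {
  return pastEvents.reduce(miniEvolve, miniInitial);
}

const pastEvents: MiniEvent[] = [
  { type: 'OrderPlaced', data: { totalAmountCents: 3998 } },
  { type: 'OrderConfirmed' },
];

const current = replay(pastEvents);
// current.status is 'confirmed', current.totalAmountCents is 3998, current.version is 2
console.log(current);
```

Since the fold never mutates `miniInitial`, the same history can be replayed any number of times with the same result, which is what makes snapshots and projection rebuilds safe.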

Command Handler

The command handler orchestrates aggregate loading, command execution, and event persistence.

typescript
interface PlaceOrderCommand {
  orderId: string;
  customerId: string;
  lineItems: LineItem[];
  currency: string;
  userId: string;
  correlationId: string;
}

interface ConfirmOrderCommand {
  orderId: string;
  confirmedBy: string;
  userId: string;
  correlationId: string;
}

class OrderCommandHandler {
  constructor(private eventStore: PostgresEventStore) {}

  async placeOrder(cmd: PlaceOrderCommand): Promise<void> {
    const events = await this.eventStore.loadEvents(cmd.orderId);
    const aggregate = OrderAggregate.rehydrate(cmd.orderId, events);

    const metadata: EventMetadata = {
      correlationId: cmd.correlationId,
      causationId: cmd.correlationId,
      userId: cmd.userId,
      timestamp: new Date().toISOString(),
    };

    aggregate.place(cmd.customerId, cmd.lineItems, cmd.currency, metadata);

    const uncommitted = aggregate.getUncommittedEvents();
    const expectedVersion = aggregate.getVersion() - uncommitted.length;

    await this.eventStore.append(
      cmd.orderId,
      'Order',
      expectedVersion,
      uncommitted
    );
  }

  async confirmOrder(cmd: ConfirmOrderCommand): Promise<void> {
    const events = await this.eventStore.loadEvents(cmd.orderId);
    const aggregate = OrderAggregate.rehydrate(cmd.orderId, events);

    const metadata: EventMetadata = {
      correlationId: cmd.correlationId,
      causationId: cmd.correlationId,
      userId: cmd.userId,
      timestamp: new Date().toISOString(),
    };

    aggregate.confirm(cmd.confirmedBy, metadata);

    const uncommitted = aggregate.getUncommittedEvents();
    const expectedVersion = aggregate.getVersion() - uncommitted.length;

    await this.eventStore.append(
      cmd.orderId,
      'Order',
      expectedVersion,
      uncommitted
    );
  }
}
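
When `append` rejects with a `ConcurrencyError`, a common response is to reload the aggregate and run the command again against the fresh state. A minimal sketch of such a retry wrapper follows. `withConcurrencyRetry`, `isConcurrencyError`, and `maxAttempts` are hypothetical names, and the local `ConcurrencyError` class is a stand-in so the block is self-contained. The wrapper is only safe when the wrapped operation reloads events on every attempt, as the handler methods above do.

```typescript
// Local stand-in for the ConcurrencyError thrown by the event store.
class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

// Checks by name so the test also works when classes are compiled down to ES5.
function isConcurrencyError(error: unknown): boolean {
  return error instanceof Error && error.name === 'ConcurrencyError';
}

// Re-runs `operation` when it fails with a concurrency conflict.
// `operation` must reload the aggregate itself so each attempt sees fresh state.
async function withConcurrencyRetry<T>(
  operation: () => Promise<T>,
  maxAttempts: number = 3
): Promise<T> {
  const attempts = Math.max(1, maxAttempts);
  let lastError: unknown;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      // Unrelated failures (validation, connectivity) are not retried.
      if (!isConcurrencyError(error)) throw error;
      lastError = error;
    }
  }
  // Every attempt hit a conflict: surface the last one to the caller.
  throw lastError;
}
```

A call site might look like `await withConcurrencyRetry(() => handler.confirmOrder(cmd))`. If the competing writer already moved the order out of the `placed` status, the retry fails with the aggregate's own validation error, which is the desired outcome.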

Projection Engine

A polling-based projection engine that processes events from the global stream and updates read models.

typescript
type EventHandler = (event: StoredEvent) => Promise<void>;

const BATCH_SIZE = 100;
const POLL_INTERVAL_MS = 500;

class ProjectionEngine {
  private handlers: Map<string, EventHandler[]> = new Map();
  private checkpoint: number = 0;
  private running: boolean = false;

  constructor(
    private eventStore: PostgresEventStore,
    private checkpointStore: CheckpointStore,
    private projectionName: string
  ) {}

  on(eventType: string, handler: EventHandler): void {
    const existing = this.handlers.get(eventType) ?? [];
    this.handlers.set(eventType, [...existing, handler]);
  }

  async start(): Promise<void> {
    this.checkpoint = await this.checkpointStore.load(this.projectionName);
    this.running = true;

    while (this.running) {
      const events = await this.eventStore.readAllEvents(this.checkpoint, BATCH_SIZE);

      for (const event of events) {
        const handlers = this.handlers.get(event.eventType) ?? [];
        for (const handler of handlers) {
          await handler(event);
        }
        this.checkpoint = event.globalPosition;
      }

      // The checkpoint is saved once per batch, so a crash mid-batch replays
      // some events on restart; handlers therefore need to be idempotent.
      if (events.length > 0) {
        await this.checkpointStore.save(this.projectionName, this.checkpoint);
      }

      if (events.length < BATCH_SIZE) {
        await sleep(POLL_INTERVAL_MS); // Poll interval when caught up
      }
    }
  }

  stop(): void {
    this.running = false;
  }
}

// Order list projection
class OrderListProjection {
  constructor(private pool: Pool) {}

  register(engine: ProjectionEngine): void {
    engine.on('OrderPlaced', async (event) => {
      const data = event.payload as OrderPlaced['data'];
      await this.pool.query(
        `INSERT INTO order_list_view (order_id, customer_id, status, total_cents, currency, placed_at)
         VALUES ($1, $2, 'placed', $3, $4, $5)
         ON CONFLICT (order_id) DO NOTHING`,
        [event.aggregateId, data.customerId, data.totalAmountCents, data.currency, event.createdAt]
      );
    });

    engine.on('OrderConfirmed', async (event) => {
      await this.pool.query(
        `UPDATE order_list_view SET status = 'confirmed', updated_at = NOW() WHERE order_id = $1`,
        [event.aggregateId]
      );
    });

    engine.on('OrderCancelled', async (event) => {
      await this.pool.query(
        `UPDATE order_list_view SET status = 'cancelled', updated_at = NOW() WHERE order_id = $1`,
        [event.aggregateId]
      );
    });
  }
}

// Checkpoint persistence
class CheckpointStore {
  constructor(private pool: Pool) {}

  async load(projectionName: string): Promise<number> {
    const { rows } = await this.pool.query(
      'SELECT position FROM projection_checkpoints WHERE name = $1',
      [projectionName]
    );
    // Number() covers a BIGINT position column, which pg returns as a string
    return Number(rows[0]?.position ?? 0);
  }

  async save(projectionName: string, position: number): Promise<void> {
    await this.pool.query(
      `INSERT INTO projection_checkpoints (name, position, updated_at)
       VALUES ($1, $2, NOW())
       ON CONFLICT (name) DO UPDATE SET position = $2, updated_at = NOW()`,
      [projectionName, position]
    );
  }
}

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}
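
The loop above always sleeps for the full poll interval once it has caught up, even though the event store calls `pg_notify` after each append. One way to cut that latency is to make the sleep interruptible, so that a notification listener can wake the loop early. The sketch below shows only the wake-up primitive. `Waker` is a hypothetical name, it assumes a single waiting loop, and wiring `wake()` to a connection that has issued `LISTEN domain_events` is left as an assumption.

```typescript
// A sleep that can be cut short. Polling stays as the fallback, so a missed
// notification only costs one poll interval.
class Waker {
  private wakeCurrent: (() => void) | null = null;

  // Resolves after `ms`, or earlier if wake() is called.
  // The resolved value is true when the wait was cut short.
  wait(ms: number): Promise<boolean> {
    return new Promise((resolve) => {
      const timer = setTimeout(() => {
        this.wakeCurrent = null;
        resolve(false);
      }, ms);
      this.wakeCurrent = () => {
        clearTimeout(timer); // avoid keeping the process alive for the full interval
        this.wakeCurrent = null;
        resolve(true);
      };
    });
  }

  // Safe to call when nobody is waiting; the call is then a no-op.
  wake(): void {
    this.wakeCurrent?.();
  }
}
```

In the engine, `await sleep(500)` would become `await waker.wait(500)`, and a dedicated `pg` client listening on the `domain_events` channel would call `waker.wake()` from its `notification` event handler.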

Query Service

typescript
interface OrderListView {
  orderId: string;
  customerId: string;
  status: string;
  totalCents: number;
  currency: string;
  placedAt: Date;
}

class OrderQueryService {
  constructor(private pool: Pool) {}

  async getOrder(orderId: string): Promise<OrderListView | null> {
    const { rows } = await this.pool.query(
      'SELECT * FROM order_list_view WHERE order_id = $1',
      [orderId]
    );
    return rows[0] ? this.mapRow(rows[0]) : null;
  }

  async listByCustomer(
    customerId: string,
    limit: number = 20,
    offset: number = 0
  ): Promise<OrderListView[]> {
    const { rows } = await this.pool.query(
      `SELECT * FROM order_list_view
       WHERE customer_id = $1
       ORDER BY placed_at DESC
       LIMIT $2 OFFSET $3`,
      [customerId, limit, offset]
    );
    return rows.map(this.mapRow);
  }

  private mapRow(row: any): OrderListView {
    return {
      orderId: row.order_id,
      customerId: row.customer_id,
      status: row.status,
      totalCents: row.total_cents,
      currency: row.currency,
      placedAt: row.placed_at,
    };
  }
}

Testing

typescript
import { describe, it, expect } from 'vitest';

describe('OrderAggregate', () => {
  it('should place an order', () => {
    const aggregate = new OrderAggregate('order-1');
    const metadata: EventMetadata = {
      correlationId: 'corr-1',
      causationId: 'corr-1',
      userId: 'user-1',
      timestamp: new Date().toISOString(),
    };

    aggregate.place(
      'customer-1',
      [{ productId: 'prod-1', quantity: 2, unitPriceCents: 1999 }],
      'USD',
      metadata
    );

    const events = aggregate.getUncommittedEvents();
    expect(events).toHaveLength(1);

    const [event] = events;
    expect(event.type).toBe('OrderPlaced');
    // Narrow the union so that `data.totalAmountCents` type-checks
    if (event.type !== 'OrderPlaced') throw new Error('expected OrderPlaced');
    expect(event.data.totalAmountCents).toBe(3998);
  });

  it('should reject placing an already placed order', () => {
    const aggregate = new OrderAggregate('order-1');
    const metadata: EventMetadata = {
      correlationId: 'corr-1',
      causationId: 'corr-1',
      userId: 'user-1',
      timestamp: new Date().toISOString(),
    };

    aggregate.place(
      'customer-1',
      [{ productId: 'prod-1', quantity: 1, unitPriceCents: 1000 }],
      'USD',
      metadata
    );

    expect(() =>
      aggregate.place('customer-1', [{ productId: 'prod-2', quantity: 1, unitPriceCents: 500 }], 'USD', metadata)
    ).toThrow('Cannot place order in status: placed');
  });

  it('should rehydrate and allow confirmation', () => {
    const stored: StoredEvent[] = [{
      globalPosition: 1,
      aggregateId: 'order-1',
      aggregateType: 'Order',
      eventType: 'OrderPlaced',
      version: 1,
      payload: {
        customerId: 'cust-1',
        lineItems: [{ productId: 'p1', quantity: 1, unitPriceCents: 1000 }],
        totalAmountCents: 1000,
        currency: 'USD',
        shippingAddress: { street: '', city: '', state: '', country: '', postalCode: '' },
      },
      metadata: { correlationId: 'c1', causationId: 'c1', userId: 'u1', timestamp: '' },
      createdAt: new Date(),
    }];

    const aggregate = OrderAggregate.rehydrate('order-1', stored);
    const metadata: EventMetadata = {
      correlationId: 'corr-2',
      causationId: 'corr-2',
      userId: 'admin',
      timestamp: new Date().toISOString(),
    };

    aggregate.confirm('admin', metadata);

    expect(aggregate.getUncommittedEvents()).toHaveLength(1);
    expect(aggregate.getUncommittedEvents()[0].type).toBe('OrderConfirmed');
  });
});

Conclusion

TypeScript brings unique advantages to CQRS and Event Sourcing: discriminated unions catch missing event handlers at compile time, the evolve pattern produces naturally pure state transitions, and async/await simplifies projection consumer code. The PostgreSQL-backed event store with NOTIFY provides a solid foundation that, depending on hardware and event size, can scale to tens of thousands of events per second before you need to consider specialized infrastructure.

Build your event types first, validate them with the type system, and let the compiler guide your aggregate and projection implementations. The investment in precise type definitions pays dividends across every component of the system.

Muneer Puthiya Purayil

SaaS Architect & AI Systems Engineer. 10+ years shipping production infrastructure across fintech, automotive, e-commerce, and healthcare.
