Back to Journal
System Design

How to Build CQRS & Event Sourcing Using Nestjs

Step-by-step tutorial for building CQRS & Event Sourcing with Nestjs, from project setup through deployment.

Muneer Puthiya Purayil 18 min read

NestJS provides first-class support for CQRS through the @nestjs/cqrs module, making it one of the most streamlined frameworks for building event-sourced applications in the Node.js ecosystem. This tutorial walks through building a complete CQRS and Event Sourcing system with NestJS, from project scaffolding through production-ready event handling, projections, and API endpoints.

Project Setup

Start with a fresh NestJS project and install the required dependencies.

bash
1nest new order-service
2cd order-service
3npm install @nestjs/cqrs @nestjs/typeorm typeorm pg uuid
4npm install -D @types/uuid
5 

Configure TypeORM for PostgreSQL in app.module.ts:

typescript
1import { Module } from '@nestjs/common';
2import { CqrsModule } from '@nestjs/cqrs';
3import { TypeOrmModule } from '@nestjs/typeorm';
4import { OrderModule } from './order/order.module';
5 
6@Module({
7 imports: [
8 TypeOrmModule.forRoot({
9 type: 'postgres',
10 host: process.env.DB_HOST,
11 port: parseInt(process.env.DB_PORT, 10),
12 username: process.env.DB_USER,
13 password: process.env.DB_PASSWORD,
14 database: process.env.DB_NAME,
15 entities: [__dirname + '/**/*.entity{.ts,.js}'],
16 synchronize: false,
17 }),
18 CqrsModule,
19 OrderModule,
20 ],
21})
22export class AppModule {}
23 

Database Schema

Create the events table and read model tables.

sql
1CREATE TABLE domain_events (
2 id BIGSERIAL PRIMARY KEY,
3 aggregate_id UUID NOT NULL,
4 aggregate_type VARCHAR(100) NOT NULL,
5 event_type VARCHAR(100) NOT NULL,
6 version INT NOT NULL,
7 payload JSONB NOT NULL,
8 metadata JSONB NOT NULL DEFAULT '{}',
9 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
10 UNIQUE (aggregate_id, version)
11);
12 
13CREATE INDEX idx_domain_events_aggregate ON domain_events (aggregate_id, version);
14 
15CREATE TABLE order_read_model (
16 order_id UUID PRIMARY KEY,
17 customer_id UUID NOT NULL,
18 status VARCHAR(50) NOT NULL,
19 total_cents BIGINT NOT NULL,
20 currency VARCHAR(3) NOT NULL,
21 item_count INT NOT NULL,
22 placed_at TIMESTAMPTZ,
23 confirmed_at TIMESTAMPTZ,
24 cancelled_at TIMESTAMPTZ,
25 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
26);
27 
28CREATE TABLE projection_checkpoints (
29 projection_name VARCHAR(100) PRIMARY KEY,
30 position BIGINT NOT NULL DEFAULT 0,
31 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
32);
33 

Domain Events

Define your domain events as plain classes that NestJS's event bus can publish.

typescript
1// src/order/events/order-placed.event.ts
2export class OrderPlacedEvent {
3 constructor(
4 public readonly orderId: string,
5 public readonly customerId: string,
6 public readonly lineItems: Array<{
7 productId: string;
8 quantity: number;
9 unitPriceCents: number;
10 }>,
11 public readonly totalCents: number,
12 public readonly currency: string,
13 public readonly metadata: {
14 correlationId: string;
15 userId: string;
16 timestamp: string;
17 },
18 ) {}
19}
20 
21// src/order/events/order-confirmed.event.ts
22export class OrderConfirmedEvent {
23 constructor(
24 public readonly orderId: string,
25 public readonly confirmedBy: string,
26 public readonly metadata: {
27 correlationId: string;
28 userId: string;
29 timestamp: string;
30 },
31 ) {}
32}
33 
34// src/order/events/order-cancelled.event.ts
35export class OrderCancelledEvent {
36 constructor(
37 public readonly orderId: string,
38 public readonly reason: string,
39 public readonly cancelledBy: string,
40 public readonly metadata: {
41 correlationId: string;
42 userId: string;
43 timestamp: string;
44 },
45 ) {}
46}
47 

Event Store Service

Build the event store as a NestJS service wrapping PostgreSQL operations.

typescript
1// src/order/infrastructure/event-store.service.ts
2import { Injectable } from '@nestjs/common';
3import { InjectDataSource } from '@nestjs/typeorm';
4import { DataSource } from 'typeorm';
5 
6interface StoredEvent {
7 id: number;
8 aggregateId: string;
9 aggregateType: string;
10 eventType: string;
11 version: number;
12 payload: Record<string, unknown>;
13 metadata: Record<string, unknown>;
14 createdAt: Date;
15}
16 
17@Injectable()
18export class EventStoreService {
19 constructor(@InjectDataSource() private dataSource: DataSource) {}
20 
21 async append(
22 aggregateId: string,
23 aggregateType: string,
24 expectedVersion: number,
25 events: Array<{ eventType: string; payload: unknown; metadata: unknown }>,
26 ): Promise<void> {
27 const queryRunner = this.dataSource.createQueryRunner();
28 await queryRunner.connect();
29 await queryRunner.startTransaction('SERIALIZABLE');
30 
31 try {
32 const result = await queryRunner.query(
33 `SELECT COALESCE(MAX(version), 0) as current_version
34 FROM domain_events WHERE aggregate_id = $1`,
35 [aggregateId],
36 );
37 
38 const currentVersion = parseInt(result[0].current_version, 10);
39 if (currentVersion !== expectedVersion) {
40 throw new Error(
41 `Concurrency conflict: expected ${expectedVersion}, found ${currentVersion}`,
42 );
43 }
44 
45 for (let i = 0; i < events.length; i++) {
46 await queryRunner.query(
47 `INSERT INTO domain_events (aggregate_id, aggregate_type, event_type, version, payload, metadata)
48 VALUES ($1, $2, $3, $4, $5, $6)`,
49 [
50 aggregateId,
51 aggregateType,
52 events[i].eventType,
53 expectedVersion + i + 1,
54 JSON.stringify(events[i].payload),
55 JSON.stringify(events[i].metadata),
56 ],
57 );
58 }
59 
60 await queryRunner.commitTransaction();
61 } catch (error) {
62 await queryRunner.rollbackTransaction();
63 throw error;
64 } finally {
65 await queryRunner.release();
66 }
67 }
68 
69 async loadEvents(aggregateId: string, afterVersion = 0): Promise<StoredEvent[]> {
70 return this.dataSource.query(
71 `SELECT id, aggregate_id as "aggregateId", aggregate_type as "aggregateType",
72 event_type as "eventType", version, payload, metadata, created_at as "createdAt"
73 FROM domain_events
74 WHERE aggregate_id = $1 AND version > $2
75 ORDER BY version ASC`,
76 [aggregateId, afterVersion],
77 );
78 }
79 
80 async readAll(afterPosition = 0, limit = 100): Promise<StoredEvent[]> {
81 return this.dataSource.query(
82 `SELECT id, aggregate_id as "aggregateId", aggregate_type as "aggregateType",
83 event_type as "eventType", version, payload, metadata, created_at as "createdAt"
84 FROM domain_events
85 WHERE id > $1
86 ORDER BY id ASC
87 LIMIT $2`,
88 [afterPosition, limit],
89 );
90 }
91}
92 

Order Aggregate

The aggregate enforces business rules and produces events.

typescript
1// src/order/domain/order.aggregate.ts
2import { OrderPlacedEvent } from '../events/order-placed.event';
3import { OrderConfirmedEvent } from '../events/order-confirmed.event';
4import { OrderCancelledEvent } from '../events/order-cancelled.event';
5 
6type OrderStatus = 'draft' | 'placed' | 'confirmed' | 'cancelled';
7 
8interface OrderState {
9 id: string;
10 status: OrderStatus;
11 customerId: string | null;
12 totalCents: number;
13 version: number;
14}
15 
16export class OrderAggregate {
17 private state: OrderState;
18 private uncommitted: Array<{ event: any; eventType: string }> = [];
19 
20 constructor(id: string) {
21 this.state = { id, status: 'draft', customerId: null, totalCents: 0, version: 0 };
22 }
23 
24 static fromHistory(id: string, events: Array<{ eventType: string; payload: any }>): OrderAggregate {
25 const aggregate = new OrderAggregate(id);
26 for (const { eventType, payload } of events) {
27 aggregate.apply(eventType, payload, false);
28 }
29 return aggregate;
30 }
31 
32 place(
33 customerId: string,
34 lineItems: Array<{ productId: string; quantity: number; unitPriceCents: number }>,
35 currency: string,
36 metadata: { correlationId: string; userId: string },
37 ): void {
38 if (this.state.status !== 'draft') {
39 throw new Error(`Cannot place order in status: ${this.state.status}`);
40 }
41 if (lineItems.length === 0) {
42 throw new Error('Order must have at least one line item');
43 }
44 
45 const totalCents = lineItems.reduce(
46 (sum, item) => sum + item.unitPriceCents * item.quantity, 0,
47 );
48 
49 const event = new OrderPlacedEvent(
50 this.state.id, customerId, lineItems, totalCents, currency,
51 { ...metadata, timestamp: new Date().toISOString() },
52 );
53 
54 this.apply('OrderPlaced', event, true);
55 }
56 
57 confirm(confirmedBy: string, metadata: { correlationId: string; userId: string }): void {
58 if (this.state.status !== 'placed') {
59 throw new Error(`Cannot confirm order in status: ${this.state.status}`);
60 }
61 
62 const event = new OrderConfirmedEvent(
63 this.state.id, confirmedBy,
64 { ...metadata, timestamp: new Date().toISOString() },
65 );
66 
67 this.apply('OrderConfirmed', event, true);
68 }
69 
70 cancel(reason: string, cancelledBy: string, metadata: { correlationId: string; userId: string }): void {
71 if (this.state.status === 'cancelled') {
72 throw new Error('Order already cancelled');
73 }
74 
75 const event = new OrderCancelledEvent(
76 this.state.id, reason, cancelledBy,
77 { ...metadata, timestamp: new Date().toISOString() },
78 );
79 
80 this.apply('OrderCancelled', event, true);
81 }
82 
83 private apply(eventType: string, payload: any, isNew: boolean): void {
84 switch (eventType) {
85 case 'OrderPlaced':
86 this.state = {
87 ...this.state,
88 status: 'placed',
89 customerId: payload.customerId,
90 totalCents: payload.totalCents ?? payload.totalAmountCents,
91 version: this.state.version + 1,
92 };
93 break;
94 case 'OrderConfirmed':
95 this.state = { ...this.state, status: 'confirmed', version: this.state.version + 1 };
96 break;
97 case 'OrderCancelled':
98 this.state = { ...this.state, status: 'cancelled', version: this.state.version + 1 };
99 break;
100 }
101 
102 if (isNew) {
103 this.uncommitted.push({ event: payload, eventType });
104 }
105 }
106 
107 getUncommitted() { return this.uncommitted; }
108 getVersion() { return this.state.version; }
109 getStatus() { return this.state.status; }
110}
111 

Commands and Handlers

NestJS CQRS module provides decorators for command and query handlers.

typescript
1// src/order/commands/place-order.command.ts
2export class PlaceOrderCommand {
3 constructor(
4 public readonly orderId: string,
5 public readonly customerId: string,
6 public readonly lineItems: Array<{
7 productId: string;
8 quantity: number;
9 unitPriceCents: number;
10 }>,
11 public readonly currency: string,
12 public readonly userId: string,
13 public readonly correlationId: string,
14 ) {}
15}
16 
17// src/order/commands/handlers/place-order.handler.ts
18import { CommandHandler, ICommandHandler, EventBus } from '@nestjs/cqrs';
19import { PlaceOrderCommand } from '../place-order.command';
20import { EventStoreService } from '../../infrastructure/event-store.service';
21import { OrderAggregate } from '../../domain/order.aggregate';
22 
23@CommandHandler(PlaceOrderCommand)
24export class PlaceOrderHandler implements ICommandHandler<PlaceOrderCommand> {
25 constructor(
26 private eventStore: EventStoreService,
27 private eventBus: EventBus,
28 ) {}
29 
30 async execute(command: PlaceOrderCommand): Promise<void> {
31 const storedEvents = await this.eventStore.loadEvents(command.orderId);
32 const aggregate = OrderAggregate.fromHistory(
33 command.orderId,
34 storedEvents.map((e) => ({ eventType: e.eventType, payload: e.payload })),
35 );
36 
37 aggregate.place(
38 command.customerId,
39 command.lineItems,
40 command.currency,
41 { correlationId: command.correlationId, userId: command.userId },
42 );
43 
44 const uncommitted = aggregate.getUncommitted();
45 const expectedVersion = aggregate.getVersion() - uncommitted.length;
46 
47 await this.eventStore.append(
48 command.orderId,
49 'Order',
50 expectedVersion,
51 uncommitted.map((u) => ({
52 eventType: u.eventType,
53 payload: u.event,
54 metadata: u.event.metadata,
55 })),
56 );
57 
58 // Publish events to NestJS event bus for projections
59 for (const { event } of uncommitted) {
60 this.eventBus.publish(event);
61 }
62 }
63}
64 

Need a second opinion on your system design architecture?

I run free 30-minute strategy calls for engineering teams tackling this exact problem.

Book a Free Call

Event Handlers (Projections)

Event handlers build read models asynchronously.

typescript
1// src/order/projections/order-read-model.handler.ts
2import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
3import { InjectDataSource } from '@nestjs/typeorm';
4import { DataSource } from 'typeorm';
5import { OrderPlacedEvent } from '../events/order-placed.event';
6 
7@EventsHandler(OrderPlacedEvent)
8export class OrderPlacedProjection implements IEventHandler<OrderPlacedEvent> {
9 constructor(@InjectDataSource() private dataSource: DataSource) {}
10 
11 async handle(event: OrderPlacedEvent): Promise<void> {
12 await this.dataSource.query(
13 `INSERT INTO order_read_model (order_id, customer_id, status, total_cents, currency, item_count, placed_at, updated_at)
14 VALUES ($1, $2, 'placed', $3, $4, $5, $6, NOW())
15 ON CONFLICT (order_id) DO NOTHING`,
16 [
17 event.orderId,
18 event.customerId,
19 event.totalCents,
20 event.currency,
21 event.lineItems.length,
22 event.metadata.timestamp,
23 ],
24 );
25 }
26}
27 
28@EventsHandler(OrderConfirmedEvent)
29export class OrderConfirmedProjection implements IEventHandler<OrderConfirmedEvent> {
30 constructor(@InjectDataSource() private dataSource: DataSource) {}
31 
32 async handle(event: OrderConfirmedEvent): Promise<void> {
33 await this.dataSource.query(
34 `UPDATE order_read_model SET status = 'confirmed', confirmed_at = $2, updated_at = NOW()
35 WHERE order_id = $1`,
36 [event.orderId, event.metadata.timestamp],
37 );
38 }
39}
40 
41@EventsHandler(OrderCancelledEvent)
42export class OrderCancelledProjection implements IEventHandler<OrderCancelledEvent> {
43 constructor(@InjectDataSource() private dataSource: DataSource) {}
44 
45 async handle(event: OrderCancelledEvent): Promise<void> {
46 await this.dataSource.query(
47 `UPDATE order_read_model SET status = 'cancelled', cancelled_at = $2, updated_at = NOW()
48 WHERE order_id = $1`,
49 [event.orderId, event.metadata.timestamp],
50 );
51 }
52}
53 

Query Side

typescript
1// src/order/queries/get-order.query.ts
2export class GetOrderQuery {
3 constructor(public readonly orderId: string) {}
4}
5 
6// src/order/queries/handlers/get-order.handler.ts
7import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
8import { InjectDataSource } from '@nestjs/typeorm';
9import { DataSource } from 'typeorm';
10import { GetOrderQuery } from '../get-order.query';
11 
12@QueryHandler(GetOrderQuery)
13export class GetOrderHandler implements IQueryHandler<GetOrderQuery> {
14 constructor(@InjectDataSource() private dataSource: DataSource) {}
15 
16 async execute(query: GetOrderQuery) {
17 const rows = await this.dataSource.query(
18 'SELECT * FROM order_read_model WHERE order_id = $1',
19 [query.orderId],
20 );
21 return rows[0] ?? null;
22 }
23}
24 

API Controller

Wire everything together with a REST controller.

typescript
1// src/order/order.controller.ts
2import { Body, Controller, Get, Param, Post, HttpCode } from '@nestjs/common';
3import { CommandBus, QueryBus } from '@nestjs/cqrs';
4import { v4 as uuidv4 } from 'uuid';
5import { PlaceOrderCommand } from './commands/place-order.command';
6import { GetOrderQuery } from './queries/get-order.query';
7 
8@Controller('orders')
9export class OrderController {
10 constructor(
11 private commandBus: CommandBus,
12 private queryBus: QueryBus,
13 ) {}
14 
15 @Post()
16 @HttpCode(201)
17 async placeOrder(@Body() body: PlaceOrderDto) {
18 const orderId = uuidv4();
19 await this.commandBus.execute(
20 new PlaceOrderCommand(
21 orderId,
22 body.customerId,
23 body.lineItems,
24 body.currency,
25 body.userId,
26 uuidv4(),
27 ),
28 );
29 return { orderId };
30 }
31 
32 @Get(':id')
33 async getOrder(@Param('id') id: string) {
34 return this.queryBus.execute(new GetOrderQuery(id));
35 }
36}
37 
38interface PlaceOrderDto {
39 customerId: string;
40 lineItems: Array<{
41 productId: string;
42 quantity: number;
43 unitPriceCents: number;
44 }>;
45 currency: string;
46 userId: string;
47}
48 

Module Wiring

typescript
1// src/order/order.module.ts
2import { Module } from '@nestjs/common';
3import { CqrsModule } from '@nestjs/cqrs';
4import { OrderController } from './order.controller';
5import { EventStoreService } from './infrastructure/event-store.service';
6import { PlaceOrderHandler } from './commands/handlers/place-order.handler';
7import { GetOrderHandler } from './queries/handlers/get-order.handler';
8import { OrderPlacedProjection, OrderConfirmedProjection, OrderCancelledProjection } from './projections/order-read-model.handler';
9 
10const CommandHandlers = [PlaceOrderHandler];
11const QueryHandlers = [GetOrderHandler];
12const EventHandlers = [OrderPlacedProjection, OrderConfirmedProjection, OrderCancelledProjection];
13 
14@Module({
15 imports: [CqrsModule],
16 controllers: [OrderController],
17 providers: [
18 EventStoreService,
19 ...CommandHandlers,
20 ...QueryHandlers,
21 ...EventHandlers,
22 ],
23})
24export class OrderModule {}
25 

Testing

typescript
1import { Test } from '@nestjs/testing';
2import { CommandBus, CqrsModule } from '@nestjs/cqrs';
3import { PlaceOrderCommand } from '../commands/place-order.command';
4import { PlaceOrderHandler } from '../commands/handlers/place-order.handler';
5 
6describe('PlaceOrderHandler', () => {
7 let commandBus: CommandBus;
8 
9 beforeEach(async () => {
10 const module = await Test.createTestingModule({
11 imports: [CqrsModule],
12 providers: [
13 PlaceOrderHandler,
14 {
15 provide: EventStoreService,
16 useValue: {
17 loadEvents: jest.fn().mockResolvedValue([]),
18 append: jest.fn().mockResolvedValue(undefined),
19 },
20 },
21 ],
22 }).compile();
23 
24 commandBus = module.get(CommandBus);
25 commandBus.register([PlaceOrderHandler]);
26 });
27 
28 it('should place an order successfully', async () => {
29 await expect(
30 commandBus.execute(
31 new PlaceOrderCommand(
32 'order-1', 'cust-1',
33 [{ productId: 'p1', quantity: 2, unitPriceCents: 1500 }],
34 'USD', 'user-1', 'corr-1',
35 ),
36 ),
37 ).resolves.not.toThrow();
38 });
39});
40 

Conclusion

NestJS's @nestjs/cqrs module provides a clean, decorator-driven approach to CQRS that reduces boilerplate without hiding the underlying patterns. The command bus, query bus, and event bus are injected as standard NestJS providers, making testing straightforward. Combined with TypeORM for persistence and PostgreSQL for both the event store and read models, you get a production-capable system with a small dependency footprint.

The key advantage of NestJS for CQRS is the modular architecture — each bounded context becomes a NestJS module with its own commands, queries, events, and projections, cleanly separated and independently testable.

FAQ

Need expert help?

Building with system design?

I help teams ship production-grade systems. From architecture review to hands-on builds.

Muneer Puthiya Purayil

SaaS Architect & AI Systems Engineer. 10+ years shipping production infrastructure across fintech, automotive, e-commerce, and healthcare.

Engage

Start a
Conversation.

For teams building at scale: SaaS platforms, agentic AI systems, and enterprise mobile infrastructure. Scope and fit are evaluated before any engagement begins.

Limited availability · Q3 / Q4 2026