
Complete Guide to CQRS & Event Sourcing with Java

A comprehensive guide to implementing CQRS & Event Sourcing using Java, covering architecture, code examples, and production-ready patterns.

Muneer Puthiya Purayil 17 min read

Java's mature ecosystem, strong typing, and battle-tested frameworks make it a natural choice for CQRS and Event Sourcing implementations. The Spring ecosystem provides foundational components, and libraries like Axon Framework offer production-ready CQRS infrastructure. This guide walks through building a complete CQRS/ES system in Java, covering event store design, aggregate implementation, and projection management with production-ready patterns.

Architecture Overview

The Java implementation follows the standard CQRS/ES decomposition: commands flow through a command bus to aggregates that emit events, which are persisted in an event store and consumed by projection handlers to build read models.

Command ──▶ CommandBus ──▶ Aggregate ──▶ EventStore
                                              │
                              EventBus ◀──────┘
                                  │
                      ┌───────────┼───────────┐
                      ▼           ▼           ▼
                 Projection  Projection  Projection
                  (JPA/SQL)   (Elastic)    (Redis)

Defining Domain Events

Use sealed interfaces (Java 17+) to create a closed hierarchy of domain events.

java
import java.time.Instant;
import java.util.List;

// Each public record normally lives in its own file, so the permitted
// subtypes are listed explicitly rather than inferred from one source file.
public sealed interface OrderEvent
        permits OrderPlaced, OrderConfirmed, OrderCancelled, OrderShipped {
    String orderId();
    Instant occurredAt();
}

public record OrderPlaced(
    String orderId,
    String customerId,
    List<LineItem> lineItems,
    long totalAmountCents,
    String currency,
    ShippingAddress shippingAddress,
    Instant occurredAt,
    String correlationId,
    String causationId
) implements OrderEvent {}

public record OrderConfirmed(
    String orderId,
    String confirmedBy,
    Instant occurredAt,
    String correlationId
) implements OrderEvent {}

public record OrderCancelled(
    String orderId,
    String reason,
    String cancelledBy,
    Instant occurredAt,
    String correlationId
) implements OrderEvent {}

public record OrderShipped(
    String orderId,
    String trackingNumber,
    String carrier,
    Instant occurredAt,
    String correlationId
) implements OrderEvent {}

public record LineItem(
    String productId,
    int quantity,
    long unitPriceCents
) {}

public record ShippingAddress(
    String street,
    String city,
    String state,
    String country,
    String postalCode
) {}

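Records provide value-based equality and compact syntax, which suits event definitions. Their immutability is shallow, though: a component such as List<LineItem> can still be changed through the caller's reference unless the record copies it (for example with List.copyOf in a compact constructor).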
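The two properties above can be shown in a few lines. The following is a minimal sketch, not the guide's full model: `CartEvent`, `ItemsAdded`, `CartCleared`, and `CartEventDescriber` are hypothetical names chosen for illustration. It assumes Java 21 for the pattern-matching switch.

```java
import java.util.List;

// Hypothetical, trimmed-down event hierarchy used only for this illustration.
sealed interface CartEvent permits ItemsAdded, CartCleared {}

record ItemsAdded(String cartId, List<String> skus) implements CartEvent {
    // Compact constructor: copy the list so later changes to the caller's
    // list cannot alter an event that has already been created.
    ItemsAdded {
        skus = List.copyOf(skus);
    }
}

record CartCleared(String cartId) implements CartEvent {}

final class CartEventDescriber {
    // No default branch: because CartEvent is sealed, the compiler rejects
    // this switch if a permitted subtype is left unhandled.
    static String describe(CartEvent event) {
        return switch (event) {
            case ItemsAdded e -> "added " + e.skus().size() + " item(s) to " + e.cartId();
            case CartCleared e -> "cleared " + e.cartId();
        };
    }
}
```

Adding a third permitted record without extending `describe` turns into a compile error rather than a runtime surprise, which is the main payoff of a closed hierarchy.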

Event Store with JPA and PostgreSQL

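The event store persists events with optimistic concurrency control. The explicit version check in append produces a clear error in the common case, and the unique constraint on (aggregate_id, version) is the backstop that rejects a second writer that passes the check at the same time.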

java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.persistence.*; // javax.persistence.* on Spring Boot 2.x
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Entity
@Table(name = "events", uniqueConstraints = {
    @UniqueConstraint(columnNames = {"aggregate_id", "version"})
})
public class EventEntry {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long globalPosition;

    @Column(name = "aggregate_id", nullable = false)
    private String aggregateId;

    @Column(name = "aggregate_type", nullable = false)
    private String aggregateType;

    @Column(name = "event_type", nullable = false)
    private String eventType;

    @Column(nullable = false)
    private int version;

    // Note: binding a String to a jsonb column usually needs extra mapping,
    // e.g. Hibernate 6's @JdbcTypeCode(SqlTypes.JSON) or the PostgreSQL
    // driver option stringtype=unspecified.
    @Column(columnDefinition = "jsonb", nullable = false)
    private String payload;

    @Column(columnDefinition = "jsonb")
    private String metadata;

    @Column(name = "created_at", nullable = false)
    private Instant createdAt;

    // getters, setters omitted
}

@Repository
public interface EventEntryRepository extends JpaRepository<EventEntry, Long> {

    List<EventEntry> findByAggregateIdAndVersionGreaterThanOrderByVersionAsc(
        String aggregateId, int afterVersion
    );

    Optional<EventEntry> findTopByAggregateIdOrderByVersionDesc(String aggregateId);

    List<EventEntry> findByGlobalPositionGreaterThanOrderByGlobalPositionAsc(
        long afterPosition, Pageable pageable
    );
}

// EventStore is the application's own interface (not shown) declaring append and load.
@Service
public class JpaEventStore implements EventStore {

    private final EventEntryRepository repository;
    private final ObjectMapper objectMapper;
    private final ApplicationEventPublisher eventPublisher;

    public JpaEventStore(
        EventEntryRepository repository,
        ObjectMapper objectMapper,
        ApplicationEventPublisher eventPublisher
    ) {
        this.repository = repository;
        this.objectMapper = objectMapper;
        this.eventPublisher = eventPublisher;
    }

    @Transactional
    public void append(String aggregateId, String aggregateType,
                       int expectedVersion, List<OrderEvent> events) {
        // Version 0 means "no events yet"; the first stored event gets version 1.
        var current = repository.findTopByAggregateIdOrderByVersionDesc(aggregateId);
        int currentVersion = current.map(EventEntry::getVersion).orElse(0);

        if (currentVersion != expectedVersion) {
            throw new OptimisticConcurrencyException(
                "Expected version %d but found %d".formatted(expectedVersion, currentVersion)
            );
        }

        for (int i = 0; i < events.size(); i++) {
            var event = events.get(i);
            var entry = new EventEntry();
            entry.setAggregateId(aggregateId);
            entry.setAggregateType(aggregateType);
            entry.setEventType(event.getClass().getSimpleName());
            entry.setVersion(expectedVersion + i + 1);
            entry.setPayload(serialize(event));
            entry.setCreatedAt(Instant.now());
            repository.save(entry);
        }

        // Publish for projections. Plain @EventListener handlers run
        // synchronously here, before this transaction commits.
        events.forEach(eventPublisher::publishEvent);
    }

    public List<OrderEvent> load(String aggregateId, int afterVersion) {
        return repository
            .findByAggregateIdAndVersionGreaterThanOrderByVersionAsc(aggregateId, afterVersion)
            .stream()
            .map(this::deserialize)
            .toList();
    }

    private String serialize(OrderEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new EventSerializationException(e);
        }
    }

    private OrderEvent deserialize(EventEntry entry) {
        try {
            // Map the stored type name back to the record class.
            Class<? extends OrderEvent> type = switch (entry.getEventType()) {
                case "OrderPlaced" -> OrderPlaced.class;
                case "OrderConfirmed" -> OrderConfirmed.class;
                case "OrderCancelled" -> OrderCancelled.class;
                case "OrderShipped" -> OrderShipped.class;
                default -> throw new UnknownEventTypeException(entry.getEventType());
            };
            return objectMapper.readValue(entry.getPayload(), type);
        } catch (JsonProcessingException e) {
            throw new EventDeserializationException(e);
        }
    }
}
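To see the expected-version rule in isolation, here is a minimal in-memory sketch. It is an assumption-laden stand-in, not the JPA store: `InMemoryEventStore` and `ConcurrencyConflict` are hypothetical names, events are plain strings, and a `synchronized` method plays the role that the unique constraint plays in the database.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the JPA-backed store; illustration only.
final class InMemoryEventStore {

    static final class ConcurrencyConflict extends RuntimeException {
        ConcurrencyConflict(String message) {
            super(message);
        }
    }

    private final Map<String, List<String>> streams = new HashMap<>();

    // synchronized makes check-then-append atomic within one JVM.
    synchronized void append(String aggregateId, int expectedVersion, List<String> events) {
        List<String> stream = streams.computeIfAbsent(aggregateId, id -> new ArrayList<>());
        int currentVersion = stream.size(); // version N means N events are stored
        if (currentVersion != expectedVersion) {
            throw new ConcurrencyConflict(
                "Expected version %d but found %d".formatted(expectedVersion, currentVersion));
        }
        stream.addAll(events);
    }

    // Returns the events with version greater than afterVersion, oldest first.
    synchronized List<String> load(String aggregateId, int afterVersion) {
        List<String> stream = streams.getOrDefault(aggregateId, List.of());
        int from = Math.min(afterVersion, stream.size());
        return List.copyOf(stream.subList(from, stream.size()));
    }
}
```

Two writers that both loaded the stream at version 0 can both try to append with `expectedVersion = 0`; the first succeeds and the second gets a `ConcurrencyConflict`, so it has to reload and decide again.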

Aggregate Implementation

The aggregate guards invariants and produces events. Java's pattern matching (Java 21+) makes event application clean.

java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

public class OrderAggregate {

    private final String id;
    private OrderStatus status = OrderStatus.DRAFT;
    private String customerId;
    private List<LineItem> lineItems = new ArrayList<>();
    private long totalAmountCents;
    private String currency;
    private int version;
    private final List<OrderEvent> uncommittedEvents = new ArrayList<>();

    public OrderAggregate(String id) {
        this.id = id;
    }

    // Rebuilds state by replaying stored events; nothing is recorded as uncommitted.
    public static OrderAggregate rehydrate(String id, List<OrderEvent> history) {
        var aggregate = new OrderAggregate(id);
        history.forEach(aggregate::apply);
        return aggregate;
    }

    // Command handlers
    public void place(String customerId, List<LineItem> items, String currency) {
        if (status != OrderStatus.DRAFT) {
            throw new IllegalStateException("Cannot place order in status: " + status);
        }
        if (items.isEmpty()) {
            throw new IllegalArgumentException("Order must have at least one item");
        }

        long total = items.stream()
            .mapToLong(i -> i.unitPriceCents() * i.quantity())
            .sum();

        emit(new OrderPlaced(
            id, customerId, items, total, currency,
            null, Instant.now(), UUID.randomUUID().toString(), null
        ));
    }

    public void confirm(String confirmedBy) {
        if (status != OrderStatus.PLACED) {
            throw new IllegalStateException("Cannot confirm order in status: " + status);
        }
        emit(new OrderConfirmed(
            id, confirmedBy, Instant.now(), UUID.randomUUID().toString()
        ));
    }

    public void cancel(String reason, String cancelledBy) {
        if (status == OrderStatus.CANCELLED) {
            throw new IllegalStateException("Order already cancelled");
        }
        if (status == OrderStatus.SHIPPED) {
            throw new IllegalStateException("Cannot cancel shipped order");
        }
        emit(new OrderCancelled(
            id, reason, cancelledBy, Instant.now(), UUID.randomUUID().toString()
        ));
    }

    // Event application. Package-private (not private) so a repository in the
    // same package can replay events on top of a snapshot.
    void apply(OrderEvent event) {
        switch (event) {
            case OrderPlaced e -> {
                status = OrderStatus.PLACED;
                customerId = e.customerId();
                lineItems = new ArrayList<>(e.lineItems());
                totalAmountCents = e.totalAmountCents();
                currency = e.currency();
            }
            case OrderConfirmed e -> status = OrderStatus.CONFIRMED;
            case OrderCancelled e -> status = OrderStatus.CANCELLED;
            case OrderShipped e -> status = OrderStatus.SHIPPED;
        }
        version++;
    }

    // Applies a new event and records it for persistence.
    private void emit(OrderEvent event) {
        apply(event);
        uncommittedEvents.add(event);
    }

    public List<OrderEvent> uncommittedEvents() {
        return Collections.unmodifiableList(uncommittedEvents);
    }

    public String id() { return id; }
    public int version() { return version; }
    public int uncommittedCount() { return uncommittedEvents.size(); }
}

enum OrderStatus {
    DRAFT, PLACED, CONFIRMED, CANCELLED, SHIPPED
}

Command Handler Service

The command handler loads the aggregate, executes the command, and persists events atomically.

java
import java.util.List;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderCommandHandler {

    private final EventStore eventStore;

    public OrderCommandHandler(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    @Transactional
    public void handle(PlaceOrderCommand cmd) {
        var events = eventStore.load(cmd.orderId(), 0);
        var aggregate = OrderAggregate.rehydrate(cmd.orderId(), events);

        aggregate.place(cmd.customerId(), cmd.lineItems(), cmd.currency());

        // Expected version = version at load time, i.e. before the new events.
        eventStore.append(
            cmd.orderId(),
            "Order",
            aggregate.version() - aggregate.uncommittedCount(),
            aggregate.uncommittedEvents()
        );
    }

    @Transactional
    public void handle(ConfirmOrderCommand cmd) {
        var events = eventStore.load(cmd.orderId(), 0);
        var aggregate = OrderAggregate.rehydrate(cmd.orderId(), events);

        aggregate.confirm(cmd.confirmedBy());

        eventStore.append(
            cmd.orderId(),
            "Order",
            aggregate.version() - aggregate.uncommittedCount(),
            aggregate.uncommittedEvents()
        );
    }

    @Transactional
    public void handle(CancelOrderCommand cmd) {
        var events = eventStore.load(cmd.orderId(), 0);
        var aggregate = OrderAggregate.rehydrate(cmd.orderId(), events);

        aggregate.cancel(cmd.reason(), cmd.cancelledBy());

        eventStore.append(
            cmd.orderId(),
            "Order",
            aggregate.version() - aggregate.uncommittedCount(),
            aggregate.uncommittedEvents()
        );
    }
}

public record PlaceOrderCommand(
    String orderId, String customerId,
    List<LineItem> lineItems, String currency
) {}

public record ConfirmOrderCommand(String orderId, String confirmedBy) {}
public record CancelOrderCommand(String orderId, String reason, String cancelledBy) {}
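When the append is rejected because another writer got there first, a common response is to rerun the whole load, decide, append sequence so the business rules are checked against fresh state. The following is a minimal sketch of such a retry wrapper. `ConflictRetry` and its nested `ConcurrencyConflict` are hypothetical names standing in for whatever exception the store throws.

```java
import java.util.function.Supplier;

// Hypothetical helper; the exception type stands in for the store's conflict error.
final class ConflictRetry {

    static final class ConcurrencyConflict extends RuntimeException {}

    // Runs unitOfWork up to maxAttempts times, retrying only on a version
    // conflict. Any other exception propagates immediately.
    static <T> T run(int maxAttempts, Supplier<T> unitOfWork) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return unitOfWork.get();
            } catch (ConcurrencyConflict conflict) {
                if (attempt >= maxAttempts) {
                    throw conflict; // give up and surface the conflict to the caller
                }
            }
        }
    }
}
```

With Spring, a wrapper like this would need to sit outside the @Transactional method so that each attempt starts a new transaction; retrying inside a transaction that is already marked for rollback does not help.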

Projection with Spring Event Listener

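Projections subscribe to domain events and maintain denormalized read models. With a plain @EventListener, Spring calls the handler synchronously on the publishing thread, so when the read model shares the event store's DataSource these updates run inside the transaction that appended the events.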

java
import java.sql.Timestamp;
import org.springframework.context.event.EventListener;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class OrderSummaryProjection {

    private final JdbcTemplate jdbc;

    public OrderSummaryProjection(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    // Instants are converted to java.sql.Timestamp because the PostgreSQL
    // JDBC driver cannot infer a SQL type for a bare Instant parameter.
    @EventListener
    public void on(OrderPlaced event) {
        // ON CONFLICT DO NOTHING keeps the insert idempotent if the event is replayed.
        jdbc.update("""
            INSERT INTO order_summary (order_id, customer_id, status, total_cents, currency, placed_at)
            VALUES (?, ?, 'PLACED', ?, ?, ?)
            ON CONFLICT (order_id) DO NOTHING
            """,
            event.orderId(), event.customerId(),
            event.totalAmountCents(), event.currency(), Timestamp.from(event.occurredAt())
        );
    }

    @EventListener
    public void on(OrderConfirmed event) {
        jdbc.update(
            "UPDATE order_summary SET status = 'CONFIRMED', updated_at = ? WHERE order_id = ?",
            Timestamp.from(event.occurredAt()), event.orderId()
        );
    }

    @EventListener
    public void on(OrderCancelled event) {
        jdbc.update(
            "UPDATE order_summary SET status = 'CANCELLED', updated_at = ? WHERE order_id = ?",
            Timestamp.from(event.occurredAt()), event.orderId()
        );
    }
}
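In-process listeners only see events published while the application is running, so a projection added later, or one that needs rebuilding, has to read from the store instead. The repository's global-position query suggests a catch-up style for this. The sketch below shows the idea with an in-memory read model: `StoredEvent` and `OrderStatusProjector` are hypothetical names, and the batch would come from a paged query by global position in a real system.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stored-event shape: a global position plus what the read model needs.
record StoredEvent(long globalPosition, String orderId, String eventType) {}

final class OrderStatusProjector {

    private final Map<String, String> statusByOrder = new HashMap<>(); // the read model
    private long checkpoint = 0; // highest global position already applied

    // Applies a batch ordered by global position. Events at or below the
    // checkpoint are skipped, so redelivering a batch changes nothing.
    void project(List<StoredEvent> batch) {
        for (StoredEvent event : batch) {
            if (event.globalPosition() <= checkpoint) {
                continue;
            }
            switch (event.eventType()) {
                case "OrderPlaced" -> statusByOrder.put(event.orderId(), "PLACED");
                case "OrderConfirmed" -> statusByOrder.put(event.orderId(), "CONFIRMED");
                case "OrderCancelled" -> statusByOrder.put(event.orderId(), "CANCELLED");
                case "OrderShipped" -> statusByOrder.put(event.orderId(), "SHIPPED");
                default -> { /* event types this read model does not track */ }
            }
            checkpoint = event.globalPosition();
        }
    }

    long checkpoint() {
        return checkpoint;
    }

    String statusOf(String orderId) {
        return statusByOrder.get(orderId);
    }
}
```

Persisting the checkpoint in the same transaction as the read-model update is what would make this safe across restarts; the sketch keeps both in memory to stay short.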

Query Service

java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderQueryService {

    private final JdbcTemplate jdbc;

    public OrderQueryService(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    public Optional<OrderSummaryView> findById(String orderId) {
        return jdbc.query(
            "SELECT * FROM order_summary WHERE order_id = ?",
            this::mapRow, orderId
        ).stream().findFirst();
    }

    public List<OrderSummaryView> findByCustomer(String customerId, int limit, int offset) {
        return jdbc.query(
            "SELECT * FROM order_summary WHERE customer_id = ? ORDER BY placed_at DESC LIMIT ? OFFSET ?",
            this::mapRow, customerId, limit, offset
        );
    }

    // Maps one order_summary row to the read-side view.
    private OrderSummaryView mapRow(ResultSet rs, int rowNum) throws SQLException {
        return new OrderSummaryView(
            rs.getString("order_id"),
            rs.getString("customer_id"),
            rs.getString("status"),
            rs.getLong("total_cents"),
            rs.getString("currency"),
            rs.getTimestamp("placed_at").toInstant()
        );
    }
}

public record OrderSummaryView(
    String orderId, String customerId, String status,
    long totalCents, String currency, Instant placedAt
) {}

Event Upcasting

Handle event schema evolution by transforming old event formats to the current version during deserialization.

java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Not annotated with @Component: the bean is created in UpcasterConfig below,
// and registering it twice would leave two candidates for injection.
public class EventUpcasterChain {

    // Key "<eventType>:v<fromVersion>" -> upcasters that lift a payload to the next version.
    private final Map<String, List<EventUpcaster>> upcasters = new HashMap<>();

    public void register(String eventType, int fromVersion, EventUpcaster upcaster) {
        upcasters.computeIfAbsent(eventType + ":v" + fromVersion, k -> new ArrayList<>())
            .add(upcaster);
    }

    // `version` is the payload's schema version, not the aggregate version.
    public JsonNode upcast(String eventType, int version, JsonNode payload) {
        JsonNode current = payload;
        int currentVersion = version;

        // Walk v -> v+1 -> ... until no upcaster is registered for the current version.
        while (upcasters.containsKey(eventType + ":v" + currentVersion)) {
            for (var upcaster : upcasters.get(eventType + ":v" + currentVersion)) {
                current = upcaster.upcast(current);
            }
            currentVersion++;
        }

        return current;
    }
}

@FunctionalInterface
public interface EventUpcaster {
    JsonNode upcast(JsonNode payload);
}

// Registration example
@Configuration
public class UpcasterConfig {
    @Bean
    EventUpcasterChain upcasterChain() {
        var chain = new EventUpcasterChain();

        // OrderPlaced v1 -> v2: added currency field with default
        chain.register("OrderPlaced", 1, payload -> {
            ((ObjectNode) payload).put("currency", "USD");
            return payload;
        });

        return chain;
    }
}
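The chaining behaviour is easier to see when more than one step is registered. The sketch below is a stdlib-only variant written for illustration: `MapUpcasterChain` is a hypothetical name, payloads are `Map<String, Object>` instead of Jackson trees, and only one step per version is supported.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stdlib-only variant: payloads are maps instead of Jackson JsonNode trees.
final class MapUpcasterChain {

    // Key "<eventType>:v<fromVersion>" -> step that lifts a payload to fromVersion + 1.
    private final Map<String, UnaryOperator<Map<String, Object>>> steps = new HashMap<>();

    void register(String eventType, int fromVersion, UnaryOperator<Map<String, Object>> step) {
        steps.put(eventType + ":v" + fromVersion, step);
    }

    // Applies v -> v+1 steps until no step is registered for the current version.
    Map<String, Object> upcast(String eventType, int version, Map<String, Object> payload) {
        Map<String, Object> current = new HashMap<>(payload); // work on a copy, not the stored payload
        int currentVersion = version;
        UnaryOperator<Map<String, Object>> step;
        while ((step = steps.get(eventType + ":v" + currentVersion)) != null) {
            current = step.apply(current);
            currentVersion++;
        }
        return current;
    }
}
```

If a v1 to v2 step adds a default `currency` and a v2 to v3 step renames a field, a payload stored at v1 passes through both steps, one stored at v2 passes through only the second, and one stored at v3 comes back unchanged. The stored schema version has to come from somewhere, for instance the event metadata; that is an assumption here.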

Snapshot Support

java
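import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.persistence.*; // javax.persistence.* on Spring Boot 2.x
import java.time.Instant;
import org.springframework.stereotype.Service;

@Entity
@Table(name = "aggregate_snapshots")
public class AggregateSnapshot {
    @Id
    private String aggregateId;
    private int version;

    @Column(columnDefinition = "jsonb")
    private String state;
    private Instant createdAt;

    // getters, setters omitted
}

// SnapshotRepository is assumed to be a Spring Data repository for AggregateSnapshot (not shown).
// This class also assumes OrderAggregate exposes id() and a non-private apply(OrderEvent).
@Service
public class SnapshotAwareRepository {

    private static final int SNAPSHOT_THRESHOLD = 50;

    private final SnapshotRepository snapshots;
    private final EventStore eventStore;
    private final ObjectMapper mapper;

    public SnapshotAwareRepository(SnapshotRepository snapshots,
                                   EventStore eventStore, ObjectMapper mapper) {
        this.snapshots = snapshots;
        this.eventStore = eventStore;
        this.mapper = mapper;
    }

    public OrderAggregate load(String aggregateId) {
        var snapshot = snapshots.findById(aggregateId).orElse(null);
        OrderAggregate aggregate;
        int afterVersion;

        if (snapshot != null) {
            aggregate = deserializeSnapshot(snapshot);
            afterVersion = snapshot.getVersion();
        } else {
            aggregate = new OrderAggregate(aggregateId);
            afterVersion = 0;
        }

        // Replay only the events recorded after the snapshot was taken.
        var events = eventStore.load(aggregateId, afterVersion);
        events.forEach(aggregate::apply);

        return aggregate;
    }

    public void save(OrderAggregate aggregate) {
        int versionBefore = aggregate.version() - aggregate.uncommittedCount();
        eventStore.append(
            aggregate.id(), "Order", versionBefore, aggregate.uncommittedEvents()
        );

        // Snapshot whenever this save crossed a multiple of the threshold. A plain
        // version % threshold == 0 check misses it when several events are appended at once.
        if (aggregate.version() / SNAPSHOT_THRESHOLD > versionBefore / SNAPSHOT_THRESHOLD) {
            var snapshot = new AggregateSnapshot();
            snapshot.setAggregateId(aggregate.id());
            snapshot.setVersion(aggregate.version());
            snapshot.setState(serializeState(aggregate));
            snapshot.setCreatedAt(Instant.now());
            snapshots.save(snapshot);
        }
    }

    // deserializeSnapshot(...) and serializeState(...) map aggregate state to and
    // from JSON with the ObjectMapper; omitted here.
}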
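The load path above boils down to "restore the snapshot, then replay only the tail". A minimal stdlib-only sketch of that idea follows, using a deliberately tiny aggregate so the arithmetic is easy to check. `CounterAggregate`, `SnapshotLoader`, and `shouldSnapshot` are hypothetical names, and each event is simply a `long` increment.

```java
import java.util.List;

// Hypothetical aggregate whose only state is a running total.
final class CounterAggregate {
    private long total;
    private int version;

    record Snapshot(long total, int version) {}

    static CounterAggregate fromSnapshot(Snapshot snapshot) {
        var aggregate = new CounterAggregate();
        aggregate.total = snapshot.total();
        aggregate.version = snapshot.version();
        return aggregate;
    }

    // Each event is an increment; applying one advances the version by one.
    void apply(long increment) {
        total += increment;
        version++;
    }

    long total() { return total; }
    int version() { return version; }
}

final class SnapshotLoader {
    // Restores from the snapshot (if any), then replays only events with a
    // version greater than the snapshot's. `history` is the full stream,
    // oldest first, so index i holds the event with version i + 1.
    static CounterAggregate load(CounterAggregate.Snapshot snapshot, List<Long> history) {
        CounterAggregate aggregate =
            snapshot != null ? CounterAggregate.fromSnapshot(snapshot) : new CounterAggregate();
        int afterVersion = aggregate.version();
        history.subList(afterVersion, history.size()).forEach(aggregate::apply);
        return aggregate;
    }

    // True when a save moved the version across a multiple of the threshold.
    static boolean shouldSnapshot(int versionBefore, int versionAfter, int threshold) {
        return versionAfter / threshold > versionBefore / threshold;
    }
}
```

With a history of 1, 2, 3, 4, 5 and a snapshot taken at version 3 (total 6), loading replays only 4 and 5 and arrives at the same total of 15 as a full replay.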

Testing Aggregates

Use a given-when-then style for aggregate tests.

java
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.time.Instant;
import java.util.List;
import org.junit.jupiter.api.Test;

class OrderAggregateTest {

    @Test
    void shouldPlaceOrder() {
        // Given
        var aggregate = new OrderAggregate("order-1");

        // When
        aggregate.place("customer-1", List.of(
            new LineItem("product-1", 2, 1999)
        ), "USD");

        // Then (List.getFirst() requires Java 21)
        assertThat(aggregate.uncommittedEvents()).hasSize(1);
        assertThat(aggregate.uncommittedEvents().getFirst()).isInstanceOf(OrderPlaced.class);

        var event = (OrderPlaced) aggregate.uncommittedEvents().getFirst();
        assertThat(event.customerId()).isEqualTo("customer-1");
        assertThat(event.totalAmountCents()).isEqualTo(3998);
    }

    @Test
    void shouldNotConfirmDraftOrder() {
        var aggregate = new OrderAggregate("order-1");

        assertThatThrownBy(() -> aggregate.confirm("admin"))
            .isInstanceOf(IllegalStateException.class)
            .hasMessageContaining("Cannot confirm order in status: DRAFT");
    }

    @Test
    void shouldRehydrateAndCancel() {
        // Given a previously placed order
        var history = List.<OrderEvent>of(
            new OrderPlaced("order-1", "cust-1", List.of(new LineItem("p1", 1, 1000)),
                1000, "USD", null, Instant.now(), "corr-1", null)
        );

        // When
        var aggregate = OrderAggregate.rehydrate("order-1", history);
        aggregate.cancel("Customer request", "agent-1");

        // Then only the new event is uncommitted
        assertThat(aggregate.uncommittedEvents()).hasSize(1);
        assertThat(aggregate.uncommittedEvents().getFirst()).isInstanceOf(OrderCancelled.class);
    }
}

Conclusion

Java's type system, Spring ecosystem, and mature tooling make it well-suited for CQRS and Event Sourcing. Sealed interfaces provide a closed event hierarchy, records give you immutable event definitions with minimal boilerplate, and pattern matching simplifies event application in aggregates. For teams already invested in the JVM ecosystem, this approach integrates smoothly with existing infrastructure while providing the full benefits of event-sourced architectures.

Consider Axon Framework if you want a batteries-included CQRS/ES solution with saga support, distributed command/query bus, and event store management. Build from scratch using the patterns in this guide when you want full control and minimal dependencies.
