System Design

Complete Guide to Event-Driven Architecture with Rust

A comprehensive guide to implementing Event-Driven Architecture using Rust, covering architecture, code examples, and production-ready patterns.

Muneer Puthiya Purayil · 19 min read

Rust brings unique advantages to event-driven architecture: zero-cost abstractions, compile-time concurrency safety, and memory efficiency that translates directly to lower infrastructure costs. The trade-off is a steeper learning curve and longer development cycles. This guide covers production-grade patterns for building event-driven systems in Rust, from basic Kafka consumers to sophisticated stream processing with exactly-once guarantees.

Core Architecture

Rust event-driven systems typically use rdkafka (Rust bindings for librdkafka) for Kafka interaction and tokio for the async runtime. The type system enforces correctness at compile time — missing error handling, data races, and use-after-free bugs are caught before deployment.
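The examples in this guide lean on a handful of crates. A representative dependency set might look like the following (version numbers are illustrative, not pinned by the original):

```toml
[dependencies]
tokio = { version = "1", features = ["full"] }
rdkafka = "0.36"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1", features = ["v4", "serde"] }
rust_decimal = { version = "1", features = ["serde"] }
async-trait = "0.1"
futures = "0.3"
tracing = "0.1"
sqlx = { version = "0.7", features = ["runtime-tokio", "postgres", "uuid", "chrono", "json"] }
```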

```rust
use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};
use rust_decimal::Decimal;
use uuid::Uuid;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event_type")]
pub enum OrderEvent {
    OrderCreated(OrderCreated),
    OrderShipped(OrderShipped),
    OrderCancelled(OrderCancelled),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderCreated {
    pub event_id: Uuid,
    pub order_id: String,
    pub customer_id: String,
    pub items: Vec<OrderItem>,
    pub total: Decimal,
    pub currency: String,
    pub timestamp: DateTime<Utc>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderItem {
    pub sku: String,
    pub quantity: u32,
    pub unit_price: Decimal,
}
```

The #[serde(tag = "event_type")] attribute produces JSON with an embedded type discriminator, so deserialization dispatches automatically and the compiler enforces exhaustive pattern matching over the variants.
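For the OrderCreated variant, the wire format looks roughly like this (identifiers and values are invented for illustration; with its serde feature, rust_decimal serializes Decimal as a JSON string by default):

```json
{
  "event_type": "OrderCreated",
  "event_id": "7d9f5a7e-3b1c-4e2a-9f00-1c2d3e4f5a6b",
  "order_id": "ord-1001",
  "customer_id": "cust-42",
  "items": [{ "sku": "SKU-1", "quantity": 2, "unit_price": "19.99" }],
  "total": "39.98",
  "currency": "USD",
  "timestamp": "2025-01-15T12:00:00Z"
}
```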

Kafka Producer

A production Kafka producer with proper configuration for durability and throughput:

```rust
use chrono::Utc;
use rdkafka::config::ClientConfig;
use rdkafka::message::OwnedHeaders;
use rdkafka::producer::{FutureProducer, FutureRecord};
use serde::Serialize;
use std::time::Duration;

pub struct EventPublisher {
    producer: FutureProducer,
}

impl EventPublisher {
    pub fn new(brokers: &str) -> Result<Self, rdkafka::error::KafkaError> {
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .set("acks", "all")
            .set("enable.idempotence", "true")
            .set("max.in.flight.requests.per.connection", "5")
            .set("retries", "3")
            .set("linger.ms", "10")
            .set("batch.size", "65536")
            .set("compression.type", "lz4")
            .create()?;

        Ok(Self { producer })
    }

    pub async fn publish<T: Serialize>(
        &self,
        topic: &str,
        key: &str,
        event: &T,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let payload = serde_json::to_vec(event)?;
        let event_type = std::any::type_name::<T>()
            .rsplit("::")
            .next()
            .unwrap_or("Unknown");

        // Bind the timestamp to a local first; an inline temporary String
        // would be dropped while the header still borrows its bytes.
        let produced_at = Utc::now().to_rfc3339();
        let headers = OwnedHeaders::new()
            .insert(rdkafka::message::Header {
                key: "event_type",
                value: Some(event_type.as_bytes()),
            })
            .insert(rdkafka::message::Header {
                key: "produced_at",
                value: Some(produced_at.as_bytes()),
            });

        self.producer
            .send(
                FutureRecord::to(topic)
                    .key(key)
                    .payload(&payload)
                    .headers(headers),
                Duration::from_secs(5),
            )
            .await
            .map_err(|(err, _)| err)?;

        Ok(())
    }
}
```
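The event_type header value comes from std::any::type_name, which returns a fully qualified path; taking the last :: segment leaves the bare type name. A pure-std demonstration of that derivation:

```rust
// Stand-in type; in the publisher this would be the event struct.
struct OrderCreated;

// Mirrors the derivation in publish(): strip the module path, keep the type.
fn short_type_name<T>() -> &'static str {
    std::any::type_name::<T>().rsplit("::").next().unwrap_or("Unknown")
}

fn main() {
    // type_name::<T>() yields something like "mycrate::events::OrderCreated".
    assert_eq!(short_type_name::<OrderCreated>(), "OrderCreated");
    assert_eq!(short_type_name::<String>(), "String");
    println!("{}", short_type_name::<OrderCreated>());
}
```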

Stream Consumer with Graceful Shutdown

The consumer combines rdkafka's StreamConsumer with tokio for async processing:

```rust
use futures::StreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::message::{Headers, Message};
use std::collections::HashMap;
use tokio::sync::watch;
use tracing::{error, info, instrument, warn};

pub struct EventConsumer {
    consumer: StreamConsumer,
    handlers: HashMap<String, Box<dyn EventHandler>>,
    dlq_producer: EventPublisher,
}

#[async_trait::async_trait]
pub trait EventHandler: Send + Sync {
    async fn handle(&self, payload: &[u8]) -> Result<(), Box<dyn std::error::Error>>;
}

impl EventConsumer {
    pub fn new(
        brokers: &str,
        group_id: &str,
        topics: &[&str],
        handlers: HashMap<String, Box<dyn EventHandler>>,
    ) -> Result<Self, Box<dyn std::error::Error>> {
        let consumer: StreamConsumer = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .set("group.id", group_id)
            .set("auto.offset.reset", "earliest")
            .set("enable.auto.commit", "false")
            .set("session.timeout.ms", "30000")
            .set("max.poll.interval.ms", "300000")
            .create()?;

        consumer.subscribe(topics)?;

        let dlq_producer = EventPublisher::new(brokers)?;

        Ok(Self {
            consumer,
            handlers,
            dlq_producer,
        })
    }

    #[instrument(skip(self, shutdown_rx))]
    pub async fn run(&self, mut shutdown_rx: watch::Receiver<bool>) {
        let mut stream = self.consumer.stream();

        loop {
            tokio::select! {
                Some(result) = stream.next() => {
                    match result {
                        Ok(msg) => {
                            self.process_message(&msg).await;
                            if let Err(e) = self.consumer.commit_message(&msg, CommitMode::Sync) {
                                error!("Commit failed: {}", e);
                            }
                        }
                        Err(e) => error!("Kafka stream error: {}", e),
                    }
                }
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        info!("Shutdown signal received, draining consumer");
                        break;
                    }
                }
            }
        }
    }

    async fn process_message(&self, msg: &rdkafka::message::BorrowedMessage<'_>) {
        let event_type = msg
            .headers()
            .and_then(|headers| {
                headers.iter().find_map(|header| {
                    if header.key == "event_type" {
                        header.value.map(|v| String::from_utf8_lossy(v).to_string())
                    } else {
                        None
                    }
                })
            })
            .unwrap_or_default();

        let payload = match msg.payload() {
            Some(p) => p,
            None => {
                warn!("Empty payload at offset {}", msg.offset());
                return;
            }
        };

        match self.handlers.get(&event_type) {
            Some(handler) => {
                if let Err(e) = handler.handle(payload).await {
                    error!(
                        event_type = %event_type,
                        offset = msg.offset(),
                        partition = msg.partition(),
                        "Handler failed: {}",
                        e
                    );
                    self.send_to_dlq(msg, &e.to_string()).await;
                }
            }
            None => warn!("No handler for event type: {}", event_type),
        }
    }

    async fn send_to_dlq(
        &self,
        msg: &rdkafka::message::BorrowedMessage<'_>,
        error: &str,
    ) {
        let dlq_event = serde_json::json!({
            "original_topic": msg.topic(),
            "original_partition": msg.partition(),
            "original_offset": msg.offset(),
            "error": error,
            "payload": msg.payload().map(|p| String::from_utf8_lossy(p).to_string()),
        });

        let dlq_topic = format!("{}.dlq", msg.topic());
        if let Err(e) = self.dlq_producer
            .publish(&dlq_topic, &msg.offset().to_string(), &dlq_event)
            .await
        {
            error!("DLQ publish failed: {}", e);
        }
    }
}
```
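One way to wire the shutdown signal, assuming the EventConsumer above (broker address, group id, and topic names are placeholders):

```rust
use std::collections::HashMap;

// Sketch: translate Ctrl-C into the watch channel the consumer selects on.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let handlers: HashMap<String, Box<dyn EventHandler>> = HashMap::new(); // register handlers here
    let consumer = EventConsumer::new("localhost:9092", "order-service", &["order-events"], handlers)?;

    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.expect("failed to install ctrl-c handler");
        let _ = shutdown_tx.send(true);
    });

    consumer.run(shutdown_rx).await;
    Ok(())
}
```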

Typed Event Handlers

Implement handlers with automatic deserialization and strong typing:

```rust
use std::sync::Arc;

pub struct OrderCreatedHandler {
    order_repo: Arc<dyn OrderRepository>,
    notification_service: Arc<dyn NotificationService>,
}

#[async_trait::async_trait]
impl EventHandler for OrderCreatedHandler {
    async fn handle(&self, payload: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
        let event: OrderCreated = serde_json::from_slice(payload)?;

        // Idempotency check: Kafka may redeliver, so replay must be a no-op
        if self.order_repo.exists(&event.order_id).await? {
            tracing::info!(order_id = %event.order_id, "Order already processed, skipping");
            return Ok(());
        }

        self.order_repo
            .save(Order {
                id: event.order_id.clone(),
                customer_id: event.customer_id.clone(),
                items: event.items,
                total: event.total,
                status: OrderStatus::Created,
                created_at: event.timestamp,
            })
            .await?;

        self.notification_service
            .send_order_confirmation(&event.customer_id, &event.order_id)
            .await?;

        Ok(())
    }
}
```
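Handlers are registered under the same discriminator string the producer writes into the event_type header (the bare type name). A bootstrap sketch, with the repository and notification types assumed from the handler above:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical wiring function; the map key must match the event_type
// header value, e.g. "OrderCreated".
fn build_handlers(
    order_repo: Arc<dyn OrderRepository>,
    notification_service: Arc<dyn NotificationService>,
) -> HashMap<String, Box<dyn EventHandler>> {
    let mut handlers: HashMap<String, Box<dyn EventHandler>> = HashMap::new();
    handlers.insert(
        "OrderCreated".to_string(),
        Box::new(OrderCreatedHandler { order_repo, notification_service }),
    );
    handlers
}
```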


Transactional Outbox with SQLx

The outbox pattern using SQLx for compile-time verified SQL:

```rust
use chrono::{DateTime, Utc};
use sqlx::{PgPool, Postgres, Transaction};
use uuid::Uuid;

pub struct OutboxEvent {
    pub id: Uuid,
    pub aggregate_id: String,
    pub aggregate_type: String,
    pub event_type: String,
    pub payload: serde_json::Value,
    pub created_at: DateTime<Utc>,
    pub published_at: Option<DateTime<Utc>>,
}

pub struct OrderService {
    pool: PgPool,
}

impl OrderService {
    pub async fn create_order(
        &self,
        cmd: CreateOrderCommand,
    ) -> Result<Order, Box<dyn std::error::Error>> {
        let mut tx: Transaction<'_, Postgres> = self.pool.begin().await?;

        let order_id = Uuid::new_v4().to_string();

        sqlx::query!(
            r#"INSERT INTO orders (id, customer_id, items, status, created_at)
               VALUES ($1, $2, $3, $4, NOW())"#,
            &order_id,
            &cmd.customer_id,
            serde_json::to_value(&cmd.items)?,
            "created",
        )
        .execute(&mut *tx)
        .await?;

        let event = OrderCreated {
            event_id: Uuid::new_v4(),
            order_id: order_id.clone(),
            customer_id: cmd.customer_id.clone(),
            items: cmd.items.clone(),
            total: cmd.total,
            currency: "USD".to_string(),
            timestamp: Utc::now(),
        };

        // Writing the event in the same transaction as the order row is the
        // core of the pattern: both commit or neither does.
        sqlx::query!(
            r#"INSERT INTO outbox_events (id, aggregate_id, aggregate_type, event_type, payload)
               VALUES ($1, $2, $3, $4, $5)"#,
            event.event_id,
            &order_id,
            "Order",
            "OrderCreated",
            serde_json::to_value(&event)?,
        )
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(Order {
            id: order_id,
            customer_id: cmd.customer_id,
            items: cmd.items,
            total: cmd.total,
            status: OrderStatus::Created,
            created_at: Utc::now(),
        })
    }
}
```
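The outbox queries in this section assume a table roughly like the following; the schema is inferred from the OutboxEvent struct, not given in the original:

```sql
CREATE TABLE outbox_events (
    id             UUID PRIMARY KEY,
    aggregate_id   TEXT        NOT NULL,
    aggregate_type TEXT        NOT NULL,
    event_type     TEXT        NOT NULL,
    payload        JSONB       NOT NULL,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at   TIMESTAMPTZ
);

-- Partial index keeps the poller's scan cheap as published rows accumulate.
CREATE INDEX idx_outbox_unpublished
    ON outbox_events (created_at)
    WHERE published_at IS NULL;
```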

Outbox Poller with Backpressure

```rust
use std::time::Duration;
use tokio::sync::watch;
use tracing::{error, info};

pub struct OutboxPoller {
    pool: PgPool,
    publisher: EventPublisher,
}

impl OutboxPoller {
    pub async fn run(&self, mut shutdown: watch::Receiver<bool>) {
        let mut interval = tokio::time::interval(Duration::from_millis(100));

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    if let Err(e) = self.poll_batch().await {
                        error!("Outbox poll failed: {}", e);
                    }
                }
                _ = shutdown.changed() => {
                    if *shutdown.borrow() {
                        info!("Outbox poller shutting down");
                        break;
                    }
                }
            }
        }
    }

    async fn poll_batch(&self) -> Result<(), Box<dyn std::error::Error>> {
        // FOR UPDATE SKIP LOCKED only holds row locks for the life of the
        // transaction, so the select, publish, and mark-published steps must
        // share one; otherwise concurrent pollers can publish duplicates.
        let mut tx = self.pool.begin().await?;

        let entries = sqlx::query_as!(
            OutboxEvent,
            r#"SELECT id, aggregate_id, aggregate_type, event_type,
                      payload, created_at, published_at
               FROM outbox_events
               WHERE published_at IS NULL
               ORDER BY created_at ASC
               LIMIT 100
               FOR UPDATE SKIP LOCKED"#,
        )
        .fetch_all(&mut *tx)
        .await?;

        for entry in entries {
            let topic = format!("{}-events", entry.aggregate_type.to_lowercase());
            self.publisher
                .publish(&topic, &entry.aggregate_id, &entry.payload)
                .await?;

            sqlx::query!(
                "UPDATE outbox_events SET published_at = NOW() WHERE id = $1",
                entry.id,
            )
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
}
```

Concurrent Processing with Bounded Channels

Use tokio channels for backpressure-aware concurrent processing:

```rust
use futures::StreamExt;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::message::Message;
use std::sync::Arc;
use tokio::sync::{mpsc, watch};
use tracing::error;

pub async fn run_with_concurrency(
    consumer: &StreamConsumer,
    handler: Arc<dyn EventHandler>,
    max_concurrency: usize,
    mut shutdown: watch::Receiver<bool>,
) {
    let (tx, mut rx) = mpsc::channel::<rdkafka::message::OwnedMessage>(max_concurrency * 2);
    let semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrency));

    // Processor task: the semaphore caps in-flight handlers at max_concurrency.
    let handler_clone = handler.clone();
    let sem_clone = semaphore.clone();
    tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            let permit = sem_clone.clone().acquire_owned().await.unwrap();
            let h = handler_clone.clone();
            tokio::spawn(async move {
                if let Some(payload) = msg.payload() {
                    if let Err(e) = h.handle(payload).await {
                        error!("Handler error: {}", e);
                    }
                }
                drop(permit);
            });
        }
    });

    // Feed messages into the bounded channel; send blocks when the channel
    // is full, which applies backpressure to consumption.
    let mut stream = consumer.stream();
    loop {
        tokio::select! {
            Some(result) = stream.next() => {
                if let Ok(msg) = result {
                    let owned = msg.detach();
                    if tx.send(owned).await.is_err() {
                        break;
                    }
                    // Note: committing before the handler runs gives at-most-once
                    // semantics; a crash can drop messages still in the channel.
                    consumer.commit_message(&msg, CommitMode::Async).ok();
                }
            }
            _ = shutdown.changed() => {
                if *shutdown.borrow() { break; }
            }
        }
    }
}
```

Observability with tracing

The tracing crate provides structured, span-based instrumentation:

```rust
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use rdkafka::message::{BorrowedMessage, Message};
use tracing::{info_span, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

async fn instrumented_process(msg: &BorrowedMessage<'_>, handler: &dyn EventHandler) {
    // KafkaHeaderCarrier adapts Kafka headers to the Extractor interface
    // so the upstream trace context can be recovered.
    let propagator = TraceContextPropagator::new();
    let carrier = KafkaHeaderCarrier::new(msg.headers());
    let parent_ctx = propagator.extract(&carrier);

    let span = info_span!(
        "process_event",
        messaging.system = "kafka",
        messaging.destination = msg.topic(),
        messaging.kafka.partition = msg.partition(),
        messaging.kafka.offset = msg.offset(),
    );
    span.set_parent(parent_ctx);

    async {
        if let Some(payload) = msg.payload() {
            if let Err(e) = handler.handle(payload).await {
                tracing::error!(error = %e, "Event processing failed");
            }
        }
    }
    .instrument(span)
    .await;
}
```
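KafkaHeaderCarrier is not part of rdkafka; it is a small adapter you write yourself. A minimal sketch of such an Extractor might look like:

```rust
use opentelemetry::propagation::Extractor;
use rdkafka::message::{BorrowedHeaders, Headers};

/// Adapts Kafka message headers to OpenTelemetry's Extractor interface so
/// trace context (traceparent/tracestate) can be read from them.
pub struct KafkaHeaderCarrier<'a> {
    headers: Option<&'a BorrowedHeaders>,
}

impl<'a> KafkaHeaderCarrier<'a> {
    pub fn new(headers: Option<&'a BorrowedHeaders>) -> Self {
        Self { headers }
    }
}

impl<'a> Extractor for KafkaHeaderCarrier<'a> {
    fn get(&self, key: &str) -> Option<&str> {
        self.headers.and_then(|h| {
            h.iter()
                .find(|header| header.key == key)
                .and_then(|header| header.value)
                .and_then(|v| std::str::from_utf8(v).ok())
        })
    }

    fn keys(&self) -> Vec<&str> {
        self.headers
            .map(|h| h.iter().map(|header| header.key).collect())
            .unwrap_or_default()
    }
}
```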

Conclusion

Event-driven architecture in Rust demands more upfront investment than Go or Java equivalents, but the returns are substantial. Compile-time guarantees eliminate entire categories of runtime failures: in safe Rust, data races, use-after-free bugs, and null dereferences simply cannot occur. The memory efficiency means fewer instances to process the same volume, and the performance headroom means your event pipeline handles traffic spikes without breaking a sweat.

The patterns here — typed event dispatch, transactional outbox, bounded concurrency — are battle-tested approaches that work well with Rust's ownership model rather than fighting against it. The key insight is designing your event handlers as pure async functions that take owned data, avoiding the lifetime complexity that trips up many Rust newcomers in async contexts.
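The owned-data point can be made concrete: a handler that takes its event by value produces a 'static future that can be spawned freely, while one borrowing from the Kafka message cannot outlive it. A minimal sketch, with illustrative types:

```rust
// Stand-in for a deserialized domain event; owning it means the handler
// future borrows nothing from the Kafka message buffer.
#[derive(Debug, Clone)]
struct OrderCreated {
    order_id: String,
}

// Because `event` is owned, this future is 'static and can be spawned.
async fn handle_order_created(event: OrderCreated) -> Result<(), String> {
    println!("processing order {}", event.order_id);
    Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let event = OrderCreated { order_id: "ord-1001".into() };
    let task = tokio::spawn(handle_order_created(event));
    task.await.expect("join error").expect("handler error");
}
```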
