
Complete Guide to Event-Driven Architecture with Python

A comprehensive guide to implementing Event-Driven Architecture using Python, covering architecture, code examples, and production-ready patterns.

Muneer Puthiya Purayil · 16 min read

Python's event-driven architecture story has matured significantly with asyncio, modern Kafka clients, and frameworks like FastAPI that embrace asynchronous patterns. While Python won't win raw throughput benchmarks against Go or Rust, its expressiveness and the speed at which you can build, iterate, and deploy event consumers make it a strong choice for many production systems.

Architecture Overview

Event-driven systems in Python typically combine an async Kafka consumer with either FastAPI for HTTP ingress or a standalone consumer process. The key architectural decision is choosing between confluent-kafka-python (librdkafka wrapper, higher throughput) and aiokafka (pure Python async, better asyncio integration).

python
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from enum import Enum
import uuid


class EventType(Enum):
    ORDER_CREATED = "OrderCreated"
    ORDER_SHIPPED = "OrderShipped"
    ORDER_CANCELLED = "OrderCancelled"


@dataclass(frozen=True)
class OrderCreated:
    order_id: str
    customer_id: str
    items: list[dict]
    total: Decimal
    currency: str = "USD"
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.utcnow)
    event_type: str = field(default=EventType.ORDER_CREATED.value, init=False)
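Before the event hits the wire it has to become bytes. A minimal sketch of serializing the frozen dataclass for Kafka, using `dataclasses.asdict` plus `default=str` to cover the `Decimal`, `datetime`, and UUID fields that `json.dumps` can't encode natively (the condensed `OrderCreated` here mirrors the definition above):

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from decimal import Decimal


@dataclass(frozen=True)
class OrderCreated:  # condensed copy of the event defined above
    order_id: str
    customer_id: str
    items: list[dict]
    total: Decimal
    currency: str = "USD"
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def serialize_event(event) -> bytes:
    # asdict() recurses into nested dataclasses and containers; default=str
    # handles Decimal, datetime, and UUID values json can't encode directly
    return json.dumps(asdict(event), default=str).encode("utf-8")


order = OrderCreated(
    order_id="ord-1",
    customer_id="cust-42",
    items=[{"sku": "ABC", "qty": 2}],
    total=Decimal("19.99"),
)
payload = serialize_event(order)
```

Note that `default=str` turns the `Decimal` into a string like `"19.99"`, which sidesteps float rounding on the wire; consumers should parse it back into `Decimal` rather than `float`.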

Async Kafka Consumer with aiokafka

aiokafka integrates natively with Python's asyncio event loop, making it straightforward to combine Kafka consumption with other async operations:

python
import asyncio
import json
import logging
from typing import Callable, Optional

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

logger = logging.getLogger(__name__)


class EventConsumer:
    def __init__(
        self,
        brokers: str,
        group_id: str,
        topic: str,
        handlers: dict[str, Callable],
    ):
        self.brokers = brokers
        self.group_id = group_id
        self.topic = topic
        self.handlers = handlers
        self._consumer: Optional[AIOKafkaConsumer] = None
        self._dlq_producer: Optional[AIOKafkaProducer] = None

    async def start(self):
        self._consumer = AIOKafkaConsumer(
            self.topic,
            bootstrap_servers=self.brokers,
            group_id=self.group_id,
            auto_offset_reset="earliest",
            enable_auto_commit=False,
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
            max_poll_records=500,
            session_timeout_ms=30000,
        )
        self._dlq_producer = AIOKafkaProducer(
            bootstrap_servers=self.brokers,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        )
        await self._consumer.start()
        await self._dlq_producer.start()

    async def stop(self):
        if self._consumer:
            await self._consumer.stop()
        if self._dlq_producer:
            await self._dlq_producer.stop()

    async def run(self):
        await self.start()
        try:
            async for msg in self._consumer:
                try:
                    event_type = None
                    for header_key, header_val in msg.headers:
                        if header_key == "event_type":
                            event_type = header_val.decode("utf-8")
                            break

                    handler = self.handlers.get(event_type)
                    if handler is None:
                        logger.warning(f"No handler for event type: {event_type}")
                        await self._consumer.commit()
                        continue

                    await handler(msg.value)
                    await self._consumer.commit()

                except Exception as e:
                    logger.error(
                        f"Failed to process message at offset {msg.offset}: {e}"
                    )
                    await self._send_to_dlq(msg, str(e))
                    await self._consumer.commit()
        finally:
            await self.stop()

    async def _send_to_dlq(self, msg, error: str):
        dlq_payload = {
            "original_topic": msg.topic,
            "original_partition": msg.partition,
            "original_offset": msg.offset,
            "error": error,
            "payload": msg.value,
        }
        await self._dlq_producer.send(
            f"{msg.topic}.dlq",
            value=dlq_payload,
            key=msg.key,
        )
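Wiring the consumer up is mostly a matter of building the `handlers` mapping from header values to coroutines. A sketch with illustrative handler names; the actual `EventConsumer` call is shown only as a comment since it needs a live broker:

```python
import asyncio

processed = []


async def handle_order_created(payload: dict) -> None:
    # Illustrative handler: record the order id we processed
    processed.append(payload["order_id"])


async def handle_order_cancelled(payload: dict) -> None:
    processed.append(f"cancelled:{payload['order_id']}")


# The same mapping EventConsumer receives: header value -> coroutine handler
handlers = {
    "OrderCreated": handle_order_created,
    "OrderCancelled": handle_order_cancelled,
}


async def main() -> None:
    # In production this would be:
    #   await EventConsumer("localhost:9092", "order-service", "orders", handlers).run()
    # Here we exercise only the dispatch step that run() performs per message:
    handler = handlers.get("OrderCreated")
    await handler({"order_id": "ord-1"})


asyncio.run(main())
```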

Event Producer with Retry Logic

A production producer needs idempotency, batching, and proper error handling:

python
import json
import uuid
from datetime import datetime
from typing import Optional

from aiokafka import AIOKafkaProducer


class EventPublisher:
    def __init__(self, brokers: str):
        self.brokers = brokers
        self._producer: Optional[AIOKafkaProducer] = None

    async def start(self):
        self._producer = AIOKafkaProducer(
            bootstrap_servers=self.brokers,
            acks="all",
            enable_idempotence=True,
            max_batch_size=65536,
            linger_ms=10,
            value_serializer=lambda v: json.dumps(v, default=str).encode("utf-8"),
            key_serializer=lambda k: k.encode("utf-8") if k else None,
        )
        await self._producer.start()

    async def publish(self, topic: str, key: str, event: dict) -> None:
        headers = [
            ("event_type", event.get("event_type", "unknown").encode("utf-8")),
            ("event_id", str(uuid.uuid4()).encode("utf-8")),
            ("produced_at", datetime.utcnow().isoformat().encode("utf-8")),
        ]
        await self._producer.send_and_wait(
            topic,
            key=key,
            value=event,
            headers=headers,
        )

    async def stop(self):
        if self._producer:
            await self._producer.stop()
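aiokafka's idempotent producer already retries transient broker errors internally, and `acks="all"` plus `enable_idempotence=True` makes those retries safe from duplicates. For errors that do surface to the caller, one option is an application-level backoff wrapper; a sketch, where the attempt counts and delays are arbitrary defaults:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def publish_with_retry(
    publish,
    *args,
    max_attempts: int = 3,
    base_delay: float = 0.5,
    **kwargs,
):
    """Call an async publish function, retrying failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await publish(*args, **kwargs)
        except Exception as exc:
            if attempt == max_attempts:
                raise  # retries exhausted: surface the error (or route to a DLQ)
            delay = base_delay * 2 ** (attempt - 1)  # 0.5s, 1s, 2s, ...
            logger.warning(
                "publish attempt %d failed (%s), retrying in %.2fs",
                attempt, exc, delay,
            )
            await asyncio.sleep(delay)
```

Used as `await publish_with_retry(publisher.publish, topic, key, event)` around the `EventPublisher` above; keep the attempt budget small, since long synchronous retries stall the consumer loop that triggered the publish.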

Event Router Pattern

Decouple event routing from business logic with a typed dispatch system:

python
import logging
from typing import Awaitable, Callable, Type, TypeVar

logger = logging.getLogger(__name__)

T = TypeVar("T")


class EventRouter:
    def __init__(self):
        self._handlers: dict[str, tuple[Type, Callable]] = {}

    def register(self, event_type: str, model: Type[T], handler: Callable[[T], Awaitable[None]]):
        self._handlers[event_type] = (model, handler)

    async def dispatch(self, event_type: str, raw_data: dict) -> None:
        if event_type not in self._handlers:
            raise ValueError(f"No handler registered for {event_type}")

        model_cls, handler = self._handlers[event_type]
        event = model_cls(**raw_data)
        await handler(event)


# Usage
async def handle_order_created(event: OrderCreated):
    logger.info(f"Processing order {event.order_id} for customer {event.customer_id}")
    # Business logic here


router = EventRouter()
router.register("OrderCreated", OrderCreated, handle_order_created)
router.register("OrderShipped", OrderShipped, handle_order_shipped)


Transactional Outbox with SQLAlchemy

The outbox pattern prevents dual-write inconsistencies between your database and Kafka:

python
import asyncio
import json
import logging
import uuid
from datetime import datetime

from sqlalchemy import Column, DateTime, String, Text, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase

logger = logging.getLogger(__name__)


class Base(DeclarativeBase):
    pass


class OutboxEvent(Base):
    __tablename__ = "outbox_events"

    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    aggregate_id = Column(String, nullable=False, index=True)
    aggregate_type = Column(String, nullable=False)
    event_type = Column(String, nullable=False)
    payload = Column(Text, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    published_at = Column(DateTime, nullable=True)


class OrderService:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def create_order(self, cmd: CreateOrderCommand) -> Order:
        order = Order(
            id=str(uuid.uuid4()),
            customer_id=cmd.customer_id,
            items=cmd.items,
            status="created",
        )
        self.session.add(order)

        event = OrderCreated(
            order_id=order.id,
            customer_id=order.customer_id,
            items=cmd.items,
            total=order.total,
        )

        outbox_entry = OutboxEvent(
            aggregate_id=order.id,
            aggregate_type="Order",
            event_type="OrderCreated",
            payload=json.dumps(event.__dict__, default=str),
        )
        self.session.add(outbox_entry)

        # One transaction covers both the order row and the outbox row
        await self.session.commit()
        return order


class OutboxPoller:
    def __init__(self, session_factory, publisher: EventPublisher):
        self.session_factory = session_factory
        self.publisher = publisher

    async def poll_loop(self, interval: float = 0.1):
        while True:
            async with self.session_factory() as session:
                result = await session.execute(
                    select(OutboxEvent)
                    .where(OutboxEvent.published_at.is_(None))
                    .order_by(OutboxEvent.created_at)
                    .limit(100)
                )
                entries = result.scalars().all()

                for entry in entries:
                    try:
                        await self.publisher.publish(
                            topic=f"{entry.aggregate_type.lower()}-events",
                            key=entry.aggregate_id,
                            event=json.loads(entry.payload),
                        )
                        entry.published_at = datetime.utcnow()
                        await session.commit()
                    except Exception as e:
                        logger.error(f"Failed to publish outbox entry {entry.id}: {e}")
                        await session.rollback()

            await asyncio.sleep(interval)

Structured Logging and Observability

Use structlog for consistent, machine-parseable log output across your event pipeline:

python
import structlog
from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)

logger = structlog.get_logger()
tracer = trace.get_tracer("event-processor")
propagator = TraceContextTextMapPropagator()


async def traced_handler(msg):
    # Extract trace context from Kafka headers
    carrier = {k: v.decode() for k, v in msg.headers}
    ctx = propagator.extract(carrier=carrier)

    with tracer.start_as_current_span(
        f"process {msg.topic}",
        context=ctx,
        attributes={
            "messaging.system": "kafka",
            "messaging.destination": msg.topic,
            "messaging.kafka.partition": msg.partition,
            "messaging.kafka.offset": msg.offset,
        },
    ) as span:
        logger.info(
            "processing_event",
            topic=msg.topic,
            partition=msg.partition,
            offset=msg.offset,
            trace_id=span.get_span_context().trace_id,
        )
        await dispatch(msg)

Concurrency with asyncio.TaskGroup

Python 3.11+ TaskGroups provide structured concurrency for parallel event processing:

python
import asyncio


async def process_batch(messages: list, max_concurrency: int = 10):
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded_process(msg):
        async with semaphore:
            await process_message(msg)

    async with asyncio.TaskGroup() as tg:
        for msg in messages:
            tg.create_task(bounded_process(msg))

Conclusion

Python's event-driven architecture capabilities are production-ready for systems processing tens of thousands of events per second. The async/await model maps cleanly to the consume-process-produce pattern, and libraries like aiokafka provide first-class asyncio integration. Where Python particularly shines is in event consumers that need rich data processing — pandas transformations, ML inference, or complex business rules — where the ecosystem depth outweighs the raw throughput advantage of compiled languages.

The patterns in this guide — typed event routing, transactional outbox, structured observability — compose into a maintainable system that grows with your requirements. Start with a simple consumer and a clean handler interface, add the outbox when consistency matters, and introduce stream processing only when windowed aggregations or stateful transformations become necessary.
