
How to Build Agentic AI Workflows Using FastAPI

Step-by-step tutorial for building agentic AI workflows with FastAPI, from project setup through deployment.

Muneer Puthiya Purayil · 18 min read

Prerequisites

Required Knowledge

This tutorial assumes you're comfortable with Python async/await patterns, REST API design, and have at least a passing familiarity with LLM APIs. You should know how to reason about stateful systems — agentic workflows are fundamentally about orchestrating state transitions across multiple LLM calls, tool invocations, and external service interactions.

Specifically, you need:

  • Python 3.11+ with async programming (asyncio, async generators)
  • FastAPI basics — routing, dependency injection, Pydantic models
  • OpenAI or Anthropic SDK — we'll use openai but the patterns transfer directly
  • Understanding of tool calling / function calling in LLM APIs
  • Basic familiarity with Redis or any async-capable key-value store for state persistence

Development Environment

You need Python 3.11 or later. Python 3.12 is recommended for the improved asyncio performance and better error messages. The architecture we're building uses async throughout — synchronous LLM calls inside an async framework block the event loop and starve concurrent requests under load.
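If you are ever forced to use a synchronous client inside this async stack, the standard escape hatch is to push the blocking call onto a worker thread with asyncio.to_thread instead of calling it directly in a coroutine. A minimal sketch (blocking_llm_call is a hypothetical stand-in for a sync SDK call):

```python
import asyncio
import time


def blocking_llm_call(prompt: str) -> str:
    """Stand-in for a synchronous SDK call that blocks for a while."""
    time.sleep(0.2)  # simulates network latency
    return f"response to: {prompt}"


async def handle_request(prompt: str) -> str:
    # asyncio.to_thread moves the blocking call onto a worker thread,
    # so the event loop keeps serving other requests meanwhile.
    return await asyncio.to_thread(blocking_llm_call, prompt)


async def serve_two() -> list[str]:
    # The two requests overlap: total wall time is roughly one sleep, not two.
    return await asyncio.gather(handle_request("a"), handle_request("b"))


results = asyncio.run(serve_two())
```

This keeps the server responsive, but native async clients (AsyncOpenAI, redis.asyncio) avoid the thread-pool hop entirely, which is why the tutorial uses them throughout.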

bash
python --version # 3.11.x or 3.12.x

Redis 7+ for workflow state persistence. You can run it locally via Docker:

bash
docker run -d -p 6379:6379 redis:7-alpine

An OpenAI API key (or Anthropic — the patterns are identical). Set it in your environment:

bash
export OPENAI_API_KEY="sk-..."
export REDIS_URL="redis://localhost:6379"

Dependencies

We keep the dependency footprint intentional. Every package here serves a specific purpose:

fastapi==0.115.0           # ASGI framework with native async support
uvicorn[standard]==0.30.0  # ASGI server with uvloop for performance
openai==1.40.0             # LLM client with streaming and tool calling
redis[asyncio]==5.0.8      # Async Redis client for state persistence
pydantic==2.8.0            # Runtime type validation (bundled with FastAPI)
pydantic-settings==2.4.0   # Env-based settings, imported by config.py
httpx==0.27.0              # Async HTTP client used by the tool handlers
tenacity==9.0.0            # Retry logic with exponential backoff
structlog==24.4.0          # Structured logging for production observability
python-ulid==2.7.0         # Sortable unique IDs for workflow runs

Install them:

bash
pip install fastapi "uvicorn[standard]" openai "redis[asyncio]" pydantic-settings httpx tenacity structlog python-ulid

Project Setup

Initialize Project

Structure matters for agentic systems because you'll be adding new agents, tools, and workflow definitions over time. Use a layout that makes those extension points obvious:

agentic_api/
├── main.py                 # FastAPI app, lifespan, global middleware
├── config.py               # Settings via pydantic-settings
├── workflows/
│   ├── __init__.py
│   ├── base.py             # Base workflow class and state machine
│   ├── research.py         # Research agent workflow
│   └── orchestrator.py     # Multi-agent orchestration
├── agents/
│   ├── __init__.py
│   ├── base.py             # Base agent with tool-calling loop
│   ├── tools.py            # Tool definitions and registry
│   └── memory.py           # Per-agent memory management
├── api/
│   ├── __init__.py
│   ├── routes.py           # HTTP endpoints
│   └── schemas.py          # Request/response Pydantic models
└── state/
    ├── __init__.py
    └── store.py            # Redis-backed state persistence

Create the project:

bash
mkdir agentic_api && cd agentic_api
mkdir -p workflows agents api state
touch main.py config.py
touch workflows/__init__.py workflows/base.py workflows/research.py workflows/orchestrator.py
touch agents/__init__.py agents/base.py agents/tools.py agents/memory.py
touch api/__init__.py api/routes.py api/schemas.py
touch state/__init__.py state/store.py

Configure Build Tools

Create config.py first — everything else imports from here:

python
# config.py
from functools import lru_cache

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    openai_api_key: str
    openai_model: str = "gpt-4o"
    redis_url: str = "redis://localhost:6379"
    max_agent_iterations: int = 10
    agent_timeout_seconds: int = 120
    workflow_ttl_seconds: int = 3600

    model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}


@lru_cache
def get_settings() -> Settings:
    return Settings()

The lru_cache decorator ensures the environment is parsed once, on first access, rather than on every request. Without it, FastAPI's dependency injection would call get_settings() per request, constructing a fresh Settings object each time.
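The caching behaviour is easy to verify in isolation. A toy illustration (get_settings_demo is a stand-in, not the real Settings class):

```python
from functools import lru_cache

construct_count = 0


@lru_cache
def get_settings_demo() -> dict:
    """Hypothetical stand-in for config.get_settings()."""
    global construct_count
    construct_count += 1
    return {"openai_model": "gpt-4o"}


first = get_settings_demo()
second = get_settings_demo()
# Both calls return the same cached object; the body ran exactly once.
```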

Add Dependencies

Set up main.py with a proper lifespan context that initializes the Redis connection pool and OpenAI client once at startup:

python
# main.py
from contextlib import asynccontextmanager

import redis.asyncio as aioredis
import structlog
from fastapi import FastAPI
from openai import AsyncOpenAI

from config import get_settings

log = structlog.get_logger()
settings = get_settings()

# Module-level singletons — initialized during lifespan
redis_client: aioredis.Redis | None = None
openai_client: AsyncOpenAI | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis_client, openai_client

    redis_client = aioredis.from_url(
        settings.redis_url,
        encoding="utf-8",
        decode_responses=True,
        max_connections=20,
    )
    openai_client = AsyncOpenAI(api_key=settings.openai_api_key)

    log.info("startup_complete", redis=settings.redis_url, model=settings.openai_model)
    yield

    await redis_client.aclose()
    log.info("shutdown_complete")


app = FastAPI(title="Agentic AI API", lifespan=lifespan)

from api.routes import router  # noqa: E402 (imported after app creation)

app.include_router(router, prefix="/api/v1")

Core Implementation

Building the Foundation

The core abstraction is a workflow — a named, stateful sequence of agent invocations. Each workflow run has a unique ID, persists its state to Redis, and can be resumed after interruption.

Start with the state store:

python
# state/store.py
import json
from datetime import datetime
from enum import Enum

import redis.asyncio as aioredis
from ulid import ULID


class WorkflowStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    AWAITING_INPUT = "awaiting_input"
    COMPLETED = "completed"
    FAILED = "failed"


class WorkflowState:
    def __init__(
        self,
        workflow_id: str,
        workflow_type: str,
        input_data: dict,
        status: WorkflowStatus = WorkflowStatus.PENDING,
        steps: list[dict] | None = None,
        output: dict | None = None,
        error: str | None = None,
    ):
        self.workflow_id = workflow_id
        self.workflow_type = workflow_type
        self.input_data = input_data
        self.status = status
        self.steps = steps or []
        self.output = output
        self.error = error
        self.created_at = datetime.utcnow().isoformat()
        self.updated_at = self.created_at

    def to_dict(self) -> dict:
        return {
            "workflow_id": self.workflow_id,
            "workflow_type": self.workflow_type,
            "input_data": self.input_data,
            "status": self.status,
            "steps": self.steps,
            "output": self.output,
            "error": self.error,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "WorkflowState":
        state = cls(
            workflow_id=data["workflow_id"],
            workflow_type=data["workflow_type"],
            input_data=data["input_data"],
            status=WorkflowStatus(data["status"]),
            steps=data.get("steps", []),
            output=data.get("output"),
            error=data.get("error"),
        )
        state.created_at = data["created_at"]
        state.updated_at = data["updated_at"]
        return state


class StateStore:
    def __init__(self, redis: aioredis.Redis, ttl: int = 3600):
        self.redis = redis
        self.ttl = ttl

    def _key(self, workflow_id: str) -> str:
        return f"workflow:{workflow_id}"

    async def create(self, workflow_type: str, input_data: dict) -> WorkflowState:
        workflow_id = str(ULID())
        state = WorkflowState(
            workflow_id=workflow_id,
            workflow_type=workflow_type,
            input_data=input_data,
        )
        await self.save(state)
        return state

    async def save(self, state: WorkflowState) -> None:
        state.updated_at = datetime.utcnow().isoformat()
        await self.redis.setex(
            self._key(state.workflow_id),
            self.ttl,
            json.dumps(state.to_dict()),
        )

    async def get(self, workflow_id: str) -> WorkflowState | None:
        raw = await self.redis.get(self._key(workflow_id))
        if raw is None:
            return None
        return WorkflowState.from_dict(json.loads(raw))

    async def append_step(self, workflow_id: str, step: dict) -> None:
        state = await self.get(workflow_id)
        if state:
            state.steps.append(step)
            await self.save(state)

Adding Business Logic

The agent is the unit of reasoning. It runs an LLM call with a set of tools, processes tool calls, and loops until the model decides it's done or we hit the iteration limit:

python
# agents/base.py
import json
from datetime import datetime

import structlog
from openai import AsyncOpenAI
from openai.types.chat import ChatCompletion, ChatCompletionMessageParam
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

from config import get_settings

log = structlog.get_logger()
settings = get_settings()


class Agent:
    def __init__(
        self,
        name: str,
        system_prompt: str,
        tools: list[dict],
        tool_handlers: dict,
        openai_client: AsyncOpenAI,
    ):
        self.name = name
        self.system_prompt = system_prompt
        self.tools = tools
        self.tool_handlers = tool_handlers
        self.client = openai_client

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type(Exception),
        reraise=True,
    )
    async def _call_llm(
        self, messages: list[ChatCompletionMessageParam]
    ) -> ChatCompletion:
        kwargs: dict = {
            "model": settings.openai_model,
            "messages": messages,
            "temperature": 0.1,  # Low temperature for agentic tasks
        }
        # Only send tool parameters when this agent actually has tools
        if self.tools:
            kwargs["tools"] = self.tools
            kwargs["tool_choice"] = "auto"
        return await self.client.chat.completions.create(**kwargs)

    async def run(
        self,
        user_message: str,
        context: dict | None = None,
        max_iterations: int | None = None,
    ) -> dict:
        max_iter = max_iterations or settings.max_agent_iterations
        messages: list[ChatCompletionMessageParam] = [
            {"role": "system", "content": self.system_prompt},
        ]

        if context:
            messages.append({
                "role": "system",
                "content": f"Context:\n{json.dumps(context, indent=2)}",
            })

        messages.append({"role": "user", "content": user_message})

        iterations = 0
        tool_call_log = []

        while iterations < max_iter:
            iterations += 1
            response = await self._call_llm(messages)
            message = response.choices[0].message

            # Append assistant message
            messages.append(message)

            # No tool calls — model is done
            if not message.tool_calls:
                return {
                    "content": message.content,
                    "tool_calls": tool_call_log,
                    "iterations": iterations,
                    "finish_reason": response.choices[0].finish_reason,
                }

            # Process each tool call
            for tool_call in message.tool_calls:
                fn_name = tool_call.function.name
                fn_args = json.loads(tool_call.function.arguments)

                log.info(
                    "tool_call",
                    agent=self.name,
                    tool=fn_name,
                    args=fn_args,
                    iteration=iterations,
                )

                handler = self.tool_handlers.get(fn_name)
                if handler is None:
                    result = {"error": f"Unknown tool: {fn_name}"}
                else:
                    try:
                        result = await handler(**fn_args)
                    except Exception as e:
                        result = {"error": str(e), "tool": fn_name}

                tool_call_log.append({
                    "tool": fn_name,
                    "args": fn_args,
                    "result": result,
                    "timestamp": datetime.utcnow().isoformat(),
                })

                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": json.dumps(result),
                })

        # Hit iteration limit
        return {
            "content": "Maximum iterations reached without completing the task.",
            "tool_calls": tool_call_log,
            "iterations": iterations,
            "finish_reason": "max_iterations",
        }

Connecting Services

Now wire the state store and agent together into a research workflow. First define the agent's tools, then the workflow that uses them to research a topic by searching the web and synthesizing results:

python
# agents/tools.py
import httpx
import structlog

log = structlog.get_logger()

# Tool schemas for the OpenAI API
RESEARCH_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "search_web",
            "description": "Search the web for current information on a topic",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The search query",
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "Maximum number of results to return (1-10)",
                        "default": 5,
                    },
                },
                "required": ["query"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "extract_content",
            "description": "Extract main text content from a URL",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "URL to extract content from",
                    },
                },
                "required": ["url"],
            },
        },
    },
]


async def search_web(query: str, max_results: int = 5) -> dict:
    """
    In production, integrate with SerpAPI, Brave Search API, or Exa.
    This stub demonstrates the interface.
    """
    # Example using Brave Search API:
    # async with httpx.AsyncClient() as client:
    #     response = await client.get(
    #         "https://api.search.brave.com/res/v1/web/search",
    #         headers={"X-Subscription-Token": BRAVE_API_KEY},
    #         params={"q": query, "count": max_results},
    #     )
    #     results = response.json()["web"]["results"]
    #     return {"results": [{"title": r["title"], "url": r["url"], "snippet": r["description"]} for r in results]}

    # Stub response for demonstration
    return {
        "results": [
            {
                "title": f"Result for: {query}",
                "url": "https://example.com/result-1",
                "snippet": f"Relevant information about {query}...",
            }
        ],
        "query": query,
    }


async def extract_content(url: str) -> dict:
    """Extract readable content from a URL."""
    async with httpx.AsyncClient(follow_redirects=True, timeout=15) as client:
        try:
            response = await client.get(url, headers={"User-Agent": "ResearchBot/1.0"})
            response.raise_for_status()
            # In production, use trafilatura or readability-lxml for extraction
            return {
                "url": url,
                "content": response.text[:3000],  # Truncate for context window
                "status": "success",
            }
        except Exception as e:
            return {"url": url, "error": str(e), "status": "failed"}

The research workflow itself:

python
# workflows/research.py
import structlog
from openai import AsyncOpenAI

from agents.base import Agent
from agents.tools import RESEARCH_TOOLS, extract_content, search_web
from state.store import StateStore, WorkflowStatus

log = structlog.get_logger()

RESEARCH_SYSTEM_PROMPT = """You are a rigorous research agent. Your job is to:
1. Search for authoritative sources on the given topic
2. Extract and read the most relevant content
3. Synthesize findings into a structured report

Always verify information across multiple sources. Cite your sources.
When you have enough information, provide a comprehensive synthesis — do not keep searching indefinitely."""


class ResearchWorkflow:
    def __init__(self, state_store: StateStore, openai_client: AsyncOpenAI):
        self.store = state_store
        self.agent = Agent(
            name="researcher",
            system_prompt=RESEARCH_SYSTEM_PROMPT,
            tools=RESEARCH_TOOLS,
            tool_handlers={
                "search_web": search_web,
                "extract_content": extract_content,
            },
            openai_client=openai_client,
        )

    async def run(self, topic: str, depth: str = "standard") -> str:
        state = await self.store.create(
            workflow_type="research",
            input_data={"topic": topic, "depth": depth},
        )
        workflow_id = state.workflow_id

        try:
            state.status = WorkflowStatus.RUNNING
            await self.store.save(state)

            prompt = f"Research this topic thoroughly: {topic}\nDepth: {depth}"
            result = await self.agent.run(
                user_message=prompt,
                max_iterations=8 if depth == "deep" else 5,
            )

            await self.store.append_step(workflow_id, {
                "type": "agent_complete",
                "iterations": result["iterations"],
                "tool_calls_count": len(result["tool_calls"]),
            })

            state = await self.store.get(workflow_id)
            state.status = WorkflowStatus.COMPLETED
            state.output = {"report": result["content"], "sources": result["tool_calls"]}
            await self.store.save(state)

            log.info(
                "workflow_complete",
                workflow_id=workflow_id,
                iterations=result["iterations"],
            )
            return workflow_id

        except Exception as e:
            state = await self.store.get(workflow_id)
            state.status = WorkflowStatus.FAILED
            state.error = str(e)
            await self.store.save(state)
            log.error("workflow_failed", workflow_id=workflow_id, error=str(e))
            raise

Adding Features

Feature 1: Core Capability — Multi-Step Orchestration

Real agentic systems involve multiple specialized agents. Build an orchestrator that decomposes a task, delegates to specialized agents, and synthesizes results:

python
# workflows/orchestrator.py
import asyncio
import json
import re

import structlog
from openai import AsyncOpenAI

from agents.base import Agent
from state.store import StateStore, WorkflowStatus

log = structlog.get_logger()

PLANNER_PROMPT = """You are a task planning agent. Given a complex goal, break it into
discrete subtasks that can be assigned to specialized agents.

Respond with a JSON array of subtasks:
[
  {"id": "1", "agent": "researcher", "task": "...", "depends_on": []},
  {"id": "2", "agent": "analyst", "task": "...", "depends_on": ["1"]}
]

Available agents: researcher, analyst, writer
Keep subtasks focused and achievable in a single agent run."""


class OrchestratorWorkflow:
    def __init__(self, state_store: StateStore, openai_client: AsyncOpenAI):
        self.store = state_store
        self.client = openai_client

        self.planner = Agent(
            name="planner",
            system_prompt=PLANNER_PROMPT,
            tools=[],  # Planner only reasons, no tools
            tool_handlers={},
            openai_client=openai_client,
        )

    async def _execute_subtasks(
        self,
        subtasks: list[dict],
        workflow_id: str,
    ) -> dict[str, str]:
        results: dict[str, str] = {}

        # Approximate dependency ordering: group by dependency count.
        # Fine for shallow plans; use a real topological sort for deep graphs.
        levels: dict[int, list[dict]] = {}
        for task in subtasks:
            level = len(task.get("depends_on", []))
            levels.setdefault(level, []).append(task)

        for level in sorted(levels.keys()):
            level_tasks = levels[level]
            # Run tasks at the same level in parallel
            tasks_coro = [
                self._run_subtask(t, results, workflow_id)
                for t in level_tasks
            ]
            level_results = await asyncio.gather(*tasks_coro, return_exceptions=True)

            for task, result in zip(level_tasks, level_results):
                if isinstance(result, Exception):
                    results[task["id"]] = f"ERROR: {result}"
                else:
                    results[task["id"]] = result

        return results

    async def _run_subtask(
        self,
        subtask: dict,
        prior_results: dict[str, str],
        workflow_id: str,
    ) -> str:
        context = {
            dep_id: prior_results.get(dep_id, "")
            for dep_id in subtask.get("depends_on", [])
        }

        # Route to the appropriate agent
        agent = self._get_agent(subtask["agent"])
        result = await agent.run(
            user_message=subtask["task"],
            context=context if context else None,
        )

        await self.store.append_step(workflow_id, {
            "subtask_id": subtask["id"],
            "agent": subtask["agent"],
            "task": subtask["task"],
            "iterations": result["iterations"],
        })

        return result["content"]

    def _get_agent(self, agent_type: str) -> Agent:
        # In production, inject specialized agents via the constructor.
        # This simplified version reuses the planner's client.
        prompts = {
            "researcher": "You are a research specialist. Gather factual information.",
            "analyst": "You are an analyst. Synthesize information and identify insights.",
            "writer": "You are a technical writer. Produce clear, structured content.",
        }
        return Agent(
            name=agent_type,
            system_prompt=prompts.get(agent_type, "You are a helpful assistant."),
            tools=[],
            tool_handlers={},
            openai_client=self.client,
        )

    async def run(self, goal: str) -> str:
        state = await self.store.create("orchestrator", {"goal": goal})
        workflow_id = state.workflow_id

        state.status = WorkflowStatus.RUNNING
        await self.store.save(state)

        # Plan
        plan_result = await self.planner.run(f"Plan how to achieve: {goal}")
        json_match = re.search(r"\[.*\]", plan_result["content"], re.DOTALL)
        subtasks = json.loads(json_match.group()) if json_match else []

        # Execute
        results = await self._execute_subtasks(subtasks, workflow_id)

        # Synthesize
        synthesizer = self._get_agent("writer")
        synthesis = await synthesizer.run(
            f"Synthesize these research outputs into a final response for: {goal}",
            context=results,
        )

        state = await self.store.get(workflow_id)
        state.status = WorkflowStatus.COMPLETED
        state.output = {"result": synthesis["content"], "subtask_results": results}
        await self.store.save(state)
        return workflow_id

Feature 2: Extensions — Streaming Responses

Agentic tasks take time. Stream intermediate progress via Server-Sent Events so the UI stays responsive:

python
# api/routes.py
import asyncio
import json

from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

import main  # module-level singletons (redis_client, openai_client) live here
from config import get_settings
from state.store import StateStore, WorkflowStatus
from workflows.research import ResearchWorkflow

router = APIRouter()
settings = get_settings()


def get_state_store() -> StateStore:
    # Resolve the singleton lazily: `from main import redis_client` would
    # capture None at import time, before the lifespan has initialized it.
    return StateStore(main.redis_client, ttl=settings.workflow_ttl_seconds)


class ResearchRequest(BaseModel):
    topic: str
    depth: str = "standard"


class WorkflowResponse(BaseModel):
    workflow_id: str
    status: str


@router.post("/workflows/research", response_model=WorkflowResponse)
async def start_research(
    request: ResearchRequest,
    store: StateStore = Depends(get_state_store),
):
    workflow = ResearchWorkflow(store, main.openai_client)
    workflow_id = await workflow.run(request.topic, request.depth)
    state = await store.get(workflow_id)
    return WorkflowResponse(workflow_id=workflow_id, status=state.status)


@router.get("/workflows/{workflow_id}")
async def get_workflow(
    workflow_id: str,
    store: StateStore = Depends(get_state_store),
):
    state = await store.get(workflow_id)
    if not state:
        raise HTTPException(status_code=404, detail="Workflow not found")
    return state.to_dict()


@router.get("/workflows/{workflow_id}/stream")
async def stream_workflow_progress(
    workflow_id: str,
    store: StateStore = Depends(get_state_store),
):
    """
    SSE endpoint — poll Redis state until the workflow completes.
    Connect from the client: new EventSource('/api/v1/workflows/{id}/stream')
    """
    async def event_generator():
        terminal_statuses = {WorkflowStatus.COMPLETED, WorkflowStatus.FAILED}
        poll_interval = 1.0  # seconds

        while True:
            state = await store.get(workflow_id)
            if state is None:
                yield f"data: {json.dumps({'error': 'not_found'})}\n\n"
                return

            payload = {
                "workflow_id": state.workflow_id,
                "status": state.status,
                "steps_count": len(state.steps),
                "latest_step": state.steps[-1] if state.steps else None,
            }

            if state.status == WorkflowStatus.COMPLETED:
                payload["output"] = state.output

            yield f"data: {json.dumps(payload)}\n\n"

            if state.status in terminal_statuses:
                return

            await asyncio.sleep(poll_interval)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )

Feature 3: Polish — Background Task Execution

For long-running workflows, don't block the HTTP request. Use FastAPI's BackgroundTasks to start the workflow asynchronously and return the ID immediately:

python
# Add to api/routes.py
import main  # access the singletons lazily, after lifespan has run

from fastapi import BackgroundTasks


@router.post("/workflows/orchestrate", response_model=WorkflowResponse)
async def start_orchestration(
    request: dict,
    background_tasks: BackgroundTasks,
    store: StateStore = Depends(get_state_store),
):
    from workflows.orchestrator import OrchestratorWorkflow

    goal = request.get("goal", "")

    # Pre-create the state so we can return the ID immediately
    state = await store.create("orchestrator", {"goal": goal})
    workflow_id = state.workflow_id

    async def run_workflow():
        try:
            workflow = OrchestratorWorkflow(store, main.openai_client)
            # OrchestratorWorkflow.run creates its own state record, so
            # mirror the finished result onto the pre-created ID that the
            # client is already polling.
            inner_id = await workflow.run(goal)
            inner = await store.get(inner_id)
            state = await store.get(workflow_id)
            if state and inner:
                state.status = inner.status
                state.steps = inner.steps
                state.output = inner.output
                await store.save(state)
        except Exception as e:
            state = await store.get(workflow_id)
            if state:
                state.status = WorkflowStatus.FAILED
                state.error = str(e)
                await store.save(state)

    background_tasks.add_task(run_workflow)
    return WorkflowResponse(workflow_id=workflow_id, status="pending")


Error Handling

Error Classification

In agentic systems, errors fall into distinct categories that require different recovery strategies:

| Category  | Examples                            | Strategy                            |
|-----------|-------------------------------------|-------------------------------------|
| Transient | Rate limit, timeout, network blip   | Retry with backoff                  |
| Model     | Malformed tool call JSON, refusal   | Prompt correction + retry           |
| Tool      | External API down, bad URL          | Return error to model, let it adapt |
| Logic     | Infinite loop, contradictory goals  | Max iterations + circuit breaker    |
| Fatal     | Invalid API key, disk full          | Fail fast, alert on-call            |
python
# agents/base.py — enhanced error handling
from openai import APIConnectionError, APIStatusError, RateLimitError


class AgentError(Exception):
    def __init__(self, message: str, error_type: str, retryable: bool = False):
        super().__init__(message)
        self.error_type = error_type
        self.retryable = retryable


def classify_openai_error(e: Exception) -> AgentError:
    if isinstance(e, RateLimitError):
        return AgentError(str(e), "rate_limit", retryable=True)
    if isinstance(e, APIConnectionError):
        return AgentError(str(e), "connection", retryable=True)
    if isinstance(e, APIStatusError) and e.status_code >= 500:
        return AgentError(str(e), "server_error", retryable=True)
    return AgentError(str(e), "fatal", retryable=False)

Recovery Strategies

Tool failures should be returned to the model as structured errors — the model can often adapt its strategy. Only escalate to workflow failure when the agent cannot make progress:

python
# In Agent.run() — tool error handling
try:
    result = await handler(**fn_args)
except httpx.TimeoutException:
    result = {
        "error": "timeout",
        "message": "The request timed out. Try a different approach or simpler query.",
        "retryable": True,
    }
except httpx.HTTPStatusError as e:
    result = {
        "error": "http_error",
        "status_code": e.response.status_code,
        "message": f"HTTP {e.response.status_code} from external service.",
    }
except Exception as e:
    result = {"error": "unexpected", "message": str(e)}

# Always return tool results — never raise inside the tool call loop
messages.append({
    "role": "tool",
    "tool_call_id": tool_call.id,
    "content": json.dumps(result),
})

Detect loop behavior — if the model calls the same tool with the same arguments repeatedly, it's stuck:

python
# Stall detection in Agent.run()
recent_calls = [(tc["tool"], json.dumps(tc["args"])) for tc in tool_call_log[-4:]]
if len(recent_calls) >= 4 and len(set(recent_calls)) == 1:
    raise AgentError(
        "Agent stuck in loop — same tool/args called 4 times consecutively",
        error_type="loop_detected",
        retryable=False,
    )

User-Facing Messages

Never expose raw LLM errors or stack traces to API consumers. Map internal states to clean, actionable messages:

python
# main.py — error response normalization
# (exception handlers register on the FastAPI app, not on an APIRouter)
from fastapi import Request
from fastapi.responses import JSONResponse

from agents.base import AgentError

ERROR_MESSAGES = {
    "rate_limit": "The AI service is temporarily busy. Please retry in 30 seconds.",
    "connection": "Unable to reach the AI service. Please check your connection.",
    "loop_detected": "The agent could not complete this task. Try rephrasing your request.",
    "max_iterations": "This task is too complex to complete automatically. Consider breaking it into smaller steps.",
    "fatal": "An unexpected error occurred. Our team has been notified.",
}


@app.exception_handler(AgentError)
async def agent_error_handler(request: Request, exc: AgentError):
    message = ERROR_MESSAGES.get(exc.error_type, ERROR_MESSAGES["fatal"])
    return JSONResponse(
        status_code=503 if exc.retryable else 422,
        content={
            "error": exc.error_type,
            "message": message,
            "retryable": exc.retryable,
        },
    )

Deployment

Environment Configuration

All configuration goes through environment variables — no hardcoded values, no config files committed to the repo:

bash
# .env.production
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4o
REDIS_URL=redis://redis:6379/0
MAX_AGENT_ITERATIONS=10
AGENT_TIMEOUT_SECONDS=120
WORKFLOW_TTL_SECONDS=86400

For production Redis, use Redis Cluster or Upstash with TLS:

bash
REDIS_URL=rediss://default:password@your-redis-host:6380

Dockerfile for the FastAPI service:

dockerfile
FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app", \
     "--host", "0.0.0.0", \
     "--port", "8000", \
     "--workers", "4", \
     "--loop", "uvloop", \
     "--access-log"]

CI/CD Pipeline

yaml
# .github/workflows/deploy.yml
name: Deploy

on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      redis:
        image: redis:7-alpine
        ports: ["6379:6379"]
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install -r requirements.txt
      - run: pytest tests/ -v --timeout=30
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          REDIS_URL: redis://localhost:6379

  deploy:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Build and push Docker image
        run: |
          docker build -t ghcr.io/${{ github.repository }}:${{ github.sha }} .
          docker push ghcr.io/${{ github.repository }}:${{ github.sha }}
      - name: Deploy to production
        run: |
          # Trigger your deployment (Fly.io, Railway, ECS, etc.)
          flyctl deploy --image ghcr.io/${{ github.repository }}:${{ github.sha }}
        env:
          FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}

Monitoring Setup

Instrument your workflows so you can debug production issues without guessing:

python
# Structured logging in workflows
import structlog

structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),  # JSON for log aggregation
    ],
)

# In Agent.run(), add timing
import time

start = time.monotonic()
response = await self._call_llm(messages)
elapsed_ms = int((time.monotonic() - start) * 1000)

log.info(
    "llm_call",
    agent=self.name,
    iteration=iterations,
    model=settings.openai_model,
    latency_ms=elapsed_ms,
    prompt_tokens=response.usage.prompt_tokens,
    completion_tokens=response.usage.completion_tokens,
    finish_reason=response.choices[0].finish_reason,
)

Key metrics to track in Prometheus/Datadog:

  • workflow_duration_seconds — histogram by workflow type
  • agent_iterations_count — histogram to catch runaway agents
  • llm_tokens_total — counter by model for cost attribution
  • tool_call_errors_total — counter by tool and error type
  • workflow_status_total — counter by status (completed/failed)
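The label shapes matter more than the client library. A library-agnostic sketch of how these metrics accumulate (the function names are illustrative; in practice you would register prometheus_client Counter and Histogram objects or the Datadog equivalents):

```python
from collections import defaultdict

# (metric name, *labels) -> value. Stand-in for a real metrics client.
counters: dict[tuple, float] = defaultdict(float)
histograms: dict[tuple, list[float]] = defaultdict(list)


def observe_workflow(workflow_type: str, status: str, duration_s: float, iterations: int) -> None:
    histograms[("workflow_duration_seconds", workflow_type)].append(duration_s)
    histograms[("agent_iterations_count", workflow_type)].append(iterations)
    counters[("workflow_status_total", workflow_type, status)] += 1


def observe_llm_call(model: str, prompt_tokens: int, completion_tokens: int) -> None:
    # Split by token direction so cost attribution stays accurate
    counters[("llm_tokens_total", model, "prompt")] += prompt_tokens
    counters[("llm_tokens_total", model, "completion")] += completion_tokens


observe_workflow("research", "completed", 12.4, iterations=5)
observe_llm_call("gpt-4o", prompt_tokens=900, completion_tokens=150)
```

Call these at the same points where the structured logging above already fires, so logs and metrics share label values.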

Next Steps

Persistent memory across workflow runs. Currently, each workflow starts fresh. Add a vector store (Pinecone, Qdrant) to embed and retrieve relevant context from past runs. This lets the agent say "I already researched this last week" and avoid redundant work.

Human-in-the-loop interruption. Set workflow status to AWAITING_INPUT mid-execution and expose a POST /workflows/{id}/input endpoint. The agent pauses at a checkpoint, waits for user input, then resumes. Critical for high-stakes decisions.

Agent evaluation framework. Build a test harness that runs each agent against fixed scenarios and scores the output. Track scores in CI — catch regressions before they reach production.

Cost guardrails. Track token consumption per workflow and abort with a clear error when it exceeds your budget. A rogue prompt that triggers 50 LLM calls costs real money.
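One workable shape: charge each response's usage against a per-workflow budget and raise as soon as the cap is crossed. A minimal sketch (the class and exception names are illustrative, not from the codebase above):

```python
class BudgetExceeded(Exception):
    pass


class TokenBudget:
    """Abort a workflow once cumulative token spend crosses a hard cap."""

    def __init__(self, max_tokens: int) -> None:
        self.max_tokens = max_tokens
        self.used = 0

    def charge(self, prompt_tokens: int, completion_tokens: int) -> None:
        self.used += prompt_tokens + completion_tokens
        if self.used > self.max_tokens:
            raise BudgetExceeded(
                f"token budget exceeded: {self.used} > {self.max_tokens}"
            )


budget = TokenBudget(max_tokens=1000)
budget.charge(400, 100)      # fine: 500 tokens used
try:
    budget.charge(600, 50)   # pushes past the cap
    exceeded = False
except BudgetExceeded:
    exceeded = True
```

Calling `budget.charge(response.usage.prompt_tokens, response.usage.completion_tokens)` after each `_call_llm` in `Agent.run`, and mapping BudgetExceeded to a clean user-facing error, closes the loop.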

Multi-model routing. Route simple tool calls to gpt-4o-mini and complex reasoning to gpt-4o. On typical workloads this can cut token costs substantially, often with little or no measurable quality loss.
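The routing decision can start as a simple heuristic keyed off the task text. An illustrative sketch (the keyword policy is an assumption; a production router would be tuned against measured quality per task type):

```python
def pick_model(task: str) -> str:
    """Illustrative policy: cheap model for routine search/extraction steps,
    the large model for planning, analysis, and synthesis."""
    deep_markers = ("synthesize", "plan", "analyze", "write")
    if any(marker in task.lower() for marker in deep_markers):
        return "gpt-4o"       # complex reasoning
    return "gpt-4o-mini"      # routine tool-driven steps


model = pick_model("Extract the title from this page")
```

Passing the chosen model per call (instead of the single `settings.openai_model`) is a small change to `Agent._call_llm`.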

Further Reading

  • OpenAI Function Calling documentation — canonical reference for tool calling patterns
  • LangGraph — graph-based workflow orchestration with first-class support for cycles, branching, and human-in-the-loop
  • Instructor — enforces structured output from LLMs using Pydantic, eliminates the JSON parsing fragility in planner responses
  • Prefect — production workflow orchestration that integrates well with agentic pipelines when you need durability guarantees beyond Redis TTLs

Conclusion

FastAPI's async-native design makes it a natural fit for agentic AI backends where every operation — LLM calls, tool invocations, state persistence — is I/O-bound. The architecture laid out here separates concerns cleanly: FastAPI handles HTTP routing and request validation, Pydantic models enforce typed state across workflow steps, Redis persists workflow state for crash recovery, and tenacity handles the retry logic that production LLM integrations demand.

The key implementation decisions that pay off in production: use structured logging with a run ID propagated through every step, persist workflow state externally so workers are stateless and horizontally scalable, validate LLM outputs with Pydantic before acting on them, and version your prompts as files rather than inline strings. Start with a single-agent workflow backed by a task queue, get it stable in production, and add multi-agent orchestration only when you have empirical evidence that a single agent cannot handle the task. The framework gives you the async primitives and validation layer — the production reliability comes from how consistently you apply them.

Muneer Puthiya Purayil

SaaS Architect & AI Systems Engineer. 10+ years shipping production infrastructure across fintech, automotive, e-commerce, and healthcare.
