AI Architecture

How to Build an LLM Fine-Tuning Production System Using FastAPI

Step-by-step tutorial for building an LLM fine-tuning production system with FastAPI, from project setup through deployment.

Muneer Puthiya Purayil · 22 min read

This tutorial walks through building a complete LLM fine-tuning production system using FastAPI, from data ingestion endpoints to training job management and model serving. By the end, you'll have a self-contained service that accepts training data, manages fine-tuning runs, and serves the resulting models via an OpenAI-compatible API.

Project Setup

bash
mkdir llm-finetune-service && cd llm-finetune-service
python -m venv .venv && source .venv/bin/activate
pip install fastapi "uvicorn[standard]" pydantic pydantic-settings python-multipart
pip install "sqlalchemy[asyncio]" asyncpg psycopg2-binary
pip install torch transformers peft trl datasets bitsandbytes
pip install vllm "celery[redis]" redis

Project Structure

llm-finetune-service/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── database.py
│   │   └── schemas.py
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── datasets.py
│   │   ├── training.py
│   │   ├── evaluation.py
│   │   └── models.py
│   ├── services/
│   │   ├── __init__.py
│   │   ├── data_pipeline.py
│   │   ├── trainer.py
│   │   ├── evaluator.py
│   │   └── model_registry.py
│   └── workers/
│       ├── __init__.py
│       └── training_worker.py
├── scripts/
│   └── train.py
├── tests/
├── docker-compose.yml
└── Dockerfile

Configuration and Database Models

python
# app/config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    database_url: str
    redis_url: str = "redis://localhost:6379"
    model_storage_path: str = "/models"
    base_model: str = "meta-llama/Llama-3.1-8B-Instruct"
    max_concurrent_training_jobs: int = 1

    class Config:
        env_file = ".env"

settings = Settings()
python
# app/models/database.py
from sqlalchemy import Column, String, Integer, JSON, DateTime, Enum, create_engine
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from datetime import datetime
import enum

from app.config import settings

Base = declarative_base()

class DatasetStatus(str, enum.Enum):
    UPLOADING = "uploading"
    VALIDATING = "validating"
    READY = "ready"
    ERROR = "error"

class TrainingStatus(str, enum.Enum):
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

class ModelStage(str, enum.Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"

class TrainingDataset(Base):
    __tablename__ = "training_datasets"
    id = Column(String, primary_key=True)
    name = Column(String, nullable=False)
    file_path = Column(String, nullable=False)
    status = Column(Enum(DatasetStatus), default=DatasetStatus.UPLOADING)
    num_examples = Column(Integer, default=0)
    validation_report = Column(JSON)
    created_at = Column(DateTime, default=datetime.utcnow)

class TrainingJob(Base):
    __tablename__ = "training_jobs"
    id = Column(String, primary_key=True)
    dataset_id = Column(String, nullable=False)
    status = Column(Enum(TrainingStatus), default=TrainingStatus.QUEUED)
    config = Column(JSON, nullable=False)
    metrics = Column(JSON)
    output_path = Column(String)
    started_at = Column(DateTime)
    completed_at = Column(DateTime)
    error_message = Column(String)
    created_at = Column(DateTime, default=datetime.utcnow)

class ModelVersion(Base):
    __tablename__ = "model_versions"
    id = Column(String, primary_key=True)
    name = Column(String, nullable=False)
    version = Column(Integer, nullable=False)
    stage = Column(Enum(ModelStage), default=ModelStage.DEVELOPMENT)
    training_job_id = Column(String, nullable=False)
    adapter_path = Column(String, nullable=False)
    eval_metrics = Column(JSON)
    promoted_by = Column(String)
    promoted_at = Column(DateTime)
    created_at = Column(DateTime, default=datetime.utcnow)

# Async engine/session for the FastAPI app
engine = create_async_engine(settings.database_url)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

# Sync engine/session for the Celery worker, derived from the same URL
sync_engine = create_engine(settings.database_url.replace("+asyncpg", "+psycopg2"))
sync_session = sessionmaker(sync_engine, expire_on_commit=False)

API Schemas

python
# app/models/schemas.py
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime

class DatasetUploadResponse(BaseModel):
    id: str
    name: str
    status: str
    num_examples: int

class TrainingJobCreate(BaseModel):
    dataset_id: str
    base_model: str = "meta-llama/Llama-3.1-8B-Instruct"
    lora_r: int = Field(default=16, ge=4, le=128)
    lora_alpha: int = Field(default=32, ge=8, le=256)
    learning_rate: float = Field(default=2e-4, gt=0, lt=1)
    num_epochs: int = Field(default=3, ge=1, le=20)
    batch_size: int = Field(default=4, ge=1, le=32)
    gradient_accumulation: int = Field(default=4, ge=1, le=32)
    use_qlora: bool = False
    max_seq_length: int = Field(default=2048, ge=256, le=8192)

class TrainingJobResponse(BaseModel):
    id: str
    dataset_id: str
    status: str
    config: dict
    metrics: Optional[dict] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    error_message: Optional[str] = None

class EvalRequest(BaseModel):
    training_job_id: str
    eval_dataset_id: str
    max_examples: int = Field(default=100, ge=10, le=1000)

class ModelPromoteRequest(BaseModel):
    target_stage: str
    approved_by: str

Dataset Router

python
# app/routers/datasets.py
import os
import uuid
import json
import hashlib
from fastapi import APIRouter, UploadFile, File, HTTPException
from app.models.database import async_session, TrainingDataset, DatasetStatus
from app.models.schemas import DatasetUploadResponse

router = APIRouter(prefix="/api/datasets", tags=["datasets"])

@router.post("/upload", response_model=DatasetUploadResponse)
async def upload_dataset(name: str, file: UploadFile = File(...)):
    dataset_id = f"ds_{uuid.uuid4().hex[:12]}"
    file_path = f"/data/datasets/{dataset_id}.jsonl"
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    content = await file.read()
    lines = content.decode().strip().split("\n")

    issues = []
    valid_examples = []
    seen_hashes = set()

    for i, line in enumerate(lines, 1):
        try:
            item = json.loads(line)
        except json.JSONDecodeError:
            issues.append(f"Line {i}: invalid JSON")
            continue

        if "instruction" not in item or "output" not in item:
            issues.append(f"Line {i}: missing required fields")
            continue

        # Deduplicate on the instruction/output pair
        content_hash = hashlib.sha256(
            f"{item['instruction']}|{item['output']}".encode()
        ).hexdigest()

        if content_hash in seen_hashes:
            issues.append(f"Line {i}: duplicate")
            continue
        seen_hashes.add(content_hash)

        valid_examples.append(line)

    with open(file_path, "w") as f:
        f.write("\n".join(valid_examples))

    async with async_session() as session:
        dataset = TrainingDataset(
            id=dataset_id,
            name=name,
            file_path=file_path,
            status=DatasetStatus.READY if not issues else DatasetStatus.ERROR,
            num_examples=len(valid_examples),
            validation_report={"issues": issues, "valid_count": len(valid_examples)},
        )
        session.add(dataset)
        await session.commit()

    return DatasetUploadResponse(
        id=dataset_id,
        name=name,
        status=dataset.status.value,
        num_examples=len(valid_examples),
    )

@router.get("/{dataset_id}")
async def get_dataset(dataset_id: str):
    async with async_session() as session:
        dataset = await session.get(TrainingDataset, dataset_id)
        if not dataset:
            raise HTTPException(status_code=404, detail="Dataset not found")
        return {
            "id": dataset.id,
            "name": dataset.name,
            "status": dataset.status.value,
            "num_examples": dataset.num_examples,
            "validation_report": dataset.validation_report,
        }
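The upload endpoint expects newline-delimited JSON where every record has an instruction and an output field. A client-side sketch of preparing and uploading a dataset; the host, file name, and example records are assumptions, and requests is not part of the service's own dependency list:

python
# Hypothetical usage sketch; endpoint paths match the router above
import json
import requests

records = [
    {"instruction": "What does this service do?", "output": "It manages LLM fine-tuning runs."},
    {"instruction": "Which base model is used by default?", "output": "meta-llama/Llama-3.1-8B-Instruct."},
]

# Write one JSON object per line (JSONL), the format the validator expects
with open("train.jsonl", "w") as f:
    f.write("\n".join(json.dumps(r) for r in records))

resp = requests.post(
    "http://localhost:8000/api/datasets/upload",
    params={"name": "support-qa-v1"},        # `name` is a query parameter
    files={"file": open("train.jsonl", "rb")},
)
print(resp.json())  # e.g. {"id": "ds_...", "status": "ready", "num_examples": 2, ...}

Anything that fails validation is reported line by line in the validation_report, and a dataset with issues is marked error rather than ready.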


Training Router and Worker

python
# app/routers/training.py
import uuid
from fastapi import APIRouter, HTTPException
from app.models.database import (
    async_session,
    TrainingJob,
    TrainingStatus,
    TrainingDataset,
    DatasetStatus,
)
from app.models.schemas import TrainingJobCreate, TrainingJobResponse
from app.workers.training_worker import start_training_job

router = APIRouter(prefix="/api/training", tags=["training"])

@router.post("/jobs", response_model=TrainingJobResponse)
async def create_training_job(request: TrainingJobCreate):
    async with async_session() as session:
        dataset = await session.get(TrainingDataset, request.dataset_id)
        if not dataset:
            raise HTTPException(status_code=404, detail="Dataset not found")
        if dataset.status != DatasetStatus.READY:
            raise HTTPException(status_code=400, detail="Dataset not ready")

    job_id = f"job_{uuid.uuid4().hex[:12]}"
    config = request.model_dump()

    async with async_session() as session:
        job = TrainingJob(
            id=job_id,
            dataset_id=request.dataset_id,
            status=TrainingStatus.QUEUED,
            config=config,
        )
        session.add(job)
        await session.commit()

    # Hand the job off to the Celery worker
    start_training_job.delay(job_id)

    return TrainingJobResponse(
        id=job_id,
        dataset_id=request.dataset_id,
        status="queued",
        config=config,
    )

@router.get("/jobs/{job_id}", response_model=TrainingJobResponse)
async def get_training_job(job_id: str):
    async with async_session() as session:
        job = await session.get(TrainingJob, job_id)
        if not job:
            raise HTTPException(status_code=404, detail="Job not found")
        return TrainingJobResponse(
            id=job.id,
            dataset_id=job.dataset_id,
            status=job.status.value,
            config=job.config,
            metrics=job.metrics,
            started_at=job.started_at,
            completed_at=job.completed_at,
            error_message=job.error_message,
        )

@router.post("/jobs/{job_id}/cancel")
async def cancel_training_job(job_id: str):
    async with async_session() as session:
        job = await session.get(TrainingJob, job_id)
        if not job:
            raise HTTPException(status_code=404, detail="Job not found")
        if job.status != TrainingStatus.RUNNING:
            raise HTTPException(status_code=400, detail="Job not running")
        job.status = TrainingStatus.CANCELLED
        await session.commit()
        return {"status": "cancelled"}
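With the router wired to the queue, a client can create a run against a ready dataset and poll until it reaches a terminal state. A minimal sketch, assuming the service runs on localhost:8000 and the dataset ID comes from the upload step; the polling interval is an arbitrary choice:

python
# Hypothetical client-side sketch; endpoint paths and status values match the router above
import time
import requests

BASE = "http://localhost:8000"

job = requests.post(f"{BASE}/api/training/jobs", json={
    "dataset_id": "ds_abc123def456",  # placeholder ID returned by the upload endpoint
    "lora_r": 16,
    "num_epochs": 3,
    "use_qlora": True,
}).json()

# Poll until the worker reports a terminal status
while True:
    status = requests.get(f"{BASE}/api/training/jobs/{job['id']}").json()
    if status["status"] in ("completed", "failed", "cancelled"):
        print(status["status"], status.get("metrics"))
        break
    time.sleep(30)

The Celery worker below is what actually picks up the queued job, runs the LoRA fine-tune, and writes the metrics back to the job row.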
python
# app/workers/training_worker.py
from celery import Celery
from datetime import datetime
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, BitsAndBytesConfig
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training, TaskType
from trl import SFTTrainer
from datasets import load_dataset
from app.config import settings

celery_app = Celery("training", broker=settings.redis_url)

@celery_app.task(bind=True, max_retries=0)
def start_training_job(self, job_id: str):
    # Imported lazily so the API process never loads the sync session
    from app.models.database import sync_session, TrainingJob, TrainingDataset, TrainingStatus

    with sync_session() as session:
        job = session.get(TrainingJob, job_id)
        job.status = TrainingStatus.RUNNING
        job.started_at = datetime.utcnow()
        session.commit()
        config = job.config
        dataset_path = session.get(TrainingDataset, job.dataset_id).file_path

    try:
        output_dir = f"{settings.model_storage_path}/{job_id}"
        tokenizer = AutoTokenizer.from_pretrained(config["base_model"])
        tokenizer.pad_token = tokenizer.eos_token

        if config.get("use_qlora"):
            bnb_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_compute_dtype=torch.bfloat16,
            )
            model = AutoModelForCausalLM.from_pretrained(
                config["base_model"],
                quantization_config=bnb_config,
                device_map="auto",
            )
            model = prepare_model_for_kbit_training(model)
        else:
            model = AutoModelForCausalLM.from_pretrained(
                config["base_model"],
                torch_dtype=torch.bfloat16,
                device_map="auto",
            )

        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            r=config["lora_r"],
            lora_alpha=config["lora_alpha"],
            lora_dropout=0.05,
            target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
        )
        model = get_peft_model(model, lora_config)

        # Flatten instruction/output pairs into a single text field for SFT;
        # swap in the model's chat template if you need exact inference-time formatting
        dataset = load_dataset("json", data_files=dataset_path, split="train")
        dataset = dataset.map(
            lambda ex: {"text": f"### Instruction:\n{ex['instruction']}\n\n### Response:\n{ex['output']}"}
        )

        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=config["num_epochs"],
            per_device_train_batch_size=config["batch_size"],
            gradient_accumulation_steps=config["gradient_accumulation"],
            learning_rate=config["learning_rate"],
            warmup_ratio=0.1,
            lr_scheduler_type="cosine",
            bf16=True,
            gradient_checkpointing=True,
            save_strategy="epoch",
            logging_steps=10,
            report_to="none",
        )

        # These keyword arguments follow the older trl API; recent releases move
        # max_seq_length and the text field into SFTConfig
        trainer = SFTTrainer(
            model=model,
            train_dataset=dataset,
            args=training_args,
            tokenizer=tokenizer,
            dataset_text_field="text",
            max_seq_length=config.get("max_seq_length", 2048),
        )

        trainer.train()
        trainer.save_model(output_dir)

        final_metrics = {
            "final_loss": trainer.state.log_history[-1].get("train_loss"),
            "total_steps": trainer.state.global_step,
            "epochs_completed": config["num_epochs"],
        }

        with sync_session() as session:
            job = session.get(TrainingJob, job_id)
            job.status = TrainingStatus.COMPLETED
            job.completed_at = datetime.utcnow()
            job.output_path = output_dir
            job.metrics = final_metrics
            session.commit()

    except Exception as e:
        with sync_session() as session:
            job = session.get(TrainingJob, job_id)
            job.status = TrainingStatus.FAILED
            job.completed_at = datetime.utcnow()
            job.error_message = str(e)
            session.commit()
        raise
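Once a run completes, the adapter saved under output_dir can be loaded for a quick manual check before any evaluation or promotion step. A minimal sketch; the adapter path is a placeholder for the output_path reported by the completed job, and it assumes a GPU with enough memory for the base model:

python
# Hypothetical smoke test for a finished adapter
from transformers import AutoTokenizer
from peft import AutoPeftModelForCausalLM

adapter_path = "/models/job_abc123def456"  # placeholder: use the job's output_path

# Loads the base model referenced in the adapter config and applies the LoRA weights
model = AutoPeftModelForCausalLM.from_pretrained(adapter_path, device_map="auto")
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.1-8B-Instruct")

prompt = "### Instruction:\nSummarize what this service does.\n\n### Response:\n"
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
output = model.generate(**inputs, max_new_tokens=128)
print(tokenizer.decode(output[0], skip_special_tokens=True))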

Main Application

python
# app/main.py
from fastapi import FastAPI
from contextlib import asynccontextmanager
from app.models.database import engine, Base
from app.routers import datasets, training

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create tables on startup; switch to Alembic migrations once the schema stabilizes
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    await engine.dispose()

app = FastAPI(
    title="LLM Fine-Tuning Service",
    version="1.0.0",
    lifespan=lifespan,
)

app.include_router(datasets.router)
app.include_router(training.router)
# Register the evaluation and model-registry routers here once those modules are implemented

@app.get("/health")
async def health():
    return {"status": "healthy"}

Docker Compose for Development

yaml
version: "3.8"
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://user:pass@db:5432/finetune
      - REDIS_URL=redis://redis:6379
    volumes:
      - datasets:/data/datasets
      - models:/models
    depends_on:
      - db
      - redis

  worker:
    build: .
    command: celery -A app.workers.training_worker worker --loglevel=info --concurrency=1
    environment:
      - DATABASE_URL=postgresql+asyncpg://user:pass@db:5432/finetune
      - REDIS_URL=redis://redis:6379
    volumes:
      - datasets:/data/datasets
      - models:/models
    depends_on:
      - db
      - redis
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
      POSTGRES_DB: finetune
    volumes:
      - pgdata:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine

volumes:
  pgdata:
  datasets:
  models:

Conclusion

This FastAPI-based fine-tuning service provides a production-ready foundation that separates concerns cleanly: the API handles data management and job orchestration, Celery workers execute GPU-intensive training, and the model registry manages the promotion pipeline. The architecture scales horizontally by adding more Celery workers, each with its own GPU.

The key design decision is using Celery for training job execution rather than running training in the API process. This prevents a long-running training job from blocking the API and allows independent scaling of the API tier (which needs CPU and memory) from the training tier (which needs GPUs).
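The serving side promised in the introduction is not shown here, but the saved LoRA adapters slot directly into vLLM, which is already in the dependency list and also exposes an OpenAI-compatible server with LoRA support. A minimal sketch using vLLM's offline Python API; the adapter name, integer ID, and path are placeholders:

python
# Hypothetical serving sketch; adapter name, ID, and path are assumptions
from vllm import LLM, SamplingParams
from vllm.lora.request import LoRARequest

# Load the base model once with LoRA support enabled
llm = LLM(model="meta-llama/Llama-3.1-8B-Instruct", enable_lora=True)

outputs = llm.generate(
    ["### Instruction:\nSummarize what this service does.\n\n### Response:\n"],
    SamplingParams(max_tokens=128, temperature=0.2),
    # Attach the fine-tuned adapter produced by a completed training job
    lora_request=LoRARequest("finetune-v1", 1, "/models/job_abc123def456"),
)
print(outputs[0].outputs[0].text)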

