10. Production RAG - Everything Together
Nine articles, nine patterns. Now we build the service that ships: Pydantic config, request-ID middleware, async document ingestion, a unified multi-mode query endpoint, K8s health probes, and a multi-stage Dockerfile. One codebase, three RAG strategies, zero duct tape.
🏭What "Production" Actually Means
Every article in this series built something that works. This one builds something that ships. The difference is not about adding more features - it's about three things:
Observable
Every request has a correlation ID. Every log line carries it. You can trace a bug from a user complaint to a specific LLM call in seconds.
Configurable
No hardcoded hosts, ports, or model names. Every tunable comes from env vars via Pydantic Settings. Change a model without touching code.
Deployable
Multi-stage Dockerfile, non-root user, HEALTHCHECK instruction. Kubernetes readiness probe that actually checks the dependency, not just the process.
The RAG algorithms from articles 3-9 are unchanged. The production layer wraps them - it doesn't replace them. That separation is intentional: business logic stays clean, infrastructure concerns stay at the edges.
📁Project Layout
Each file has a single, obvious responsibility. rag_modes.py is where the article 3-9 patterns live together - one function per strategy, a shared QueryResult dataclass as their common return type. The router calls whichever strategy the mode= parameter selects.
⚙️Centralised Config
In every previous article, configuration was scattered: _CHROMA_HOST = "localhost" at the top of each file, _DEFAULT_MODEL = "llama3.2:3b" repeated in every router. That works for a demo. It fails in production, where you need to change the Ollama host without grep-and-replace across ten files.
```python
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # ChromaDB
    chroma_host: str = "localhost"
    chroma_port: int = 8001
    collection_prefix: str = "tenant_"

    # Ollama
    default_model: str = "llama3.2:3b"

    # API
    api_title: str = "Production RAG API"
    api_version: str = "1.0.0"
    log_level: str = "INFO"

    model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}


settings = Settings()
```
Every other module does from config import settings and reads settings.chroma_host. No module ever calls os.getenv() directly. The result: changing the ChromaDB host for a staging environment is a one-line .env change, not a code change.
Environment hierarchy. Pydantic Settings reads from three sources, in priority order: environment variables → .env file → class defaults. In Docker/K8s, set CHROMA_HOST as an env var. Locally, use a .env file. The class defaults handle everything if neither is set.
🔖Request-ID Middleware
Without correlation IDs, debugging a production issue looks like this: "The user reported a wrong answer at 14:32." You search your logs, find fifty concurrent requests in that second, and have no way to know which one caused the problem.
With correlation IDs, it looks like this: "The client received header X-Request-ID: a3f7b2c1. Grep for it, get every log line from that single request."
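```python
import logging
import time
import uuid

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

log = logging.getLogger("middleware")


class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # 8 hex chars = 32 bits: short and greppable, but collisions become
        # likely at high request volume. Lengthen the slice if that matters.
        request_id = uuid.uuid4().hex[:8]
        request.state.request_id = request_id
        start = time.perf_counter()

        response = await call_next(request)

        ms = round((time.perf_counter() - start) * 1000)
        log.info(
            "%s %s → %d [%dms] rid=%s",
            request.method, request.url.path, response.status_code, ms, request_id,
        )
        response.headers["X-Request-ID"] = request_id
        return response
```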
The middleware does three things: generates a short ID, stamps it on request.state so routers can access it, and logs the completed request with method, path, status, and duration. The ID is echoed in the response header - clients should log it alongside any error they surface to users.
```text
2025-05-22T14:32:07 INFO middleware - POST /query/ → 200 [1247ms] rid=a3f7b2c1
2025-05-22T14:32:07 INFO rag_modes - query mode=crag tenant=acme model=llama3.2:3b k=5
2025-05-22T14:32:07 INFO rag_modes - Strategy=PARTIAL grades=[CORRECT(0.91),AMBIGUOUS(0.54),INCORRECT(0.12)]
```
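For downstream loggers such as rag_modes to carry the ID on their own lines too, one option is a context variable plus a logging filter. This is a sketch using only the standard library, not the article's code: `request_id_var`, `RequestIDFilter`, and the `rag_modes_demo` logger name are hypothetical. In the real service the middleware would call `request_id_var.set()` before `call_next`; whether the value reaches your handlers depends on how your Starlette version propagates context, so it is worth verifying.

```python
import contextvars
import io
import logging

# Hypothetical context variable; the middleware would set it once per request.
request_id_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    "request_id", default="-"
)


class RequestIDFilter(logging.Filter):
    """Copy the current request ID onto every record so the formatter can print it."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.rid = request_id_var.get()
        return True


def build_logger(stream: io.StringIO) -> logging.Logger:
    handler = logging.StreamHandler(stream)
    handler.addFilter(RequestIDFilter())
    handler.setFormatter(
        logging.Formatter("%(levelname)s %(name)s - %(message)s rid=%(rid)s")
    )
    logger = logging.getLogger("rag_modes_demo")
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


buffer = io.StringIO()
log = build_logger(buffer)

token = request_id_var.set("a3f7b2c1")  # what dispatch() would do per request
log.info("query mode=crag tenant=acme")
request_id_var.reset(token)             # back to the default outside a request
log.info("startup housekeeping")

print(buffer.getvalue(), end="")
```

The calling code never passes the ID around explicitly. Any module that logs through a handler with this filter gets `rid=` appended, and lines emitted outside a request show `rid=-`.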
📥Async Document Ingestion
Ingestion is the operation most likely to be slow: fetching documents, chunking, embedding, upserting to ChromaDB. If it blocks the HTTP request, the client times out. The solution is to accept the request immediately (202 Accepted), run the work as an asyncio Task, and let the client poll for completion.
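```python
import asyncio
import uuid
from dataclasses import dataclass
from enum import Enum


class JobStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"


@dataclass
class IngestJob:
    job_id: str
    tenant_id: str
    total: int
    status: JobStatus = JobStatus.PENDING
    chunks_ok: int = 0
    error: str = ""


_jobs: dict[str, IngestJob] = {}    # in-memory job store (demo-grade)
_tasks: set[asyncio.Task] = set()   # strong references: the event loop only keeps weak ones


def submit(tenant_id: str, texts: list[str]) -> IngestJob:
    job = IngestJob(
        job_id=uuid.uuid4().hex[:12],
        tenant_id=tenant_id,
        total=len(texts),
    )
    _jobs[job.job_id] = job
    # Fire-and-forget; must be called while an event loop is running.
    task = asyncio.create_task(_run(job, texts))
    _tasks.add(task)                        # keep the task alive until it finishes
    task.add_done_callback(_tasks.discard)
    return job                              # caller gets the ID immediately
```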
The actual indexing runs in _run(), which calls asyncio.to_thread() for the ChromaDB upserts - keeping the event loop unblocked while the synchronous ChromaDB client does its work.
```python
async def _run(job: IngestJob, texts: list[str]) -> None:
    job.status = JobStatus.RUNNING
    try:
        client = _client()
        # The ChromaDB client is synchronous, so each call runs in a worker thread.
        collection = await asyncio.to_thread(
            client.get_or_create_collection,
            f"{settings.collection_prefix}{job.tenant_id}",
        )
        for i, text in enumerate(texts):
            await asyncio.to_thread(
                collection.upsert,
                ids=[f"{job.job_id}_{i}"],
                documents=[text],
            )
            job.chunks_ok += 1  # progress is visible to pollers while the job runs
        job.status = JobStatus.DONE
    except Exception as exc:
        job.status = JobStatus.FAILED
        job.error = str(exc)
```
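To see why the offloading matters, here is a small standard-library experiment. It is a sketch, not the article's code: `blocking_upsert` is a hypothetical stand-in for a synchronous client call, and `heartbeat` is a coroutine that can only tick while the event loop is free. Running the blocking call inline starves it; running it through `asyncio.to_thread()` does not.

```python
import asyncio
import time


def blocking_upsert(seconds: float) -> str:
    """Stand-in for a synchronous client call such as collection.upsert()."""
    time.sleep(seconds)
    return "upserted"


async def heartbeat(counter: list[int], stop: asyncio.Event) -> None:
    """Tick every 10 ms; the counter only advances while the loop is free."""
    while not stop.is_set():
        counter[0] += 1
        await asyncio.sleep(0.01)


async def measure(offload: bool) -> int:
    counter, stop = [0], asyncio.Event()
    ticker = asyncio.create_task(heartbeat(counter, stop))
    await asyncio.sleep(0)  # give the ticker its first turn
    if offload:
        await asyncio.to_thread(blocking_upsert, 0.2)  # loop stays responsive
    else:
        blocking_upsert(0.2)                           # loop is frozen for 200 ms
    stop.set()
    await ticker
    return counter[0]


ticks_blocked = asyncio.run(measure(offload=False))
ticks_offloaded = asyncio.run(measure(offload=True))
print(f"inline: {ticks_blocked} ticks, to_thread: {ticks_offloaded} ticks")
```

In a real service the "ticks" are other users' requests: with the inline call, every one of them waits behind the upsert.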
The in-memory job store is demo-grade. _jobs: dict[str, IngestJob] = {} doesn't survive a restart, can't be seen by multiple replicas, and grows forever. For production: use Redis as the job store, and replace asyncio.create_task() with a proper task queue - Celery, ARQ, or Prefect. The interface (submit → job_id → poll) is production-grade; only the storage backend changes.
From the client's perspective, the flow is clean:
```bash
# 1. Submit - returns immediately with job_id
curl -s -X POST http://localhost:8000/ingest/ \
  -H "Content-Type: application/json" \
  -d '{"tenant_id":"acme","texts":["chunk one","chunk two","chunk three"]}' \
  | python3 -m json.tool
# → {"job_id":"a1b2c3d4e5f6","status":"pending","total":3,"chunks_ok":0,...}

# 2. Poll until done
curl -s http://localhost:8000/ingest/a1b2c3d4e5f6 | python3 -m json.tool
# → {"job_id":"a1b2c3d4e5f6","status":"done","total":3,"chunks_ok":3,...}
```
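A programmatic client would wrap step 2 in a loop. The helper below is a sketch under stated assumptions: `wait_for_job` and its parameters are hypothetical, and `fetch_status` stands for whatever function performs the GET and returns the parsed JSON. It polls with exponential backoff and gives up after a deadline, stopping on either terminal status from `JobStatus`.

```python
import time
from typing import Callable

TERMINAL = {"done", "failed"}  # the two end states of JobStatus


def wait_for_job(
    fetch_status: Callable[[], dict],
    timeout_s: float = 60.0,
    first_delay_s: float = 0.5,
    max_delay_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until the job reaches a terminal state, backing off between attempts."""
    deadline = time.monotonic() + timeout_s
    delay = first_delay_s
    while True:
        job = fetch_status()
        if job["status"] in TERMINAL:
            return job
        if time.monotonic() + delay > deadline:
            raise TimeoutError(f"job still {job['status']!r} after {timeout_s}s")
        sleep(delay)
        delay = min(delay * 2, max_delay_s)  # 0.5s, 1s, 2s, ... capped at max_delay_s


# Usage with a fake status source instead of a real HTTP call.
responses = iter([
    {"status": "pending", "chunks_ok": 0},
    {"status": "running", "chunks_ok": 1},
    {"status": "done", "chunks_ok": 3},
])
delays: list[float] = []
final = wait_for_job(lambda: next(responses), sleep=delays.append)
print(final, delays)
```

Injecting `sleep` keeps the helper testable without real waiting. A caller should still inspect `final["status"]`, since a failed job also ends the loop.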
🔀Unified Query Endpoint
Articles 3, 8, and 9 each had their own router with their own endpoint. In a production service you want one stable URL - POST /query/ - and a runtime switch for the strategy. The mode= query parameter does this without branching in every client that calls the API.
The strategy interface
rag_modes.py defines a shared QueryResult dataclass that all three strategies return. The router doesn't know or care which strategy it invoked - it maps QueryResult to QueryResponse identically for all modes.
```python
from dataclasses import dataclass, field


@dataclass
class QueryResult:
    mode: str
    answer: str
    strategy: str | None = None        # CRAG strategy label
    rephrased: str | None = None       # CRAG rephrased query
    citations: list[Citation] = field(default_factory=list)
    confidence: float | None = None    # structured confidence score
    cannot_answer: bool | None = None  # structured cannot_answer flag
    reasoning: str | None = None       # structured reasoning trace
```
The dispatch table
The router maps mode strings to callables. No if/elif chains - a dict does the job and makes adding a new mode a one-liner:
```python
import asyncio


@router.post("/", response_model=QueryResponse)
async def unified_query(
    q: Annotated[str, Query(description="Question to answer")],
    mode: Annotated[
        Literal["standard", "crag", "structured"],
        Query(description="RAG strategy to use"),
    ] = "standard",
    # ... tenant_id, model, k ...
) -> QueryResponse:
    collection = _get_collection(tenant_id)
    dispatch = {
        "standard": rm.mode_standard,
        "crag": rm.mode_crag,
        "structured": rm.mode_structured,
    }
    # The strategies are synchronous (retrieval plus an LLM call), so run them
    # in a worker thread instead of blocking the event loop.
    result = await asyncio.to_thread(
        dispatch[mode], question=q, collection=collection, model=model, k=k
    )
    return QueryResponse(...)  # maps QueryResult fields to response schema
```
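The same idea works outside FastAPI. The sketch below is a hypothetical variation, not the article's router: instead of building the dict inline, a `register` decorator fills a module-level `MODES` table, so adding a strategy means writing one decorated function. The strategy bodies are placeholders that return strings.

```python
from typing import Callable

# Hypothetical registry; the article's router builds its dict inline instead.
MODES: dict[str, Callable[..., str]] = {}


def register(name: str) -> Callable[[Callable[..., str]], Callable[..., str]]:
    """Decorator that records a strategy function under its mode name."""
    def wrap(fn: Callable[..., str]) -> Callable[..., str]:
        MODES[name] = fn
        return fn
    return wrap


@register("standard")
def mode_standard(question: str, k: int) -> str:
    return f"standard answer to {question!r} from top-{k} chunks"


@register("crag")
def mode_crag(question: str, k: int) -> str:
    return f"crag answer to {question!r} from graded top-{k} chunks"


def run_query(mode: str, question: str, k: int = 5) -> str:
    """Look the strategy up by name and call it with a uniform signature."""
    try:
        strategy = MODES[mode]
    except KeyError:
        raise ValueError(
            f"unknown mode {mode!r}; expected one of {sorted(MODES)}"
        ) from None
    return strategy(question=question, k=k)


print(run_query("crag", "What is the refund window?"))
```

In the endpoint above the `Literal[...]` annotation already rejects unknown modes before the lookup runs, so the explicit `ValueError` here matters only when the table is used without that validation.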
The three modes share the same retrieval layer (_vector_search) and differ only in what they do with the chunks before and after generation:
| mode | Retrieval | Before generation | Generation | Extra fields |
|---|---|---|---|---|
| standard | Top-k vector search | Nothing - pass all chunks | Free text | - |
| crag | Top-k vector search | Grade each chunk; discard INCORRECT; rephrase if all fail | Free text from curated chunks | strategy, rephrased |
| structured | Top-k vector search | Inject Pydantic schema into system prompt | JSON (format="json") | citations, confidence, cannot_answer, reasoning |
Combining modes. The most powerful production pattern is not choosing one mode - it's chaining them. Run CRAG to filter low-quality chunks, then pass the curated context to the structured mode. The result: clean citations from only verified-relevant sources, with an explicit confidence score.
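A rough sketch of that chain, with the model calls abstracted away. Everything here is hypothetical: `GradedChunk`, `crag_then_structured`, and the two injected callables (`grade_chunk`, `generate_structured`) stand in for the grading and structured-generation steps of the earlier articles. As a simplification, this version returns a cannot_answer result when every chunk is rejected, where full CRAG would rephrase and retry.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class GradedChunk:
    text: str
    grade: str    # "CORRECT", "AMBIGUOUS" or "INCORRECT"
    score: float


def crag_then_structured(
    question: str,
    chunks: list[str],
    grade_chunk: Callable[[str, str], GradedChunk],
    generate_structured: Callable[[str, list[str]], dict],
) -> dict:
    """Filter chunks CRAG-style, then hand only the survivors to structured generation."""
    graded = [grade_chunk(question, chunk) for chunk in chunks]
    kept = [g.text for g in graded if g.grade != "INCORRECT"]
    if not kept:
        # Simplification: a full CRAG pipeline would rephrase the query and retry.
        return {"answer": "", "citations": [], "confidence": 0.0, "cannot_answer": True}
    return generate_structured(question, kept)


# Fake grader and generator so the flow can be exercised without an LLM.
def fake_grader(question: str, chunk: str) -> GradedChunk:
    relevant = "refund" in chunk
    return GradedChunk(chunk, "CORRECT" if relevant else "INCORRECT", 0.9 if relevant else 0.1)


def fake_generator(question: str, context: list[str]) -> dict:
    return {"answer": context[0], "citations": context, "confidence": 0.9, "cannot_answer": False}


result = crag_then_structured(
    "What is the refund window?",
    ["refunds are accepted within 30 days", "our office is in Berlin"],
    fake_grader,
    fake_generator,
)
empty = crag_then_structured("What is the refund window?", ["our office is in Berlin"],
                             fake_grader, fake_generator)
print(result["citations"], empty["cannot_answer"])
```

Because the generator only ever sees `kept`, any citation it produces can point only at chunks that passed grading.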
❤️Health & Readiness Probes
Kubernetes distinguishes two probe types, and they answer different questions:
GET /health (liveness)
- Question: "Is the process alive?"
- Should always return 200 if the app started
- Never check dependencies - just return OK
- Failure → K8s restarts the pod
GET /ready (readiness)
- Question: "Can this pod serve traffic?"
- Checks real dependencies (ChromaDB heartbeat)
- Returns 503 if any dependency is down
- Failure → K8s stops routing traffic to the pod
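```python
import chromadb
from fastapi import APIRouter, Response
from pydantic import BaseModel

from config import settings

router = APIRouter()


# Response models, with fields inferred from how the handlers below use them.
class HealthOut(BaseModel):
    status: str
    version: str


class ReadyOut(BaseModel):
    status: str
    chroma: str


@router.get("/health", response_model=HealthOut)
async def health() -> HealthOut:
    return HealthOut(status="ok", version=settings.api_version)


@router.get("/ready", response_model=ReadyOut)
async def ready(response: Response) -> ReadyOut:
    try:
        client = chromadb.HttpClient(host=settings.chroma_host, port=settings.chroma_port)
        client.heartbeat()
        chroma = "ok"
    except Exception as exc:
        chroma = f"unavailable: {exc}"

    ok = chroma == "ok"
    if not ok:
        response.status_code = 503  # K8s sees this and stops routing
    return ReadyOut(status="ok" if ok else "degraded", chroma=chroma)
```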
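If the service grows more dependencies, the readiness logic can be generalised. The sketch below is an assumption-laden illustration, not the article's code: `run_checks` is a hypothetical helper that runs named async checks concurrently, bounds each with a timeout so a hung dependency cannot hang the probe, and returns the HTTP status alongside a per-dependency report. The `ollama` and `cache` entries are invented for the demo. A synchronous check such as `client.heartbeat()` would need wrapping, for example with `asyncio.to_thread`.

```python
import asyncio
from typing import Awaitable, Callable

Check = Callable[[], Awaitable[None]]


async def run_checks(
    checks: dict[str, Check], timeout_s: float = 2.0
) -> tuple[int, dict[str, str]]:
    """Run all dependency checks concurrently; return (HTTP status, report)."""

    async def one(name: str, check: Check) -> tuple[str, str]:
        try:
            await asyncio.wait_for(check(), timeout=timeout_s)
            return name, "ok"
        except asyncio.TimeoutError:
            return name, f"unavailable: no reply within {timeout_s}s"
        except Exception as exc:
            return name, f"unavailable: {exc}"

    results = dict(await asyncio.gather(*(one(n, c) for n, c in checks.items())))
    status = 200 if all(v == "ok" for v in results.values()) else 503
    return status, results


# Fake dependencies: one healthy, one that hangs, one that raises.
async def healthy() -> None:
    await asyncio.sleep(0)


async def hung() -> None:
    await asyncio.sleep(10)


async def broken() -> None:
    raise ConnectionError("connection refused")


status_ok, report_ok = asyncio.run(run_checks({"chroma": healthy}))
status_bad, report_bad = asyncio.run(
    run_checks({"chroma": healthy, "ollama": hung, "cache": broken}, timeout_s=0.05)
)
print(status_ok, report_ok)
print(status_bad, report_bad)
```

The timeout should stay comfortably below the probe's own timeout in the manifest, otherwise Kubernetes gives up before the endpoint can report which dependency failed.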
The K8s manifest for these probes looks like this:
```yaml
livenessProbe:
  httpGet:
    path: /health
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /ready
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 15
  failureThreshold: 3
```
🐳Dockerfile
The Dockerfile follows the standard multi-stage pattern: the builder stage installs dependencies into a virtualenv, and the runtime stage copies the venv and source from it - no Poetry, no pip cache, smaller image.
```dockerfile
# ── builder ────────────────────────────────────────────────────────────────
FROM python:3.12-slim-bookworm AS builder
WORKDIR /app

RUN pip install --no-cache-dir poetry==1.8.3 \
    && poetry config virtualenvs.in-project true

COPY pyproject.toml poetry.lock ./
RUN poetry install --only main --no-root
COPY . .

# ── runtime ────────────────────────────────────────────────────────────────
FROM python:3.12-slim-bookworm
WORKDIR /app

# non-root user
RUN useradd -r -s /bin/false appuser

COPY --from=builder /app/.venv /app/.venv
COPY --from=builder /app /app

ENV PATH="/app/.venv/bin:$PATH"
USER appuser

HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
    CMD python -c \
    "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=4)"

EXPOSE 8000
ENTRYPOINT ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
```
Why HEALTHCHECK in the Dockerfile? Docker's HEALTHCHECK drives docker ps status and Docker Swarm's health-based restart policy. Kubernetes ignores it (it uses its own probes), but the /health endpoint serves both. One endpoint, two orchestration layers.
Wiring everything up in main.py
```python
import logging

from fastapi import FastAPI

from config import settings
from middleware import RequestIDMiddleware
from routers import health, ingest, query

logging.basicConfig(
    level=getattr(logging, settings.log_level.upper(), logging.INFO),
    format="%(asctime)s %(levelname)-8s %(name)s - %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)

app = FastAPI(
    title=settings.api_title,
    version=settings.api_version,
    description="Multi-tenant RAG - standard · corrective · structured output modes.",
)

app.add_middleware(RequestIDMiddleware)
app.include_router(health.router)
app.include_router(ingest.router)
app.include_router(query.router)
```
Six statements after the imports. That's what the production layer looks like when every concern is in the right file: config in config.py, logging setup at the top of main.py, middleware registration in order, routers registered. The app object knows nothing about RAG - it only knows that it has three routers.
📚Series Recap - What You Built
Ten articles, ten production-grade patterns. Every project used only open-source tools: Ollama for local LLMs, ChromaDB for vector storage, FastAPI for the API layer, Pydantic for validation. No API keys, no subscriptions, no vendor lock-in.
From Prototype to Production
The difference between a RAG demo and a RAG service is not the algorithm - it's the layer around it. Config that doesn't require code changes. Logs you can grep. A readiness endpoint that actually checks your dependencies. A container that runs as a non-root user.
Every pattern in this series - CRAG, structured outputs, agentic tool-calling, graph traversal - runs the same in a container on your laptop as it does behind a load balancer in production. That's the point. Open-source models locally means you own the full stack.
The series is complete. The code is yours.