10. Production RAG - Everything Together

Nine articles, nine patterns. Now build the service that ships: Pydantic config, request-ID middleware, async document ingestion, a unified multi-mode query endpoint, K8s health probes, and a multi-stage Dockerfile.

10. Production RAG - Everything Together
10. Production RAG - Everything Together
Series · Article 10 of 10 · Final

Production RAG
Everything Together

Nine articles, nine patterns. Now we build the service that ships: Pydantic config, request-ID middleware, async document ingestion, a unified multi-mode query endpoint, K8s health probes, and a multi-stage Dockerfile. One codebase, three RAG strategies, zero duct tape.

⏱ ~40 min build 🔧 pydantic-settings · fastapi · uvicorn · docker 📦 Series finale

🏭What "Production" Actually Means

Every article in this series built something that works. This one builds something that ships. The difference is not about adding more features - it's about three things:

👁️

Observable

Every request has a correlation ID. Every log line carries it. You can trace a bug from a user complaint to a specific LLM call in seconds.

⚙️

Configurable

No hardcoded hosts, ports, or model names. Every tunable comes from env vars via Pydantic Settings. Change a model without touching code.

🚀

Deployable

Multi-stage Dockerfile, non-root user, HEALTHCHECK instruction. Kubernetes readiness probe that actually checks the dependency, not just the process.

The RAG algorithms from articles 3-9 are unchanged. The production layer wraps them - it doesn't replace them. That separation is intentional: business logic stays clean, infrastructure concerns stay at the edges.

📁Project Layout

article-10/ ├── config.py # Pydantic Settings - all env vars in one place ├── middleware.py # Request-ID injection + timing ├── ingestion.py # Async ingestion with job tracking ├── rag_modes.py # standard / crag / structured - three strategies, one interface ├── main.py # FastAPI app, middleware, router registration ├── routers/ │ ├── health.py # GET /health, GET /ready │ ├── ingest.py # POST /ingest/, GET /ingest/{job_id} │ └── query.py # POST /query/?mode=standard|crag|structured ├── Dockerfile # Multi-stage, non-root, HEALTHCHECK └── requirements.txt # Full series dependency list

Each file has a single, obvious responsibility. rag_modes.py is where the article 3-9 patterns live together - one function per strategy, a shared QueryResult dataclass as their common return type. The router calls whichever strategy the mode= parameter selects.

⚙️Centralised Config

In every previous article, configuration was scattered: _CHROMA_HOST = "localhost" at the top of each file, _DEFAULT_MODEL = "llama3.2:3b" repeated in every router. That works for a demo. It fails in production, where you need to change the Ollama host without grep-and-replace across ten files.

python config.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # ChromaDB
    chroma_host:       str = "localhost"
    chroma_port:       int = 8001
    collection_prefix: str = "tenant_"

    # Ollama
    default_model:     str = "llama3.2:3b"

    # API
    api_title:         str = "Production RAG API"
    api_version:       str = "1.0.0"
    log_level:         str = "INFO"

    model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}


settings = Settings()

Every other module does from config import settings and reads settings.chroma_host. No module ever calls os.getenv() directly. The result: changing the ChromaDB host for a staging environment is a one-line .env change, not a code change.

💡

Environment hierarchy. Pydantic Settings reads from three sources, in priority order: environment variables → .env file → class defaults. In Docker/K8s, set CHROMA_HOST as an env var. Locally, use a .env file. The class defaults handle everything if neither is set.

🔖Request-ID Middleware

Without correlation IDs, debugging a production issue looks like this: "The user reported a wrong answer at 14:32." You search your logs, find fifty concurrent requests in that second, and have no way to know which one caused the problem.

With correlation IDs, it looks like this: "The client received header X-Request-ID: a3f7b2c1. Grep for it, get every log line from that single request."

python middleware.py
class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = uuid.uuid4().hex[:8]          # 8 hex chars - short but collision-safe
        request.state.request_id = request_id

        start    = time.perf_counter()
        response = await call_next(request)
        ms       = round((time.perf_counter() - start) * 1000)

        log.info(
            "%s %s → %d  [%dms] rid=%s",
            request.method, request.url.path,
            response.status_code, ms, request_id,
        )
        response.headers["X-Request-ID"] = request_id
        return response

The middleware does three things: generates a short ID, stamps it on request.state so routers can access it, and logs the completed request with method, path, status, and duration. The ID is echoed in the response header - clients should log it alongside any error they surface to users.

text example log output
2025-05-22T14:32:07 INFO     middleware - POST /query/ → 200  [1247ms] rid=a3f7b2c1
2025-05-22T14:32:07 INFO     rag_modes  - query mode=crag tenant=acme model=llama3.2:3b k=5
2025-05-22T14:32:07 INFO     rag_modes  - Strategy=PARTIAL grades=[CORRECT(0.91),AMBIGUOUS(0.54),INCORRECT(0.12)]

📥Async Document Ingestion

Ingestion is the operation most likely to be slow: fetching documents, chunking, embedding, upserting to ChromaDB. If it blocks the HTTP request, the client times out. The solution is to accept the request immediately (202 Accepted), run the work as an asyncio Task, and let the client poll for completion.

python ingestion.py
class JobStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    DONE    = "done"
    FAILED  = "failed"


@dataclass
class IngestJob:
    job_id:    str
    tenant_id: str
    total:     int
    status:    JobStatus = JobStatus.PENDING
    chunks_ok: int = 0
    error:     str = ""


def submit(tenant_id: str, texts: list[str]) -> IngestJob:
    job = IngestJob(
        job_id    = uuid.uuid4().hex[:12],
        tenant_id = tenant_id,
        total     = len(texts),
    )
    _jobs[job.job_id] = job
    asyncio.create_task(_run(job, texts))   # fire-and-forget
    return job                               # caller gets the ID immediately

The actual indexing runs in _run(), which calls asyncio.to_thread() for the ChromaDB upserts - keeping the event loop unblocked while the synchronous ChromaDB client does its work.

python ingestion.py - _run()
async def _run(job: IngestJob, texts: list[str]) -> None:
    job.status = JobStatus.RUNNING
    try:
        client     = _client()
        collection = await asyncio.to_thread(
            client.get_or_create_collection,
            f"{settings.collection_prefix}{job.tenant_id}",
        )
        for i, text in enumerate(texts):
            await asyncio.to_thread(
                collection.upsert,
                ids=[f"{job.job_id}_{i}"],
                documents=[text],
            )
            job.chunks_ok += 1
        job.status = JobStatus.DONE
    except Exception as exc:
        job.status = JobStatus.FAILED
        job.error  = str(exc)
⚠️

The in-memory job store is demo-grade. _jobs: dict[str, IngestJob] = {} doesn't survive a restart, can't be seen by multiple replicas, and grows forever. For production: use Redis as the job store, and replace asyncio.create_task() with a proper task queue - Celery, ARQ, or Prefect. The interface (submit → job_id → poll) is production-grade; only the storage backend changes.

From the client's perspective, the flow is clean:

bash client example
# 1. Submit - returns immediately with job_id
curl -s -X POST http://localhost:8000/ingest/ \
  -H "Content-Type: application/json" \
  -d '{"tenant_id":"acme","texts":["chunk one","chunk two","chunk three"]}' \
  | python3 -m json.tool
# → {"job_id":"a1b2c3d4e5f6","status":"pending","total":3,"chunks_ok":0,...}

# 2. Poll until done
curl -s http://localhost:8000/ingest/a1b2c3d4e5f6 | python3 -m json.tool
# → {"job_id":"a1b2c3d4e5f6","status":"done","total":3,"chunks_ok":3,...}

🔀Unified Query Endpoint

Articles 3, 8, and 9 each had their own router with their own endpoint. In a production service you want one stable URL - POST /query/ - and a runtime switch for the strategy. The mode= query parameter does this without branching in every client that calls the API.

The strategy interface

rag_modes.py defines a shared QueryResult dataclass that all three strategies return. The router doesn't know or care which strategy it invoked - it maps QueryResult to QueryResponse identically for all modes.

python rag_modes.py - shared interface
@dataclass
class QueryResult:
    mode:          str
    answer:        str
    strategy:      str | None = None       # CRAG strategy label
    rephrased:     str | None = None       # CRAG rephrased query
    citations:     list[Citation] = field(default_factory=list)
    confidence:    float | None   = None   # structured confidence score
    cannot_answer: bool  | None   = None   # structured cannot_answer flag
    reasoning:     str   | None   = None   # structured reasoning trace

The dispatch table

The router maps mode strings to callables. No if/elif chains - a dict does the job and makes adding a new mode a one-liner:

python routers/query.py
@router.post("/", response_model=QueryResponse)
async def unified_query(
    q:    Annotated[str, Query(description="Question to answer")],
    mode: Annotated[Literal["standard", "crag", "structured"],
           Query(description="RAG strategy to use")] = "standard",
    # ... tenant_id, model, k ...
) -> QueryResponse:
    collection = _get_collection(tenant_id)

    dispatch = {
        "standard":   rm.mode_standard,
        "crag":       rm.mode_crag,
        "structured": rm.mode_structured,
    }
    result = dispatch[mode](question=q, collection=collection, model=model, k=k)
    return QueryResponse(...)  # maps QueryResult fields to response schema

The three modes share the same retrieval layer (_vector_search) and differ only in what they do with the chunks before and after generation:

modeRetrievalBefore generationGenerationExtra fields
standard Top-k vector search Nothing - pass all chunks Free text -
crag Top-k vector search Grade each chunk; discard INCORRECT; rephrase if all fail Free text from curated chunks strategy, rephrased
structured Top-k vector search Inject Pydantic schema into system prompt JSON (format="json") citations, confidence, cannot_answer, reasoning
ℹ️

Combining modes. The most powerful production pattern is not choosing one mode - it's chaining them. Run CRAG to filter low-quality chunks, then pass the curated context to the structured mode. The result: clean citations from only verified-relevant sources, with an explicit confidence score.

❤️Health & Readiness Probes

Kubernetes distinguishes two probe types, and they answer different questions:

GET /health (liveness)

  • Question: "Is the process alive?"
  • Should always return 200 if the app started
  • Never check dependencies - just return OK
  • Failure → K8s restarts the pod

GET /ready (readiness)

  • Question: "Can this pod serve traffic?"
  • Checks real dependencies (ChromaDB heartbeat)
  • Returns 503 if any dependency is down
  • Failure → K8s stops routing traffic to the pod
python routers/health.py
@router.get("/health", response_model=HealthOut)
async def health() -> HealthOut:
    return HealthOut(status="ok", version=settings.api_version)


@router.get("/ready", response_model=ReadyOut)
async def ready(response: Response) -> ReadyOut:
    try:
        client = chromadb.HttpClient(host=settings.chroma_host, port=settings.chroma_port)
        client.heartbeat()
        chroma = "ok"
    except Exception as exc:
        chroma = f"unavailable: {exc}"

    ok = chroma == "ok"
    if not ok:
        response.status_code = 503          # K8s sees this and stops routing
    return ReadyOut(status="ok" if ok else "degraded", chroma=chroma)

The K8s manifest for these probes looks like this:

yaml k8s Deployment snippet
livenessProbe:
  httpGet:
    path: /health
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10

readinessProbe:
  httpGet:
    path: /ready
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 15
  failureThreshold: 3

🐳Dockerfile

The Dockerfile follows the standard multi-stage pattern from the global coding rules: builder stage installs dependencies into a virtualenv, runtime stage copies only the venv and source - no build tools, no pip cache, smaller image.

dockerfile Dockerfile
# ── builder ────────────────────────────────────────────────────────────────
FROM python:3.12-slim-bookworm AS builder

WORKDIR /app
RUN pip install --no-cache-dir poetry==1.8.3 \
    && poetry config virtualenvs.in-project true

COPY pyproject.toml poetry.lock ./
RUN poetry install --only main --no-root

COPY . .

# ── runtime ────────────────────────────────────────────────────────────────
FROM python:3.12-slim-bookworm

WORKDIR /app
RUN useradd -r -s /bin/false appuser             # non-root user

COPY --from=builder /app/.venv /app/.venv
COPY --from=builder /app /app

ENV PATH="/app/.venv/bin:$PATH"
USER appuser

HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
    CMD python -c \
    "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=4)"

EXPOSE 8000
ENTRYPOINT ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
💡

Why HEALTHCHECK in the Dockerfile? Docker's HEALTHCHECK drives docker ps status and Docker Swarm's health-based restart policy. Kubernetes ignores it (it uses its own probes), but the /health endpoint serves both. One endpoint, two orchestration layers.

Wiring everything up in main.py

python main.py
import logging
from fastapi import FastAPI
from config import settings
from middleware import RequestIDMiddleware
from routers import health, ingest, query

logging.basicConfig(
    level   = getattr(logging, settings.log_level.upper(), logging.INFO),
    format  = "%(asctime)s %(levelname)-8s %(name)s - %(message)s",
    datefmt = "%Y-%m-%dT%H:%M:%S",
)

app = FastAPI(
    title       = settings.api_title,
    version     = settings.api_version,
    description = "Multi-tenant RAG - standard · corrective · structured output modes.",
)

app.add_middleware(RequestIDMiddleware)
app.include_router(health.router)
app.include_router(ingest.router)
app.include_router(query.router)

Seven lines after the imports. That's what the production layer looks like when every concern is in the right file: config in config.py, logging setup at the top of main.py, middleware registration in order, routers registered. The app object knows nothing about RAG - it only knows that it has three routers.

📚Series Recap - What You Built

Ten articles, ten production-grade patterns. Every project used only open-source tools: Ollama for local LLMs, ChromaDB for vector storage, FastAPI for the API layer, Pydantic for validation. No API keys, no subscriptions, no vendor lock-in.

01
Document Analysis with LLMs PDF ingestion, chunking strategies, Ollama for summarisation and entity extraction
02
RAG from Scratch Embeddings, FAISS vector index, semantic search, first end-to-end retrieve + generate pipeline
03
Multi-Tenant Document Q&A API ChromaDB collections per tenant, JWT auth, FastAPI router pattern - the base for all future articles
04
Streaming RAG Chat FastAPI SSE, token-by-token streaming, React frontend with AbortController
05
Evaluating RAG with RAGAS Faithfulness, AnswerRelevancy, ContextPrecision, ContextRecall - automated quality regression tests
06
GraphRAG NetworkX knowledge graph, BFS multi-hop traversal, hybrid vector + graph retrieval
07
Agentic RAG (ReAct) Ollama tool-calling, Reason→Act→Observe loop, multi-step retrieval with AgentStep trace
08
Corrective RAG (CRAG) LLM-based chunk grading, three-strategy selection, query rephrasing on total failure
09
Structured Outputs Pydantic schema injection, Ollama format=json, citations with excerpts, cannot_answer flag
10
Production RAG Pydantic Settings, request-ID middleware, async ingestion, unified mode selector, health probes, Dockerfile

From Prototype to Production

The difference between a RAG demo and a RAG service is not the algorithm - it's the layer around it. Config that doesn't require code changes. Logs you can grep. A health endpoint that actually checks your dependencies. A container that runs as a non-root user.

Every pattern in this series - CRAG, structured outputs, agentic tool-calling, graph traversal - runs the same in a container on your laptop as it does behind a load balancer in production. That's the point. Open-source models locally means you own the full stack.

The series is complete. The code is yours.