4. Streaming RAG Chat — FastAPI SSE + React

Add token-by-token streaming to your RAG pipeline. Server-Sent Events from FastAPI, a React client with AbortController, and the nuances of streaming structured retrieval responses.

Series · Article 4 of 10

Streaming RAG Chat
FastAPI SSE + React

Add token-by-token streaming to the Article 3 backend, then build a minimal React chat UI that renders each word as it arrives — no paid APIs, no UI frameworks.

⏱ ~45 min build 🔧 slowapi · ollama stream · Vite · React 18 📦 Builds on Article 3

🏗What You'll Build

Article 3 built a fully working RAG API — upload documents, ask questions, get answers. One problem: the user stares at a blank screen for 2–8 seconds waiting for Ollama to finish generating. Streaming fixes that by sending each token the moment it's produced, so the answer appears word-by-word like a typewriter.

This article adds two things to the Article 3 codebase:

Backend addition

  • New GET /query/stream/ endpoint that returns a Server-Sent Events stream
  • Per-IP rate limiting (30 req/min) via slowapi
  • Nginx buffering disabled so tokens aren't held back

Frontend (new)

  • Vite + React 18 + TypeScript — no UI framework
  • Login / register form
  • Chat window with live token rendering and a blinking cursor
  • Stop button via AbortController

The full data path from keypress to rendered token:

Browser
fetch() with Authorization header + ReadableStream
FastAPI
JWT auth → ChromaDB search → StreamingResponse
Ollama
ollama.chat(stream=True) yields one token at a time
React state
tokens accumulate in useState → re-render each token

📡How SSE Works

Server-Sent Events (SSE) is a one-directional HTTP stream from server to client. The response stays open until the server has finished writing; in the meantime the server keeps sending newline-delimited text and the client reads it incrementally. In this article each event is a single line that starts with data: and carries a JSON payload, followed by a blank line (two newline characters in total):

SSE wire format
data: {"type":"sources","sources":[{"doc_id":"abc","text":"..."}]}

data: {"type":"token","text":"Paris"}

data: {"type":"token","text":" is"}

data: {"type":"token","text":" the"}

data: {"type":"done","model":"llama3.2:3b"}

EventSource vs fetch()

Browsers have a built-in EventSource API for SSE, but it has one critical limitation: it cannot send custom headers. That rules it out for JWT authentication. Instead we use fetch() with the Authorization header and read the response body as a ReadableStream.

EventSource (browser built-in)

  • Simple API — automatic reconnect on disconnect
  • ❌ GET only, no custom headers
  • ❌ Cannot send Authorization: Bearer
  • Good for public streams (stock tickers, news feeds)

fetch() + ReadableStream (our choice)

  • Full control over headers — JWT works
  • Requires manual SSE line parsing
  • Manual reconnect if needed
  • ✅ Required for any authenticated SSE
💡

A single reader.read() call may return multiple SSE lines, or a partial line that continues in the next read. The parser must buffer across reads and split on \n — not assume one event per chunk. This is the most common mistake when implementing SSE clients from scratch.
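The buffering rule above can also be sketched in Python, which is handy for exercising the endpoint without a browser. This is a minimal sketch and not part of the article's codebase: parse_sse is a hypothetical helper name, and it assumes every event is a single data: line carrying JSON, as in the wire format shown earlier.

```python
import json
from typing import Iterable, Iterator


def parse_sse(chunks: Iterable[bytes]) -> Iterator[dict]:
    """Yield one JSON payload per complete `data: ` line, buffering across chunks.

    Hypothetical helper: assumes single-line JSON events as used in this article.
    """
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # Everything before the last newline is complete; the tail may be a
        # partial line, so it stays in the buffer for the next chunk.
        *lines, buffer = buffer.split(b"\n")
        for line in lines:
            if line.startswith(b"data: "):
                yield json.loads(line[len(b"data: "):].decode("utf-8"))


if __name__ == "__main__":
    # One event split across two reads, followed by a second complete event.
    reads = [
        b'data: {"type":"tok',
        b'en","text":"Pa"}\n\ndata: {"type":"token","text":"ris"}\n\n',
    ]
    for event in parse_sse(reads):
        print(event)
```

Buffering raw bytes and decoding only complete lines also sidesteps multi-byte characters that are split between reads.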

🧰Technology Stack

STREAMING 🔄

ollama.chat(stream=True)

Ollama's Python SDK supports streaming iteration: for chunk in ollama.chat(..., stream=True) yields one token dict per iteration. Zero extra dependencies — same ollama==0.4.4 from Article 3.

SSE TRANSPORT 📤

FastAPI StreamingResponse

Wraps an async generator with media_type="text/event-stream". The X-Accel-Buffering: no header tells nginx not to hold back chunks — without it, tokens batch up and arrive all at once.

RATE LIMITING 🚦

slowapi 0.1.9

Flask-Limiter port for FastAPI. Decorator-based: @limiter.limit("30/minute"). Uses the limits library for storage backends — in-memory by default, Redis for multi-process.

FRONTEND ⚛️

React 18 + Vite 6

React 18 for the UI, Vite for instant HMR dev server. Vite's proxy config forwards /query, /auth, /documents to FastAPI — no CORS issues in development.

STREAMING CLIENT 🌊

fetch() + ReadableStream

Native browser API — no libraries. response.body.getReader() + TextDecoder + manual SSE line parser. Works in every modern browser. The async generator pattern makes it composable.

ABORT ⏹️

AbortController

Cancels in-flight streams when the user clicks Stop. Pass signal to fetch() and the pending read rejects with an AbortError, which the caller catches. The browser closes the connection, so the server can stop pulling tokens from Ollama once it notices the disconnect.
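In an asyncio server, a dropped connection typically shows up as cancellation of the task that is driving the response generator. The sketch below is plain asyncio, not FastAPI or Ollama code: token_stream and consume are hypothetical names, and asyncio.sleep stands in for waiting on the model. It illustrates that cancelling the consumer runs the generator's finally block, which is where a real handler would release the upstream stream.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


async def token_stream(log: List[str]) -> AsyncIterator[str]:
    """Hypothetical stand-in for a token generator; sleep simulates model latency."""
    try:
        for i in range(1000):
            await asyncio.sleep(0.01)
            yield f"tok{i}"
    finally:
        # Runs on normal completion and on cancellation alike.
        log.append("closed")


async def consume(log: List[str], received: List[str]) -> None:
    async for tok in token_stream(log):
        received.append(tok)


async def main() -> Tuple[List[str], List[str]]:
    log: List[str] = []
    received: List[str] = []
    task = asyncio.create_task(consume(log, received))
    await asyncio.sleep(0.05)   # let a few tokens through
    task.cancel()               # simulate the client going away
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log, received


if __name__ == "__main__":
    print(asyncio.run(main()))
```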

📁Project Setup

Article 4 extends Article 3's codebase. Copy the article-03/ directory, or start fresh and copy the source files. The only shared files that change are main.py and requirements.txt — all other Article 3 files (auth.py, database.py, vectorstore.py, etc.) are unchanged.

shell · project layout
article-04/
├── main.py                    # updated — adds rate limiter + new router
├── rate_limit.py              # new — slowapi Limiter singleton
├── requirements.txt           # updated — adds slowapi + limits
├── routers/
│   ├── auth.py                # unchanged from Article 3
│   ├── documents.py           # unchanged from Article 3
│   ├── query.py               # unchanged from Article 3
│   └── query_stream.py        # new — SSE streaming endpoint
├── config.py, database.py, auth.py, embedder.py,
│   vectorstore.py, ingestor.py, llm.py, schemas.py
│                              # all unchanged from Article 3
└── frontend/                  # new — Vite + React app
    ├── package.json
    ├── vite.config.ts
    ├── index.html
    └── src/
        ├── main.tsx
        ├── App.tsx
        ├── LoginForm.tsx
        ├── Chat.tsx
        ├── api.ts
        ├── types.ts
        └── index.css

Install the two new Python dependencies:

shell
pip install slowapi==0.1.9 limits==3.7.0

🚦Backend: Rate Limiter

The streaming endpoint is expensive — each request ties up an Ollama process for several seconds. Without rate limiting, a single client can send dozens of concurrent requests and lock the server. slowapi adds per-IP limits with a single decorator.

rate_limit.py — the singleton

python · rate_limit.py
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

The key_func determines who gets rate limited. get_remote_address uses the client IP. For per-user limits, swap it for a function that extracts sub from the JWT — but that requires decoding the token before the dependency injection layer runs, so IP-based limiting is the practical choice for this stack.
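To make "30 requests per minute per key" concrete, here is a simplified fixed-window counter using only the standard library. It is an illustration of the idea, not slowapi's implementation: FixedWindowLimiter and its clock parameter are hypothetical names, and the sketch never evicts old windows, which a real limiter would have to do.

```python
import time
from collections import defaultdict
from typing import Callable, DefaultDict, Tuple


class FixedWindowLimiter:
    """Hypothetical sketch: allow at most `limit` hits per key in each time window."""

    def __init__(self, limit: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window = window_seconds
        self.clock = clock
        # (key, window index) -> number of hits seen in that window
        self._counts: DefaultDict[Tuple[str, int], int] = defaultdict(int)

    def allow(self, key: str) -> bool:
        window_index = int(self.clock() // self.window)
        bucket = (key, window_index)
        if self._counts[bucket] >= self.limit:
            return False          # over the limit: a server would answer 429
        self._counts[bucket] += 1
        return True


if __name__ == "__main__":
    limiter = FixedWindowLimiter(limit=30, window_seconds=60)
    # The key plays the role of key_func's return value, e.g. a client IP.
    print(limiter.allow("203.0.113.7"))
```

Swapping the key from an IP address to a user id is all it takes to change who shares a bucket, which is the role key_func plays in slowapi.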

main.py — wire up the limiter

Two lines connect slowapi to FastAPI: attach the limiter to app state, and register the RateLimitExceeded handler that returns a proper HTTP 429 response.

python · main.py (additions)
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from rate_limit import limiter
from routers import auth, documents, query, query_stream

app = FastAPI(...)

# Rate-limit state — must be set before any route is registered
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

app.include_router(auth.router)
app.include_router(documents.router)
app.include_router(query.router)
app.include_router(query_stream.router)   # new
⚠️

slowapi requires a parameter named request (typed as Request) in the signature of any route decorated with @limiter.limit(). If you omit it, slowapi raises an exception as soon as the decorator is applied, so the app fails at startup. The request: Request parameter does not need to be used in the function body; slowapi reads it internally.

Backend: Streaming Endpoint

GET /query/stream/
Returns a Server-Sent Events stream. Query params: question (required), k (default 4), doc_id (optional). Requires Authorization: Bearer <token> header. Rate-limited to 30 req/min per IP.

SSE event contract

type      When sent             Payload fields       Action in UI
sources   Before first token    sources: Source[]    Show retrieved passages
token     Each LLM token        text: string         Append to live answer
done      Generation complete   model: string        Commit bubble to history
error     Any failure           message: string      Show error message

Sources come before the first token so the UI can show "retrieved 3 passages" while the LLM is still generating. This makes the response feel faster — the user sees progress immediately instead of waiting for the first word.
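The contract can be read as a small state machine on the consumer side. The following sketch expresses it in Python for clarity; ChatState and apply_event are hypothetical names, and the field names mirror the payloads listed in the contract rather than any code from the article's frontend.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ChatState:
    """Hypothetical consumer-side state for one streamed answer."""
    sources: List[dict] = field(default_factory=list)
    answer: str = ""
    model: Optional[str] = None
    error: Optional[str] = None
    finished: bool = False


def apply_event(state: ChatState, event: dict) -> ChatState:
    """Update the state in place for one SSE event and return it."""
    kind = event.get("type")
    if kind == "sources":
        state.sources = event["sources"]      # arrives before the first token
    elif kind == "token":
        state.answer += event["text"]         # append to the live answer
    elif kind == "done":
        state.model = event["model"]
        state.finished = True                 # answer can be committed
    elif kind == "error":
        state.error = event["message"]
        state.finished = True
    return state


if __name__ == "__main__":
    state = ChatState()
    for ev in [
        {"type": "sources", "sources": [{"doc_id": "abc", "text": "..."}]},
        {"type": "token", "text": "Paris"},
        {"type": "token", "text": " is"},
        {"type": "done", "model": "llama3.2:3b"},
    ]:
        apply_event(state, ev)
    print(state)
```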

The async generator

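python · routers/query_stream.py
import json
from typing import AsyncGenerator

# get_settings and the _ollama client are defined outside this excerpt.


def _event(payload: dict) -> str:
    """Serialize one payload as a single SSE event (data line plus blank line)."""
    return f"data: {json.dumps(payload)}\n\n"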


async def _generate(question: str, chunks: list[dict]) -> AsyncGenerator[str, None]:
    settings = get_settings()

    # 1. Emit source passages before the first token
    sources = [
        {"doc_id": c["doc_id"], "chunk_index": c["chunk_index"],
         "text": c["text"][:300], "distance": round(c["distance"], 4)}
        for c in chunks
    ]
    yield _event({"type": "sources", "sources": sources})

    # 2. Build prompt from retrieved passages
    context = "\n\n---\n\n".join(
        f"[Passage {i+1}]\n{c['text']}" for i, c in enumerate(chunks)
    )
    prompt = (
        "Answer using ONLY the provided passages.\n\n"
        f"Passages:\n{context}\n\nQuestion: {question}\n\nAnswer:"
    )

    # 3. Stream tokens from Ollama
    try:
        stream = _ollama.chat(
            model=settings.ollama_model,
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            options={"temperature": 0, "num_predict": 512},
        )
        for chunk in stream:
            text: str = chunk["message"]["content"]
            if text:
                yield _event({"type": "token", "text": text})
    except Exception as exc:
        yield _event({"type": "error", "message": str(exc)})
        return

    yield _event({"type": "done", "model": settings.ollama_model})

The route handler

python · routers/query_stream.py (continued)
_SSE_HEADERS = {
    "Cache-Control": "no-cache",
    "X-Accel-Buffering": "no",  # disable nginx buffering
}

@router.get("/stream/", response_class=StreamingResponse)
@limiter.limit("30/minute")
async def stream_query(
    request: Request,                    # required by slowapi
    question: str = Query(..., min_length=1, max_length=500),
    k: int = Query(default=4, ge=1, le=10),
    doc_id: Optional[str] = Query(default=None),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> StreamingResponse:

    chunks = search_chunks(
        tenant_id=current_user.id, query=question, k=k, doc_id=doc_id
    )

    if not chunks:
        async def _no_docs():
            yield _event({"type": "error", "message": "No documents found."})
        return StreamingResponse(_no_docs(), media_type="text/event-stream", headers=_SSE_HEADERS)

    return StreamingResponse(
        _generate(question, chunks),
        media_type="text/event-stream",
        headers=_SSE_HEADERS,
    )
💡

Why X-Accel-Buffering: no? When running behind nginx (common in production), nginx buffers proxy responses by default. Without this header, all tokens accumulate in nginx's buffer and are delivered in one batch when generation completes — identical to the non-streaming endpoint from the user's perspective. This single header is the difference between real streaming and fake streaming.

⚛️Frontend Setup

The frontend is a standard Vite + React 18 + TypeScript project. No UI component library — just React hooks and vanilla CSS. This keeps the code readable and teaches the patterns rather than hiding them behind abstractions.

shell · bootstrap (alternative to copying files)
cd article-04/frontend
npm install

vite.config.ts — the proxy

Vite's proxy option forwards matching paths to the FastAPI backend. This means the browser makes requests to http://localhost:5173/query/stream/ and Vite transparently rewrites them to http://localhost:8000/query/stream/ — no CORS headers needed during development.

typescript · frontend/vite.config.ts
import { defineConfig } from "vite";
import react from "@vitejs/plugin-react";

export default defineConfig({
  plugins: [react()],
  server: {
    proxy: {
      "/auth":      "http://localhost:8000",
      "/documents": "http://localhost:8000",
      "/query":     "http://localhost:8000",
    },
  },
});

🔌API Client

All backend communication lives in api.ts. The most important export is streamQuery — an async generator that yields typed SSEEvent objects. The caller iterates with for await...of and reacts to each event type.

src/types.ts — the SSE event union

typescript · src/types.ts
export interface Source {
  doc_id: string; chunk_index: number; text: string; distance: number;
}

export type SSEEvent =
  | { type: "sources"; sources: Source[] }
  | { type: "token";   text: string }
  | { type: "done";    model: string }
  | { type: "error";   message: string };

src/api.ts — the streaming generator

typescript · src/api.ts
import type { SSEEvent } from "./types";

export async function* streamQuery(
  question: string,
  token: string,
  opts: { k?: number; doc_id?: string; signal?: AbortSignal } = {},
): AsyncGenerator<SSEEvent> {

  const params = new URLSearchParams({
    question,
    k: String(opts.k ?? 4),
    ...(opts.doc_id ? { doc_id: opts.doc_id } : {}),
  });

  const response = await fetch(`/query/stream/?${params}`, {
    headers: { Authorization: `Bearer ${token}` },
    signal: opts.signal,
  });
  if (!response.ok) throw new Error(`HTTP ${response.status}`);

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Append decoded bytes; stream:true handles multi-byte chars across chunks
    buffer += decoder.decode(value, { stream: true });

    // Split on newlines; last element may be an incomplete line — keep in buffer
    const lines = buffer.split("\n");
    buffer = lines.pop()!;

    for (const line of lines) {
      if (line.startsWith("data: ")) {
        try { yield JSON.parse(line.slice(6)) as SSEEvent; }
        catch { /* malformed JSON — skip */ }
      }
    }
  }
}
💡

Why { stream: true } in TextDecoder? Multi-byte UTF-8 characters (accented letters, emoji, CJK) can be split across two read() calls. Without stream: true, the decoder tries to decode an incomplete byte sequence and produces the replacement character U+FFFD (�). With it, the decoder buffers the incomplete bytes and completes them on the next call.
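The same effect can be reproduced in Python, where the counterpart of a streaming TextDecoder is an incremental decoder from the codecs module. This is an illustrative sketch only; decode_chunks is a hypothetical helper and the euro sign is just a convenient three-byte character.

```python
import codecs
from typing import Iterable


def decode_chunks(chunks: Iterable[bytes]) -> str:
    """Decode UTF-8 that may be split at arbitrary byte boundaries."""
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    parts = [decoder.decode(chunk) for chunk in chunks]
    # final=True flushes anything still buffered at the end of the stream.
    parts.append(decoder.decode(b"", final=True))
    return "".join(parts)


euro = "€".encode("utf-8")          # three bytes
reads = [euro[:1], euro[1:]]        # the character is split across two reads

# Decoding each read on its own loses the character.
naive = "".join(r.decode("utf-8", errors="replace") for r in reads)

# The incremental decoder holds the incomplete bytes until the rest arrives.
streamed = decode_chunks(reads)

if __name__ == "__main__":
    print(repr(naive), repr(streamed))
```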

💬Chat Component

The Chat component manages four pieces of state: the settled message history, the live streaming text, the sources for the current response, and the streaming boolean that toggles between Send and Stop buttons.

State design

typescript · src/Chat.tsx — state
const [messages, setMessages]     = useState<Message[]>([]);
const [liveText, setLiveText]     = useState("");     // tokens accumulate here
const [liveSources, setLiveSources] = useState<Source[]>([]);
const [streaming, setStreaming]   = useState(false);
const abortRef = useRef<AbortController | null>(null);

Using a ref for AbortController instead of state is intentional: we need to call abort() from the Stop button handler, but we don't want the controller replacement to trigger a re-render.

The send function

typescript · src/Chat.tsx — send()
async function send() {
  const question = input.trim();
  if (!question || streaming) return;

  setInput("");
  setMessages(prev => [...prev, { role: "user", content: question }]);
  setStreaming(true);
  setLiveText("");
  setLiveSources([]);   // drop sources left over from a stopped or failed stream

  const ctrl = new AbortController();
  abortRef.current = ctrl;
  let accumulated = "";
  let sources: Source[] = [];

  try {
    for await (const event of streamQuery(question, token, { signal: ctrl.signal })) {
      if      (event.type === "sources") { sources = event.sources; setLiveSources(sources); }
      else if (event.type === "token")   { accumulated += event.text; setLiveText(accumulated); }
      else if (event.type === "done")    {
        // Commit the completed answer to settled history
        setMessages(prev => [...prev, { role: "assistant", content: accumulated, sources }]);
        setLiveText("");
        setLiveSources([]);
      }
      else if (event.type === "error")   {
        setMessages(prev => [...prev, { role: "assistant", content: `⚠ ${event.message}` }]);
        setLiveText("");
      }
    }
  } catch (err: unknown) {
    if ((err as Error)?.name !== "AbortError") {
      setMessages(prev => [...prev, { role: "assistant", content: "Connection error." }]);
    }
    setLiveText("");
  } finally {
    setStreaming(false);
  }
}

The live streaming bubble

While tokens are arriving, a separate "live bubble" renders liveText with a blinking CSS cursor. When the done event fires, the accumulated text is committed to messages and the live bubble disappears.

typescript · src/Chat.tsx — live bubble JSX
{streaming && liveText && (
  <div className="bubble bubble-assistant bubble-live">
    <p className="bubble-text">
      {liveText}<span className="cursor" />
    </p>
  </div>
)}

css · src/index.css — blinking cursor
.cursor {
  display: inline-block; width: 2px; height: 1em;
  background: var(--violet); margin-left: 2px;
  vertical-align: text-bottom;
  animation: blink .7s step-end infinite;
}
@keyframes blink { 50% { opacity: 0; } }

🚀Running Everything

Three terminals — Ollama, FastAPI, and Vite. Once all three are running, open http://localhost:5173.

shell · Terminal 1 — Ollama
# Pull the model once (2 GB download)
ollama pull llama3.2:3b

# Start the Ollama server (if not already running as a service)
ollama serve

shell · Terminal 2 — FastAPI backend
cd article-04
pip install -r requirements.txt
uvicorn main:app --reload --port 8000

shell · Terminal 3 — Vite dev server
cd article-04/frontend
npm install
npm run dev
# → http://localhost:5173

Quick smoke test with curl

Before opening the browser, verify the SSE endpoint works from the command line:

shell
# 1. Register + get a token
curl -s -X POST http://localhost:8000/auth/register \
  -H "Content-Type: application/json" \
  -d '{"email":"test@example.com","password":"secret123"}'

TOKEN=$(curl -s -X POST http://localhost:8000/auth/token \
  -F "username=test@example.com" -F "password=secret123" \
  | python3 -c "import sys,json; print(json.load(sys.stdin)['access_token'])")

# 2. Upload a document
echo "Paris is the capital of France." > /tmp/geo.txt
curl -s -X POST http://localhost:8000/documents/ \
  -H "Authorization: Bearer $TOKEN" \
  -F "file=@/tmp/geo.txt"

# 3. Stream a query — watch tokens arrive one by one
curl -s -N -H "Authorization: Bearer $TOKEN" \
  "http://localhost:8000/query/stream/?question=What+is+the+capital+of+France%3F"

Production Checklist

⚠ In-memory rate limit state is lost on restart
By default, slowapi stores rate-limit counters in memory. Every uvicorn restart or worker restart resets the counters — a client that hit their limit can retry immediately after a crash or deploy.
Pass a Redis storage backend to the Limiter: Limiter(key_func=get_remote_address, storage_uri="redis://localhost:6379"). Counters survive restarts and are shared across all uvicorn workers.
⚠ SSE connections hold open sockets
Each active SSE stream is a long-lived HTTP connection. With 100 concurrent users each taking 5 seconds to generate, that's 100 open sockets. uvicorn's default worker count (1) processes them sequentially. A slow Ollama generation blocks all other requests behind it.
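Run several workers, either with uvicorn main:app --workers 4 or with gunicorn as the process manager using a uvicorn worker class. Switch ChromaDB to HttpClient (see Article 3 production checklist). With 4 workers, up to 4 requests can reach Ollama at once; whether Ollama generates them in parallel or queues them depends on its own configuration (for example OLLAMA_NUM_PARALLEL) and on available memory.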
⚠ sessionStorage token is lost on tab close
The frontend stores the JWT in sessionStorage, which is cleared when the browser tab closes. This is intentional for security but means users re-login every session. For a production app, users expect to stay logged in.
Move the token to localStorage for persistent login, or implement refresh tokens: short-lived access tokens (15 min) + long-lived refresh tokens stored in an HttpOnly cookie. The cookie approach is the most secure — JavaScript cannot read HttpOnly cookies, so XSS cannot steal the refresh token.
⚠ Rate limiting by IP breaks behind a reverse proxy
If FastAPI runs behind nginx, all requests arrive with the nginx server IP (127.0.0.1) as the remote address. Every user shares the same rate-limit bucket — the first 30 requests block everyone else.
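Configure nginx to pass X-Real-IP or X-Forwarded-For headers, then make the backend use them. Either keep Limiter(key_func=get_remote_address) and tell uvicorn which proxies to trust via --forwarded-allow-ips or the FORWARDED_ALLOW_IPS environment variable (* trusts every upstream, so use it only when nginx is the sole route to the app), or switch to slowapi.util.get_ipaddr, which reads the X-Forwarded-For header.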
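
The idea behind trusting forwarded headers can be sketched without any framework. The helper below is hypothetical (client_ip is not a slowapi or uvicorn function) and assumes exactly one trusted proxy that appends the address it saw to X-Forwarded-For, with header names already lower-cased. Under that assumption the rightmost entry is the one the proxy wrote, while entries further left were supplied by the client and can be forged.

```python
from typing import Dict, Set


def client_ip(headers: Dict[str, str], peer_ip: str, trusted_proxies: Set[str]) -> str:
    """Hypothetical helper: pick the address to rate-limit on.

    Assumes one trusted proxy in front of the app and lower-cased header names.
    """
    # Only honour forwarded headers when the request really came from our proxy.
    if peer_ip not in trusted_proxies:
        return peer_ip
    entries = [
        part.strip()
        for part in headers.get("x-forwarded-for", "").split(",")
        if part.strip()
    ]
    if entries:
        return entries[-1]   # the entry appended by the trusted proxy
    return headers.get("x-real-ip", "").strip() or peer_ip


if __name__ == "__main__":
    hdrs = {"x-forwarded-for": "10.9.9.9, 203.0.113.7"}
    print(client_ip(hdrs, "127.0.0.1", {"127.0.0.1"}))
```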

Streaming architecture decision tree

NEED STREAMING?
  • Answers under 1s → don't bother; a batch response is simpler.
  • Answers of 2s or more → SSE streaming is worth the complexity.
  • Very long answers (10s+) → streaming is mandatory for acceptable UX.
AUTH METHOD?
  • Public stream → use the built-in EventSource API (simpler, auto-reconnect).
  • JWT or API key auth → use fetch() + ReadableStream (full header control).
  • Cookie auth → EventSource works, since cookies are sent automatically.
RATE LIMIT STORAGE?
  • Single process, dev → in-memory (default, zero config).
  • Multiple workers or hosts → Redis (storage_uri="redis://...").
  • Serverless / ephemeral → Redis with TTL, or a managed rate-limit service.
LLM CONCURRENCY?
  • 1–5 concurrent users → a single Ollama process is fine; each request queues.
  • 5–50 users → multiple uvicorn workers + ChromaDB HttpClient.
  • 50+ users → vLLM or TGI for true parallel LLM serving with GPU batching.

Real-time AI on your own hardware.

You now have a full-stack streaming RAG chat — tokens appear the moment Ollama produces them, sources surface before the first word, and the Stop button cancels generation mid-stream. Every piece runs locally with zero subscription cost.

The next article adds evaluation: automated faithfulness and relevance scoring so you can measure whether your RAG answers are actually correct — not just fast.

→ Article 5: Evaluating RAG Quality with RAGAS

All code tested on Python 3.11 · Node 20. Pinned versions: fastapi 0.115.6 · ollama 0.4.4 · slowapi 0.1.9 · react 18.3.1 · vite 6.0.7 · typescript 5.7.2.