4. Streaming RAG Chat — FastAPI SSE + React
Add token-by-token streaming to your RAG pipeline. Server-Sent Events from FastAPI, a React client with AbortController, and the nuances of streaming structured retrieval responses.
Add token-by-token streaming to the Article 3 backend, then build a minimal React chat UI that renders each word as it arrives — no paid APIs, no UI frameworks.
🏗 What You'll Build
Article 3 built a fully working RAG API — upload documents, ask questions, get answers. One problem: the user stares at a blank screen for 2–8 seconds waiting for Ollama to finish generating. Streaming fixes that by sending each token the moment it's produced, so the answer appears word-by-word like a typewriter.
This article adds two things to the Article 3 codebase:
Backend addition
- New GET /query/stream/ endpoint that returns a Server-Sent Events stream
- Per-IP rate limiting (30 req/min) via slowapi
- Nginx buffering disabled so tokens aren't held back
Frontend (new)
- Vite + React 18 + TypeScript — no UI framework
- Login / register form
- Chat window with live token rendering and a blinking cursor
- Stop button via AbortController
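The full data path from keypress to rendered token: the Chat component calls streamQuery, fetch() sends the request through the Vite proxy to FastAPI, the endpoint retrieves the matching chunks and streams tokens from Ollama as SSE events, and the client-side parser appends each token to the live bubble.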
📡 How SSE Works
Server-Sent Events (SSE) is a one-directional HTTP stream from server to client. The response stays open for as long as the server keeps writing newline-delimited text, and the client reads it incrementally. In this API, each event is a single line prefixed with data: followed by a JSON payload, and it is terminated by a blank line (two newlines):
data: {"type":"sources","sources":[{"doc_id":"abc","text":"..."}]}
data: {"type":"token","text":"Paris"}
data: {"type":"token","text":" is"}
data: {"type":"token","text":" the"}
data: {"type":"done","model":"llama3.2:3b"}
EventSource vs fetch()
Browsers have a built-in EventSource API for SSE, but it has one critical limitation: it cannot send custom headers. That rules it out for JWT authentication. Instead we use fetch() with the Authorization header and read the response body as a ReadableStream.
EventSource (browser built-in)
- Simple API — automatic reconnect on disconnect
- ❌ GET only, no custom headers
- ❌ Cannot send Authorization: Bearer
- Good for public streams (stock tickers, news feeds)
fetch() + ReadableStream (our choice)
- Full control over headers — JWT works
- Requires manual SSE line parsing
- Manual reconnect if needed
- ✅ Required for any authenticated SSE
A single reader.read() call may return multiple SSE lines, or a partial line that continues in the next read. The parser must buffer across reads and split on \n — not assume one event per chunk. This is the most common mistake when implementing SSE clients from scratch.
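The buffering rule above can be sketched in a few lines. This is a minimal Python illustration of the same logic the browser client needs, not the article's client code: the function name parse_sse_chunks is made up for this example, and it assumes the chunks have already been decoded to text.

```python
import json
from typing import Iterable, Iterator


def parse_sse_chunks(chunks: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded JSON payload per `data: ` line, buffering across chunks."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Everything before the last "\n" is complete; the tail may be partial.
        *lines, buffer = buffer.split("\n")
        for line in lines:
            if not line.startswith("data: "):
                continue  # blank separator lines and other fields are skipped
            try:
                yield json.loads(line[len("data: "):])
            except json.JSONDecodeError:
                continue  # malformed payload: skip it


# Three events delivered as three awkwardly split chunks.
chunks = [
    'data: {"type":"token","te',
    'xt":"Paris"}\n\ndata: {"type":"token","text":" is"}\n',
    '\ndata: {"type":"done","model":"llama3.2:3b"}\n\n',
]
events = list(parse_sse_chunks(chunks))
```

The first chunk ends mid-JSON and the second carries the rest of that event plus a whole second one. A parser that assumed one event per chunk would fail on both.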
🧰 Technology Stack
ollama.chat(stream=True)
Ollama's Python SDK supports streaming iteration: for chunk in ollama.chat(..., stream=True) yields one token dict per iteration. Zero extra dependencies — same ollama==0.4.4 from Article 3.
FastAPI StreamingResponse
Wraps an async generator with media_type="text/event-stream". The X-Accel-Buffering: no header tells nginx not to hold back chunks — without it, tokens batch up and arrive all at once.
slowapi 0.1.9
Flask-Limiter port for FastAPI. Decorator-based: @limiter.limit("30/minute"). Uses the limits library for storage backends — in-memory by default, Redis for multi-process.
React 18 + Vite 6
React 18 for the UI, Vite for instant HMR dev server. Vite's proxy config forwards /query, /auth, /documents to FastAPI — no CORS issues in development.
fetch() + ReadableStream
Native browser API — no libraries. response.body.getReader() + TextDecoder + manual SSE line parser. Works in every modern browser. The async generator pattern makes it composable.
AbortController
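Cancels in-flight streams when the user clicks Stop. Pass signal to fetch(); on abort, the pending read rejects with an AbortError, which the caller catches and ignores. Once the connection drops, the server stops pulling tokens from Ollama, so generation should end shortly afterwards.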
📁 Project Setup
Article 4 extends Article 3's codebase. Copy the article-03/ directory, or start fresh and copy the source files. The only shared files that change are main.py and requirements.txt — all other Article 3 files (auth.py, database.py, vectorstore.py, etc.) are unchanged.
article-04/
├── main.py               # updated: adds rate limiter + new router
├── rate_limit.py         # new: slowapi Limiter singleton
├── requirements.txt      # updated: adds slowapi + limits
├── routers/
│   ├── auth.py           # unchanged from Article 3
│   ├── documents.py      # unchanged from Article 3
│   ├── query.py          # unchanged from Article 3
│   └── query_stream.py   # new: SSE streaming endpoint
├── config.py, database.py, auth.py, embedder.py,
│   vectorstore.py, ingestor.py, llm.py, schemas.py
│                         # all unchanged from Article 3
└── frontend/             # new: Vite + React app
    ├── package.json
    ├── vite.config.ts
    ├── index.html
    └── src/
        ├── main.tsx
        ├── App.tsx
        ├── LoginForm.tsx
        ├── Chat.tsx
        ├── api.ts
        ├── types.ts
        └── index.css
Install the two new Python dependencies:
pip install slowapi==0.1.9 limits==3.7.0
🚦 Backend: Rate Limiter
The streaming endpoint is expensive — each request ties up an Ollama process for several seconds. Without rate limiting, a single client can send dozens of concurrent requests and lock the server. slowapi adds per-IP limits with a single decorator.
rate_limit.py — the singleton
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
The key_func determines who gets rate limited. get_remote_address uses the client IP. For per-user limits, swap it for a function that extracts sub from the JWT — but that requires decoding the token before the dependency injection layer runs, so IP-based limiting is the practical choice for this stack.
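To make "30 requests per minute per key" concrete, here is a small fixed-window counter. It is only a sketch of the idea and not how slowapi is implemented (slowapi delegates counting to the limits library). The class name FixedWindowLimiter and the injected clock are assumptions made for this example.

```python
import time
from collections import defaultdict
from typing import Callable, Dict, Tuple


class FixedWindowLimiter:
    """Allow at most `limit` hits per key within each `window`-second window."""

    def __init__(self, limit: int, window: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window = window
        self.clock = clock
        # (key, window index) -> hits so far; old windows are never purged here
        self.hits: Dict[Tuple[str, int], int] = defaultdict(int)

    def allow(self, key: str) -> bool:
        bucket = (key, int(self.clock() // self.window))
        if self.hits[bucket] >= self.limit:
            return False  # a web framework would answer with HTTP 429 here
        self.hits[bucket] += 1
        return True


# Simulated clock so the example is deterministic.
now = [0.0]
limiter = FixedWindowLimiter(limit=30, window=60, clock=lambda: now[0])

results = [limiter.allow("203.0.113.7") for _ in range(31)]
other_client = limiter.allow("198.51.100.2")  # different key, separate bucket
now[0] = 61.0                                 # the next window begins
after_reset = limiter.allow("203.0.113.7")
```

The key plays the role of key_func's return value: with IP-based keys, the 31st request from one address is refused while a second address is unaffected.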
main.py — wire up the limiter
Two lines connect slowapi to FastAPI: attach the limiter to app state, and register the RateLimitExceeded handler that returns a proper HTTP 429 response.
from fastapi import FastAPI
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded

from rate_limit import limiter
from routers import auth, documents, query, query_stream

app = FastAPI(...)

# Rate-limit state: must be set before any route is registered
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

app.include_router(auth.router)
app.include_router(documents.router)
app.include_router(query.router)
app.include_router(query_stream.router)  # new
slowapi requires a parameter named request (typed as Request) in the signature of any route decorated with @limiter.limit(). If you omit it, slowapi raises an error about the missing request argument as soon as the decorator runs, which means at startup. The request: Request parameter does not need to be used in the function body; slowapi reads it internally.
⚡ Backend: Streaming Endpoint
GET /query/stream/ accepts three query parameters: question (required), k (default 4), and doc_id (optional). It requires an Authorization: Bearer <token> header and is rate-limited to 30 req/min per IP.
SSE event contract
| type | When sent | Payload fields | Action in UI |
|---|---|---|---|
| sources | Before first token | sources: Source[] | Show retrieved passages |
| token | Each LLM token | text: string | Append to live answer |
| done | Generation complete | model: string | Commit bubble to history |
| error | Any failure | message: string | Show error message |
Sources come before the first token so the UI can show "retrieved 3 passages" while the LLM is still generating. This makes the response feel faster — the user sees progress immediately instead of waiting for the first word.
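One way to read the contract is as a fold over the event stream, where each event type updates a small piece of client state. The sketch below mirrors the "Action in UI" column in plain Python. The Answer dataclass and fold_events function are hypothetical names for this illustration and are not part of the article's codebase.

```python
from dataclasses import dataclass, field
from typing import Iterable, List, Optional


@dataclass
class Answer:
    sources: List[dict] = field(default_factory=list)
    text: str = ""
    model: Optional[str] = None
    error: Optional[str] = None
    finished: bool = False


def fold_events(events: Iterable[dict]) -> Answer:
    """Apply each SSE event in order and return the resulting client state."""
    answer = Answer()
    for event in events:
        kind = event["type"]
        if kind == "sources":
            answer.sources = event["sources"]   # show retrieved passages
        elif kind == "token":
            answer.text += event["text"]        # append to the live answer
        elif kind == "done":
            answer.model = event["model"]       # commit the finished answer
            answer.finished = True
            break
        elif kind == "error":
            answer.error = event["message"]     # surface the failure
            break
    return answer


answer = fold_events([
    {"type": "sources", "sources": [{"doc_id": "abc", "text": "..."}]},
    {"type": "token", "text": "Paris"},
    {"type": "token", "text": " is"},
    {"type": "done", "model": "llama3.2:3b"},
])
failed = fold_events([{"type": "error", "message": "No documents found."}])
```

Both done and error are terminal here, which matches the server: after either one, no further events are sent.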
The async generator
import json
from typing import AsyncGenerator

# _ollama and get_settings are defined elsewhere in the project (not shown here).


def _event(payload: dict) -> str:
    # One SSE event: a single data: line followed by a blank line
    return f"data: {json.dumps(payload)}\n\n"


async def _generate(question: str, chunks: list[dict]) -> AsyncGenerator[str, None]:
    settings = get_settings()

    # 1. Emit source passages before the first token
    sources = [
        {
            "doc_id": c["doc_id"],
            "chunk_index": c["chunk_index"],
            "text": c["text"][:300],
            "distance": round(c["distance"], 4),
        }
        for c in chunks
    ]
    yield _event({"type": "sources", "sources": sources})

    # 2. Build prompt from retrieved passages
    context = "\n\n---\n\n".join(
        f"[Passage {i+1}]\n{c['text']}" for i, c in enumerate(chunks)
    )
    prompt = (
        "Answer using ONLY the provided passages.\n\n"
        f"Passages:\n{context}\n\nQuestion: {question}\n\nAnswer:"
    )

    # 3. Stream tokens from Ollama (a synchronous iterator)
    try:
        stream = _ollama.chat(
            model=settings.ollama_model,
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            options={"temperature": 0, "num_predict": 512},
        )
        for chunk in stream:
            text: str = chunk["message"]["content"]
            if text:
                yield _event({"type": "token", "text": text})
    except Exception as exc:
        yield _event({"type": "error", "message": str(exc)})
        return

    yield _event({"type": "done", "model": settings.ollama_model})
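One caveat worth knowing: the plain for loop over the stream suggests _ollama is the synchronous client, and a synchronous iterator inside an async def holds the event loop while it waits for each token, so other requests on the same worker pause in the meantime. The sketch below shows one possible way around that, by pulling each item in a worker thread. The helper iterate_in_thread and the fake_token_stream stand-in are assumptions for this example. Switching to the SDK's AsyncClient would be another route.

```python
import asyncio
import time
from typing import AsyncGenerator, Iterator, List

_DONE = object()  # sentinel: the blocking iterator is exhausted


async def iterate_in_thread(blocking_iter: Iterator[str]) -> AsyncGenerator[str, None]:
    """Yield items from a blocking iterator without stalling the event loop."""
    while True:
        # next() runs in a worker thread; the loop stays free while it waits.
        item = await asyncio.to_thread(next, blocking_iter, _DONE)
        if item is _DONE:
            return
        yield item


def fake_token_stream() -> Iterator[str]:
    """Stand-in for a synchronous, blocking token stream."""
    for token in ["Paris", " is", " the", " capital"]:
        time.sleep(0.01)  # simulates waiting on the model
        yield token


async def collect() -> List[str]:
    return [tok async for tok in iterate_in_thread(fake_token_stream())]


tokens = asyncio.run(collect())
```

The sentinel is needed because a StopIteration raised inside the worker thread cannot be propagated cleanly through the awaited call. In the endpoint, the loop would become async for chunk in iterate_in_thread(stream).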
The route handler
_SSE_HEADERS = {
    "Cache-Control": "no-cache",
    "X-Accel-Buffering": "no",  # disable nginx buffering
}


@router.get("/stream/", response_class=StreamingResponse)
@limiter.limit("30/minute")
async def stream_query(
    request: Request,  # required by slowapi
    question: str = Query(..., min_length=1, max_length=500),
    k: int = Query(default=4, ge=1, le=10),
    doc_id: Optional[str] = Query(default=None),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> StreamingResponse:
    chunks = search_chunks(
        tenant_id=current_user.id, query=question, k=k, doc_id=doc_id
    )
    if not chunks:
        async def _no_docs():
            yield _event({"type": "error", "message": "No documents found."})

        return StreamingResponse(
            _no_docs(), media_type="text/event-stream", headers=_SSE_HEADERS
        )

    return StreamingResponse(
        _generate(question, chunks),
        media_type="text/event-stream",
        headers=_SSE_HEADERS,
    )
Why X-Accel-Buffering: no? When running behind nginx (common in production), nginx buffers proxy responses by default. Without this header, all tokens accumulate in nginx's buffer and are delivered in one batch when generation completes — identical to the non-streaming endpoint from the user's perspective. This single header is the difference between real streaming and fake streaming.
⚛️ Frontend Setup
The frontend is a standard Vite + React 18 + TypeScript project. No UI component library — just React hooks and vanilla CSS. This keeps the code readable and teaches the patterns rather than hiding them behind abstractions.
cd article-04/frontend
npm install
vite.config.ts — the proxy
Vite's proxy option forwards matching paths to the FastAPI backend. This means the browser makes requests to http://localhost:5173/query/stream/ and Vite transparently rewrites them to http://localhost:8000/query/stream/ — no CORS headers needed during development.
import { defineConfig } from "vite";
import react from "@vitejs/plugin-react";

export default defineConfig({
  plugins: [react()],
  server: {
    proxy: {
      "/auth": "http://localhost:8000",
      "/documents": "http://localhost:8000",
      "/query": "http://localhost:8000",
    },
  },
});
🔌 API Client
All backend communication lives in api.ts. The most important export is streamQuery — an async generator that yields typed SSEEvent objects. The caller iterates with for await...of and reacts to each event type.
src/types.ts — the SSE event union
export interface Source {
  doc_id: string;
  chunk_index: number;
  text: string;
  distance: number;
}

export type SSEEvent =
  | { type: "sources"; sources: Source[] }
  | { type: "token"; text: string }
  | { type: "done"; model: string }
  | { type: "error"; message: string };
src/api.ts — the streaming generator
import type { SSEEvent } from "./types";

export async function* streamQuery(
  question: string,
  token: string,
  opts: { k?: number; doc_id?: string; signal?: AbortSignal } = {},
): AsyncGenerator<SSEEvent> {
  const params = new URLSearchParams({
    question,
    k: String(opts.k ?? 4),
    ...(opts.doc_id ? { doc_id: opts.doc_id } : {}),
  });

  const response = await fetch(`/query/stream/?${params}`, {
    headers: { Authorization: `Bearer ${token}` },
    signal: opts.signal,
  });
  if (!response.ok) throw new Error(`HTTP ${response.status}`);

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Append decoded bytes; stream: true handles multi-byte chars across chunks
    buffer += decoder.decode(value, { stream: true });

    // Split on newlines; the last element may be an incomplete line, so keep it in the buffer
    const lines = buffer.split("\n");
    buffer = lines.pop()!;

    for (const line of lines) {
      if (line.startsWith("data: ")) {
        try {
          yield JSON.parse(line.slice(6)) as SSEEvent;
        } catch {
          /* malformed JSON: skip */
        }
      }
    }
  }
}
Why { stream: true } in TextDecoder? Multi-byte UTF-8 characters (accented letters, emoji, CJK) can be split across two read() calls. Without stream: true, the decoder tries to decode an incomplete byte sequence and produces the replacement character U+FFFD (�). With it, the decoder buffers the incomplete bytes and completes them on the next call.
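The same effect is easy to reproduce outside the browser. The sketch below uses Python's incremental UTF-8 decoder as a stand-in for TextDecoder with stream: true; the sample string and the split position are arbitrary choices for the demonstration.

```python
import codecs

payload = "café au lait".encode("utf-8")
# "é" is two bytes (0xC3 0xA9); cut the payload between them.
first, second = payload[:4], payload[4:]

# Decoding each chunk on its own: both halves of "é" are invalid by themselves.
naive = (first.decode("utf-8", errors="replace")
         + second.decode("utf-8", errors="replace"))

# An incremental decoder holds the dangling byte until the next chunk arrives.
decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
streamed = decoder.decode(first) + decoder.decode(second)
```

Here naive contains U+FFFD replacement characters where "é" should be, while streamed reproduces the original text exactly.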
💬 Chat Component
The Chat component manages four pieces of state: the settled message history, the live streaming text, the sources for the current response, and the streaming boolean that toggles between Send and Stop buttons.
State design
const [messages, setMessages] = useState<Message[]>([]);
const [liveText, setLiveText] = useState("");  // tokens accumulate here
const [liveSources, setLiveSources] = useState<Source[]>([]);
const [streaming, setStreaming] = useState(false);
const abortRef = useRef<AbortController | null>(null);
Using a ref for AbortController instead of state is intentional: we need to call abort() from the Stop button handler, but we don't want the controller replacement to trigger a re-render.
The send function
async function send() {
  const question = input.trim();
  if (!question || streaming) return;

  setInput("");
  setMessages(prev => [...prev, { role: "user", content: question }]);
  setStreaming(true);
  setLiveText("");

  const ctrl = new AbortController();
  abortRef.current = ctrl;
  let accumulated = "";
  let sources: Source[] = [];

  try {
    for await (const event of streamQuery(question, token, { signal: ctrl.signal })) {
      if (event.type === "sources") {
        sources = event.sources;
        setLiveSources(sources);
      } else if (event.type === "token") {
        accumulated += event.text;
        setLiveText(accumulated);
      } else if (event.type === "done") {
        // Commit the completed answer to settled history
        setMessages(prev => [...prev, { role: "assistant", content: accumulated, sources }]);
        setLiveText("");
        setLiveSources([]);
      } else if (event.type === "error") {
        setMessages(prev => [...prev, { role: "assistant", content: `⚠ ${event.message}` }]);
        setLiveText("");
      }
    }
  } catch (err: unknown) {
    if ((err as Error)?.name !== "AbortError") {
      setMessages(prev => [...prev, { role: "assistant", content: "Connection error." }]);
    }
    setLiveText("");
  } finally {
    setStreaming(false);
  }
}
The live streaming bubble
While tokens are arriving, a separate "live bubble" renders liveText with a blinking CSS cursor. When the done event fires, the accumulated text is committed to messages and the live bubble disappears.
{streaming && liveText && (
  <div className="bubble bubble-assistant bubble-live">
    <p className="bubble-text">
      {liveText}<span className="cursor" />
    </p>
  </div>
)}
.cursor {
  display: inline-block;
  width: 2px;
  height: 1em;
  background: var(--violet);
  margin-left: 2px;
  vertical-align: text-bottom;
  animation: blink .7s step-end infinite;
}
@keyframes blink {
  50% { opacity: 0; }
}
🚀 Running Everything
Three terminals — Ollama, FastAPI, and Vite. Once all three are running, open http://localhost:5173.
# Pull the model once (2 GB download)
ollama pull llama3.2:3b

# Start the Ollama server (if not already running as a service)
ollama serve

cd article-04
pip install -r requirements.txt
uvicorn main:app --reload --port 8000
cd article-04/frontend
npm install
npm run dev
# → http://localhost:5173
Quick smoke test with curl
Before opening the browser, verify the SSE endpoint works from the command line:
# 1. Register + get a token
curl -s -X POST http://localhost:8000/auth/register \
  -H "Content-Type: application/json" \
  -d '{"email":"test@example.com","password":"secret123"}'

TOKEN=$(curl -s -X POST http://localhost:8000/auth/token \
  -F "username=test@example.com" -F "password=secret123" \
  | python3 -c "import sys,json; print(json.load(sys.stdin)['access_token'])")

# 2. Upload a document
echo "Paris is the capital of France." > /tmp/geo.txt
curl -s -X POST http://localhost:8000/documents/ \
  -H "Authorization: Bearer $TOKEN" \
  -F "file=@/tmp/geo.txt"

# 3. Stream a query and watch tokens arrive one by one
curl -s -N -H "Authorization: Bearer $TOKEN" \
  "http://localhost:8000/query/stream/?question=What+is+the+capital+of+France%3F"
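If you would rather script the last step, the same check can be done with the Python standard library. This is a sketch under the same assumptions as the curl commands (server on localhost:8000, a valid token in hand). The function names build_stream_url, iter_events, and stream_answer are made up for this example.

```python
import json
import urllib.parse
import urllib.request
from typing import Iterable, Iterator, Optional


def build_stream_url(base: str, question: str, k: int = 4,
                     doc_id: Optional[str] = None) -> str:
    """Build the query string the endpoint expects."""
    params = {"question": question, "k": k}
    if doc_id:
        params["doc_id"] = doc_id
    return f"{base}/query/stream/?{urllib.parse.urlencode(params)}"


def iter_events(lines: Iterable[bytes]) -> Iterator[dict]:
    """Decode `data: ` lines from an iterable of raw response lines."""
    for raw in lines:
        line = raw.decode("utf-8").rstrip("\r\n")
        if line.startswith("data: "):
            yield json.loads(line[len("data: "):])


def stream_answer(base: str, token: str, question: str) -> None:
    """Print tokens as they arrive; requires the backend to be running."""
    request = urllib.request.Request(
        build_stream_url(base, question),
        headers={"Authorization": f"Bearer {token}"},
    )
    # The response object can be iterated line by line as data arrives.
    with urllib.request.urlopen(request) as response:
        for event in iter_events(response):
            if event["type"] == "token":
                print(event["text"], end="", flush=True)
            elif event["type"] in ("done", "error"):
                print()
                break


# Usage, with the backend up and a token from step 1:
# stream_answer("http://localhost:8000", token, "What is the capital of France?")
```

Because the response is read one line at a time, no chunk buffering is needed here; that concern only arises when reading raw chunks, as the browser client does.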
✅ Production Checklist
- slowapi stores rate-limit counters in memory. Every uvicorn restart or worker restart resets the counters, so a client that hit their limit can retry immediately after a crash or deploy. Use a Redis storage backend on the Limiter: Limiter(key_func=get_remote_address, storage_uri="redis://localhost:6379"). Counters survive restarts and are shared across all uvicorn workers.
- To serve more than one generation at a time, run uvicorn main:app --workers 4 with gunicorn as the process manager. Switch ChromaDB to HttpClient (see Article 3 production checklist). Each worker gets its own Ollama queue: 4 workers means 4 concurrent generations.
- The frontend keeps the JWT in sessionStorage, which is cleared when the browser tab closes. This is intentional for security but means users re-login every session. For a production app, users expect to stay logged in. Switch to localStorage for persistent login, or implement refresh tokens: short-lived access tokens (15 min) + long-lived refresh tokens stored in an HttpOnly cookie. The cookie approach is the most secure: JavaScript cannot read HttpOnly cookies, so XSS cannot steal the refresh token.
- Behind a reverse proxy such as nginx, FastAPI sees the proxy's IP (127.0.0.1) as the remote address. Every user shares the same rate-limit bucket, so the first 30 requests block everyone else. Configure nginx to set the X-Real-IP or X-Forwarded-For headers, then tell slowapi to trust them: Limiter(key_func=get_remote_address) + set FORWARDED_ALLOW_IPS=* in uvicorn, or use slowapi.util.get_ipaddr which reads the forwarded headers.
Streaming architecture decision tree
- Transport: public stream with no auth → use the EventSource API (simpler, auto-reconnect). JWT or API key auth → use fetch() + ReadableStream (full header control). Cookie auth → EventSource works since cookies are sent automatically.
- Rate-limit storage: single process → in-memory (the default). Multiple workers → Redis (storage_uri="redis://..."). Serverless / ephemeral → Redis with TTL, or a managed rate-limit service.
Real-time AI on your own hardware.
You now have a full-stack streaming RAG chat — tokens appear the moment Ollama produces them, sources surface before the first word, and the Stop button cancels generation mid-stream. Every piece runs locally with zero subscription cost.
The next article adds evaluation: automated faithfulness and relevance scoring so you can measure whether your RAG answers are actually correct — not just fast.
→ Article 5: Evaluating RAG Quality with RAGAS