Capstone: a mini service combining files, streams, and monitoring
A capstone ties the entire learning path into a single project so you get real-world instincts. Using everything we covered—uploads, streaming, tests, security, settings, caching, and monitoring—we will build an “e-receipt vault.” Users upload PDFs/images; the server extracts text and returns a summary. This is the final gate (20/20) of the FastAPI series, so let’s assemble every piece calmly.
Overview: the capstone in three key steps
- The user uploads a file and the server immediately replies “accepted.”
- A background worker runs OCR and summarization, storing the result in Redis.
- The user polls with the same ID to retrieve the outcome.
Hold this mini storyline and every design block below will make sense.
Explore the MVP first
```python
@app.post("/receipts", status_code=202)
async def upload_receipt(file: UploadFile, background: BackgroundTasks):
    receipt_id = uuid4().hex
    path = save_file(file, receipt_id)
    background.add_task(process_receipt, receipt_id, path)
    return {"receipt_id": receipt_id}

@app.get("/receipts/{receipt_id}")
async def get_receipt(receipt_id: str):
    data = await redis_client.get(f"receipt:{receipt_id}")
    if not data:
        raise HTTPException(404, "Still processing")
    return json.loads(data)
```
Two endpoints already deliver the “upload → wait → result” flow. The next sections harden this skeleton.
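The `save_file` helper is assumed above rather than shown. One minimal sketch, under the assumptions that uploads land in a `media/` folder and are streamed in 1 MiB chunks:

```python
from pathlib import Path

MEDIA_ROOT = Path("media")  # assumption: mirrors settings.media_path

def save_file(file, receipt_id: str) -> Path:
    """Stream an UploadFile to disk in chunks so large PDFs never sit fully in memory.

    `file` is a FastAPI/Starlette UploadFile: it exposes .filename and a
    file-like byte stream at .file.
    """
    MEDIA_ROOT.mkdir(parents=True, exist_ok=True)
    suffix = Path(file.filename or "upload").suffix or ".bin"
    dest = MEDIA_ROOT / f"{receipt_id}{suffix}"
    with dest.open("wb") as out:
        # Read 1 MiB at a time instead of loading the whole upload at once
        while chunk := file.file.read(1024 * 1024):
            out.write(chunk)
    return dest
```

Returning the `Path` lets the caller hand it straight to the background task.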
Key terms
- Capstone: A final-stage project that combines multiple skills to mimic production, which is the purpose of this lesson.
- OCR (Optical Character Recognition): Extracts text from images/PDFs; it powers the uploaded-receipt summary.
- Redis pub/sub: Redis's publish–subscribe messaging. Background jobs send progress updates to `progress:*` channels, and SSE relays them live.
- SSE (Server-Sent Events): A one-way streaming protocol where the server pushes text events sequentially to show status updates in real time.
Practice card
- Estimated time: 90 minutes
- Prereqs: Episodes 13–19, Redis + Pillow installed, pytest/httpx familiarity
- Goal: Combine the upload → background → cache → monitoring flow in one FastAPI service
Service requirements
- Mandatory flow: upload PDF/image → process OCR in the background → store results in Redis/DB → fetch via API.
- Optional: expose progress via SSE and attach logging/monitoring for visibility.
Visualizing the end-to-end flow
Sketch everything once so folders map to responsibilities (upload, workers, cache, monitoring).
MVP build order
- Prepare `core/config.py` plus a `lifespan` hook to initialize Redis, logging, and media paths.
- Finish the upload router with file persistence and background task enqueueing.
- Implement the background worker (sync or Celery) that runs OCR → summary → cache save.
- Build the retrieval router that reads Redis/DB and returns clear errors when missing.
- Reproduce the upload → fetch flow via pytest/httpx, and lock the `uvicorn` command into a script or service unit.
Follow the five steps in order to land the MVP. Treat SSE and monitoring upgrades as optional add-ons.
Essential modules
Folder structure snapshot
Keep OCR and summarization logic in services, and background wrappers in workers. Separation keeps testing and deployments tidy.
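The exact tree isn't reproduced here; one layout that matches those responsibilities (names illustrative) is:

```text
app/
├── main.py            # FastAPI app + lifespan hook
├── core/
│   └── config.py      # Settings (Redis URL, media path, OTel endpoint)
├── routers/
│   └── receipts.py    # upload + retrieval endpoints
├── services/
│   ├── ocr.py         # text extraction
│   └── summary.py     # summarization
└── workers/
    └── receipts.py    # process_receipt background wrapper
```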
Settings and dependencies
```python
# app/core/config.py
from pathlib import Path

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    media_path: Path = Path("media")
    redis_url: str
    otel_endpoint: str | None = None
    uv_reload: bool = False

settings = Settings()
```
Initialize Redis, structlog, and OpenTelemetry inside the lifespan hook.
```python
from contextlib import asynccontextmanager

import redis.asyncio as redis
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.redis = redis.from_url(settings.redis_url)
    configure_logging()
    if settings.otel_endpoint:
        configure_tracing()
    yield
    await app.state.redis.aclose()
```
Upload + background task
```python
@router.post("/receipts", status_code=202)
async def upload_receipt(
    bg: BackgroundTasks,
    file: UploadFile = File(...),
    redis: Redis = Depends(get_redis),
):
    # Accept only the formats the OCR pipeline understands
    if file.content_type not in {"application/pdf", "image/png", "image/jpeg"}:
        raise HTTPException(415, "Only PDF or image uploads are supported.")
    receipt_id = uuid4().hex
    path = save_file(file, receipt_id)
    bg.add_task(process_receipt, receipt_id, path, redis)
    return {"receipt_id": receipt_id, "status": "processing"}
```
process_receipt performs OCR, summarization, and caching in sequence.
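The worker body isn't spelled out in the series text. Here is a sketch under two explicit assumptions: OCR is injected as a callable (e.g. `pytesseract.image_to_string` over a Pillow image), and `summarize` is a naive first-line truncation you would replace with a real summarizer:

```python
import json

def summarize(text: str, limit: int = 120) -> str:
    """Placeholder summary: first non-empty line, truncated to `limit` chars."""
    for line in text.splitlines():
        if line.strip():
            return line.strip()[:limit]
    return ""

async def process_receipt(receipt_id, path, redis, ocr=None):
    """OCR -> summarize -> cache, mirroring the pipeline described above."""
    # e.g. ocr = lambda p: pytesseract.image_to_string(Image.open(p))
    text = ocr(path) if ocr else ""
    payload = {"text": text, "summary": summarize(text)}
    # Cache for an hour; a DB write could sit alongside this (episode 9)
    await redis.set(f"receipt:{receipt_id}", json.dumps(payload), ex=3600)
```

Injecting the OCR callable keeps the worker trivially testable without Tesseract installed.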
Retrieval + cache
```python
@router.get("/receipts/{receipt_id}")
async def get_receipt(receipt_id: str, redis: Redis = Depends(get_redis)):
    data = await redis.get(f"receipt:{receipt_id}")
    if not data:
        raise HTTPException(404, "Processing or receipt not found.")
    return json.loads(data)
```
If you also store results in a database (see episode 9’s SQLModel pattern), you can recover data even when the cache expires.
pytest/httpx E2E test
- Upload a sample file to `/receipts`.
- Inject a Redis double and store the result immediately.
- Call `/receipts/{id}` and verify the payload.
```python
@pytest.mark.asyncio
async def test_receipt_flow(client, fake_redis):
    response = await client.post(
        "/receipts", files={"file": ("r.pdf", b"data", "application/pdf")}
    )
    assert response.status_code == 202
    receipt_id = response.json()["receipt_id"]
    await fake_redis.set(f"receipt:{receipt_id}", json.dumps({"total": 12000}))
    detail = await client.get(f"/receipts/{receipt_id}")
    assert detail.json()["total"] == 12000
```
Do not spin up real Redis or OCR in tests—swap in fakes so you can focus on API contracts.
uvicorn command
```shell
uvicorn app.main:app --host 0.0.0.0 --port 9000 --workers 4 --loop uvloop --http httptools
```
Because a frontend will call this, keep the CORS setup from episode 16 and reuse the logging/monitoring stack from episode 19. In production wrap the command with systemd, Supervisor, or your container runtime.
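For the systemd route, a minimal unit (paths and names are illustrative, not prescriptive) could look like:

```ini
# /etc/systemd/system/receipt-vault.service
[Unit]
Description=e-receipt vault API
After=network.target redis.service

[Service]
WorkingDirectory=/srv/receipt-vault
ExecStart=/srv/receipt-vault/.venv/bin/uvicorn app.main:app --host 0.0.0.0 --port 9000 --workers 4
Restart=on-failure

[Install]
WantedBy=multi-user.target
```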
Optional upgrades
Optional: stream progress
```python
async def progress_stream(receipt_id: str, redis: Redis):
    pubsub = redis.pubsub()
    await pubsub.subscribe(f"progress:{receipt_id}")
    try:
        async for message in pubsub.listen():
            if message["type"] == "message":
                yield f"data: {message['data']}\n\n"
    finally:
        await pubsub.unsubscribe()

@router.get("/receipts/{receipt_id}/progress")
async def watch_progress(receipt_id: str, redis: Redis = Depends(get_redis)):
    return StreamingResponse(progress_stream(receipt_id, redis), media_type="text/event-stream")
```
Background jobs call `redis.publish("progress:...", json.dumps(...))` at each stage. Browsers consume these SSE events to render live status.
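On the publishing side, each worker stage can emit one event like this (the payload shape is an assumption, not a series-defined contract):

```python
import json

async def publish_progress(redis, receipt_id: str, stage: str, pct: int) -> None:
    """Emit one progress event; the SSE endpoint relays it to the browser."""
    await redis.publish(
        f"progress:{receipt_id}",
        json.dumps({"stage": stage, "pct": pct}),
    )
```

Call it between steps, e.g. `await publish_progress(redis, receipt_id, "ocr", 40)`.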
Optional: logging and visibility
```python
@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
    logger.error("unhandled_error", path=request.url.path, error=str(exc))
    return JSONResponse(status_code=500, content={"detail": "Please try again soon."})
```
Split OCR failure vs. summary failure into separate error codes, and export Prometheus metrics or OpenTelemetry traces to spot bottlenecks. Finish the MVP first, then bolt on the metrics that matter.
Exercises
- Follow along: implement the upload → background → result flow and persist the final payload via Redis.
- Extend: add the SSE progress stream or Prometheus metrics so external tools can follow the processing state.
- Debug: rehearse upload failures, OCR errors, and Redis outages. Confirm error responses and logs stay consistent.
- Definition of done: (MVP) upload/result API covered by tests; (optional) SSE/monitoring surfaces progress events and metrics on a dashboard.
Wrap-up
- Upload/media management (episode 13)
- Streaming/background work (episode 14)
- Test automation (episode 15)
- Security/CORS (episode 16)
- Settings/secrets (episode 17)
- Cache/performance (episode 18)
- Logging/monitoring (episode 19)
Completing this capstone proves you can run a full FastAPI cycle. Mix the code from each episode and extend it for your own domain. FastAPI stays lightweight yet powerful—you have now seen every core pattern in action.
This capstone captures every key pattern from the entire series. Adapt the routers, services, and infrastructure to your needs and keep iterating.