캡스톤: 파일·스트림·모니터링이 결합된 미니 서비스
캡스톤(capstone)은 지금까지의 학습을 하나의 프로젝트로 엮어 실전 감각을 익히는 단계입니다. 그동안 배운 업로드, 스트리밍, 테스트, 보안, 설정, 캐시, 모니터링을 모두 묶어 "전자 영수증 보관 서비스"를 만들어봅니다. 사용자는 PDF/이미지를 업로드하고, 서버는 텍스트를 추출해 요약을 제공합니다. 이 편이 FastAPI 시리즈의 진짜 마지막 관문(20/20) 이니, 앞선 편에서 만든 조각들을 차근히 조립해 봅시다.
준비: 캡스톤 흐름을 세 줄로 압축하기
- 파일을 업로드하면 서버가 일단 "접수 완료"를 알려 줍니다.
- 백그라운드 작업이 OCR과 요약을 처리하고 Redis에 결과를 저장합니다.
- 사용자는 같은 ID로 다시 조회해 결과를 받습니다.
이 짧은 서사를 머릿속에 넣고 나면 아래 설계가 모두 이해 가능한 블록으로 보입니다.
MVP를 먼저 맛보기
@app.post("/receipts", status_code=202)
async def upload_receipt(file: UploadFile, background: BackgroundTasks):
receipt_id = uuid4().hex
path = save_file(file, receipt_id)
background.add_task(process_receipt, receipt_id, path)
return {"receipt_id": receipt_id}
@app.get("/receipts/{receipt_id}")
async def get_receipt(receipt_id: str):
data = await redis_client.get(f"receipt:{receipt_id}")
if not data:
raise HTTPException(404, "아직 처리 중이에요")
return json.loads(data)
불과 두 엔드포인트만으로도 "업로드 → 기다림 → 결과"라는 경험을 만들 수 있습니다. 아래 섹션에서는 이 기본 뼈대를 좀 더 안전하고 보기 좋게 다듬습니다.
이번 글에서 새로 나오는 용어
- 캡스톤(capstone): 학습 마지막 단계에서 여러 기술을 묶어 실전과 비슷한 프로젝트를 완성하는 활동으로, 이번 글의 전체 목표입니다.
- OCR(Optical Character Recognition): 이미지나 PDF에서 텍스트를 뽑아내는 기술로, 업로드한 영수증 내용을 요약하기 위한 핵심 처리 단계입니다.
- Redis pub/sub: Redis가 제공하는 발행-구독 메시징으로, 백그라운드 작업이 진행 상황을
progress:*채널로 보내고 SSE가 이를 실시간으로 전달합니다. - SSE(Server-Sent Events): 서버가 일방적으로 텍스트 이벤트를 순차적으로 전송하는 HTTP 스트리밍 방식으로, 영수증 처리 상태를 브라우저에 실시간으로 보여 줍니다.
실습 카드
- 예상 소요 시간: 90분
- 사전 준비: 13~19편 전체 예제, Redis와 Pillow 설치, pytest/httpx 사용 경험
- 실습 목표: 업로드→백그라운드→캐시→모니터링 흐름을 하나의 FastAPI 서비스로 묶는다
서비스 요구사항
- 필수 플로우: PDF/이미지 업로드 → 백그라운드에서 OCR 처리 → Redis/DB에 결과 저장 → API로 결과 조회.
- 선택 확장: SSE로 처리 진행을 노출하고, 로그·모니터링 스택을 붙여 운영 가시성을 높입니다.
전체 처리 플로우 시각화
각 요소가 어디에 위치하는지 한 번에 그려 둡니다. 폴더 구조와 연계해 어떤 모듈이 어떤 책임을 갖는지(업로드, 워커, 캐시, 모니터링) 빠르게 점검할 수 있습니다.
MVP 빌드 순서
core/config.py와lifespan훅을 준비해 Redis, 로거, 미디어 경로 같은 공통 의존성을 한 번에 초기화합니다.- 업로드 라우터에서 파일 저장과 백그라운드 태스크 큐잉까지 완성해 "요청 수락" 부분을 마칩니다.
- 백그라운드 작업(동기든 Celery든 상관없음)에서 OCR → 요약 → 캐시 저장 흐름을 구현합니다.
- 조회 라우터에서 Redis/DB에서 결과를 불러와 반환하고, 실패 시 명확한 오류 메시지를 돌려줍니다.
- pytest/httpx로 업로드→조회 흐름을 재현하고,
uvicorn실행 명령을 스크립트나 서비스 유닛으로 고정합니다.
위 다섯 단계를 반드시 순서대로 통과시키면 MVP가 완성됩니다. 이후 항목(SSE 진행률, 모니터링)은 선택 확장으로 분리해 생각하세요.
필수 모듈
폴더 구조 요약
services에는 OCR과 요약 로직을, workers에는 백그라운드 태스크 래퍼를 둡니다. 관심사를 나누면 테스트와 배포가 가벼워집니다.
설정과 의존성
# app/core/config.py
class Settings(BaseSettings):
media_path: Path = Path("media")
redis_url: str
otel_endpoint: str | None = None
uv_reload: bool = False
Redis 클라이언트, structlog 로거, OpenTelemetry 설정을 lifespan에서 초기화합니다.
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.redis = redis.from_url(settings.redis_url)
configure_logging()
if settings.otel_endpoint:
configure_tracing()
yield
await app.state.redis.aclose()
업로드 + 백그라운드 태스크
@router.post("/receipts", status_code=202)
async def upload_receipt(
file: UploadFile = File("application/pdf"),
bg: BackgroundTasks,
redis: Redis = Depends(get_redis),
):
receipt_id = uuid4().hex
path = save_file(file, receipt_id)
bg.add_task(process_receipt, receipt_id, path, redis)
return {"receipt_id": receipt_id, "status": "processing"}
process_receipt는 OCR, 요약, 캐시 저장을 순차로 실행합니다.
결과 조회와 캐시
@router.get("/receipts/{receipt_id}")
async def get_receipt(receipt_id: str, redis: Redis = Depends(get_redis)):
data = await redis.get(f"receipt:{receipt_id}")
if not data:
raise HTTPException(404, "아직 처리 중이거나 존재하지 않는 영수증입니다.")
return json.loads(data)
OCR 결과와 요약문을 한 번 더 DB에 저장하고 싶다면 9편에서 다룬 SQLModel 패턴을 그대로 적용합니다. 캐시와 DB를 함께 쓰면 장애 시에도 데이터를 복구할 수 있습니다.
pytest/httpx로 E2E 테스트
/receipts로 샘플 파일 업로드.- Redis 더블을 주입해 곧바로 결과를 저장.
/receipts/{id}응답 값이 기대치와 일치하는지 검증합니다.
@pytest.mark.asyncio
async def test_receipt_flow(client, fake_redis):
response = await client.post("/receipts", files={"file": ("r.pdf", b"data", "application/pdf")})
receipt_id = response.json()["receipt_id"]
await fake_redis.set(f"receipt:{receipt_id}", json.dumps({"total": 12000}))
detail = await client.get(f"/receipts/{receipt_id}")
assert detail.json()["total"] == 12000
테스트에서 Redis·OCR 서비스를 모두 띄우지 말고, 가짜 구현을 주입해 API 인터페이스만 검증하는 데 집중하세요.
uvicorn 실행
uvicorn app.main:app --host 0.0.0.0 --port 9000 --workers 4 --loop uvloop --http httptools
프런트엔드가 접근해야 하므로 16편에서 다룬 CORS 설정을 적용하고, 19편의 로깅/모니터링 구성을 그대로 재사용합니다. 운영 환경에서는 systemd, Supervisor, 혹은 컨테이너 런타임으로 이 명령을 감싸 두세요.
(선택) 업그레이드
(선택) 처리 진행 스트리밍
async def progress_stream(receipt_id: str, redis: Redis):
pubsub = redis.pubsub()
await pubsub.subscribe(f"progress:{receipt_id}")
try:
async for message in pubsub.listen():
if message["type"] == "message":
yield f"data: {message['data']}\n\n"
finally:
await pubsub.unsubscribe()
@router.get("/receipts/{receipt_id}/progress")
async def watch_progress(receipt_id: str, redis: Redis = Depends(get_redis)):
return StreamingResponse(progress_stream(receipt_id, redis), media_type="text/event-stream")
백그라운드 작업은 각 단계마다 redis.publish("progress:...", json.dumps(...))로 진행률 메시지를 보냅니다. 클라이언트는 SSE(Server-Sent Events, 서버가 일방적으로 텍스트 이벤트를 푸시하는 방식)로 이 메시지를 받아 실시간 진행 상황을 표시할 수 있습니다.
(선택) 로깅과 운영 가시성
@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
logger.error("unhandled_error", path=request.url.path, error=str(exc))
return JSONResponse(status_code=500, content={"detail": "잠시 후 다시 시도하세요."})
OCR 실패, 요약 실패 등 상황에 따라 세부 오류 코드를 분리하고, Prometheus 커스텀 메트릭이나 OpenTelemetry 트레이스를 노출하면 병목을 빠르게 포착할 수 있습니다. 필수 MVP를 완성한 뒤, 필요한 로그/지표만 골라 붙이세요.
실습
- 따라 하기: 업로드→백그라운드 처리→결과 조회 흐름을 구현하고, Redis를 통해 최종 결과를 저장한다.
- 확장하기: SSE 진행 스트림이나 Prometheus 지표를 붙여 처리 상태를 외부에서 추적할 수 있게 한다.
- 디버깅: 업로드 실패, OCR 오류, Redis 연결 실패 같은 장애 시나리오를 각각 재현하고, 에러 응답과 로그가 일관되게 남는지 확인한다.
- 완료 기준: (MVP) 업로드→결과 API가 테스트로 재현되고, (선택) SSE/모니터링을 켰다면 진행률 이벤트와 지표가 대시보드에 노출된다.
마무리
- 업로드/미디어 관리 (13편)
- 스트리밍·백그라운드 (14편)
- 테스트 자동화 (15편)
- 보안·CORS (16편)
- 설정/시크릿 (17편)
- 캐시/성능 (18편)
- 로깅/모니터링 (19편)
이 캡스톤을 완성하면 FastAPI로 한 사이클을 다 돌렸다는 자신감을 얻게 됩니다. 각 편의 코드를 필요에 맞게 조합하며 자신만의 서비스를 구체화해 보세요.
이 캡스톤 예제는 시리즈 전반의 핵심 패턴을 모두 포함합니다. 이제 여러분의 도메인 요구사항에 맞춰 라우터, 서비스, 인프라를 확장해 보세요. FastAPI는 가볍지만 충분히 강력한 프레임워크라는 사실을 실습으로 확인했을 것입니다.
💬 댓글
이 글에 대한 의견을 남겨주세요