이번 편에서는 긴 데이터를 조금씩 보내는 스트리밍 응답과 API 응답 후 백그라운드에서 실행되는 작업을 구현합니다. StreamingResponse는 FastAPI가 생성자에서 넘긴 반복자를 그대로 HTTP 응답 스트림으로 흘려보내도록 돕는 클래스입니다. BackgroundTasks는 응답을 보낸 뒤 서버 안에서 실행할 콜백 목록을 관리합니다. 앞서 만든 작업 관리 API에 파일 처리나 보고서 생성 같은 기능을 붙일 때 유용합니다.
이번 글에서 새로 나오는 용어
- StreamingResponse: FastAPI가 제너레이터나 반복자가 내보내는 데이터를 조각 단위로 바로 전송하도록 도와주는 응답 타입입니다.
- 스트리밍(streaming): 서버가 전체 결과를 계산할 때까지 기다리지 않고 데이터를 여러 조각으로 흘려보내는 통신 방식으로, 다운로드·실시간 로그에 활용됩니다.
- text/event-stream: Server-Sent Events(SSE)를 위한 MIME 타입으로, 브라우저가 연결을 유지하며 순차적으로 오는 메시지를 처리할 때 사용합니다.
- BackgroundTasks: FastAPI가 응답을 끝낸 뒤 같은 프로세스에서 실행할 콜백을 모아 주는 객체로, 이메일 알림이나 썸네일 생성 같은 후처리를 맡깁니다.
- keep-alive 타임아웃: HTTP 연결을 얼마나 오래 유지할지 정하는 서버 설정 값으로, 스트리밍 응답이 중간에 끊기지 않도록 조정해야 합니다.
실습 카드
- 예상 소요 시간: 50분
- 사전 준비: 13편 미디어 처리 코드, asyncio 기본
- 실습 목표: StreamingResponse와 BackgroundTasks를 조합해 긴 작업을 분리한다
StreamingResponse 소개
스트리밍(streaming)은 서버가 전체 결과를 계산할 때까지 기다리지 않고 데이터를 여러 조각으로 흘려보내는 방식입니다. 파일 다운로드, 서버 로그 tail, SSE(Server-Sent Events) 등에 활용합니다.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
def iter_numbers(limit: int):
for i in range(1, limit + 1):
yield f"data: {i}\n"
@app.get("/stream")
def stream_numbers(limit: int = 5):
return StreamingResponse(iter_numbers(limit), media_type="text/event-stream")
text/event-stream은 SSE 표준 MIME 타입입니다. 클라이언트는 연결을 유지해 이벤트를 순서대로 받습니다. 데이터를 조각으로 흘려보내면 긴 계산 시간 동안에도 사용자에게 진행 상황을 전달할 수 있습니다.
파일 다운로드 스트림
대용량 CSV를 메모리에 모두 올리지 않고 생성 즉시 내려보내는 예시입니다.
from io import StringIO
def generate_csv(rows: list[dict]):
buffer = StringIO()
writer = csv.DictWriter(buffer, fieldnames=rows[0].keys())
writer.writeheader()
for row in rows:
writer.writerow(row)
yield buffer.getvalue()
buffer.seek(0)
buffer.truncate(0)
@app.get("/reports.csv")
def export_report():
return StreamingResponse(
generate_csv(fetch_rows()),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=report.csv"},
)
fetch_rows()는 데이터베이스에서 필요한 행을 라우터 수준에서 스트림처럼 가져오는 함수를 가정합니다. 이 구조를 쓰면 메모리 사용량을 줄이고 다운로드 속도를 일정하게 유지할 수 있습니다.
BackgroundTasks로 후속 처리
BackgroundTasks 객체는 응답 직후 실행할 콜백을 등록합니다.
from fastapi import BackgroundTasks
def notify_user(email: str, report_url: str):
print(f"{email}님, 보고서가 준비되었습니다: {report_url}")
@app.post("/reports")
async def request_report(bg: BackgroundTasks, email: EmailStr):
url = await generate_report_async(email)
bg.add_task(notify_user, email, url)
return {"detail": "보고서 준비 중", "url": url}
백그라운드 작업은 동일한 프로세스에서 실행됩니다. CPU 사용량이 큰 작업은 Celery 같은 별도 워커로 옮겨야 안전합니다.
스트리밍 + 백그라운드 조합
사용자가 업로드한 파일을 즉시 검증하면서 업로드 완료 후 썸네일 작업을 백그라운드에서 돌릴 수 있습니다.
@app.post("/photos/async")
async def upload_photo(file: UploadFile, bg: BackgroundTasks):
path = save_temp(file)
bg.add_task(generate_thumbnail, path)
return StreamingResponse(
iter(["업로드 완료, 썸네일 생성 중..."]),
media_type="text/plain",
)
응답 본문은 간단합니다. 클라이언트는 백그라운드 작업 진행 상황을 별도 폴링 엔드포인트로 확인하도록 설계합니다.
D2로 본 실행 흐름
실제 서비스에서는 워커 노드가 다른 컨테이너나 별도 큐 시스템일 수 있습니다. FastAPI 내장 BackgroundTasks로도 기본 패턴을 빠르게 실험한 뒤 필요할 때 큐 시스템으로 확장하면 됩니다.
uvicorn 설정
스트리밍은 연결을 오래 유지합니다. uvicorn app.main:app --reload --timeout-keep-alive 75처럼 keep-alive 타임아웃을 늘려야 중간에 연결이 끊기지 않습니다. 이렇게 하면 클라이언트가 다운로드 중간에 끊기는 문제를 줄일 수 있습니다.
스트리밍과 백그라운드 태스크를 적절히 결합하면 사용자에게 빠른 피드백을 제공하면서도 서버 리소스를 효율적으로 사용할 수 있습니다. 다음 편에서는 API를 자동화 테스트하는 방법을 알아봅니다.
실습
- 따라 하기:
/stream또는/reports.csv엔드포인트를 만들어 StreamingResponse가 순차적으로 데이터를 내보내는지 확인한다. - 확장하기: 백그라운드 작업을 연결해 보고서 요청 후 이메일 알림(콘솔 출력)을 보내거나 썸네일 생성 작업을 등록한다.
- 디버깅: keep-alive나 chunk 크기 문제로 응답이 끊길 때 uvicorn 설정을 조정해 해결한다.
- 완료 기준: 스트리밍 엔드포인트가 브라우저/
curl에서 끊김 없이 데이터를 내보내고, 백그라운드 콜백이 로그에 기록된다.
마무리
긴 작업을 스트림으로 쪼개고 백그라운드 콜백으로 분리하면 느린 기능도 사용자에게 즉시 진행 상황을 알려 줄 수 있습니다. 이 패턴을 익혀 두면 보고서, 미디어 처리, 알림 발송 같은 작업을 안정적으로 분리할 수 있습니다.
💬 댓글
이 글에 대한 의견을 남겨주세요