FastAPI + Celery 연동 예제(FastAPI with Celery example)

FastAPI에서 Celery를 사용하여 비동기 태스크를 실행하는 예제를 정리해보겠습니다. 참고로 localhost에 redis가 동작중이어야합니다.

main.py

from fastapi import FastAPI
from celery_worker import divide

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Hello World"}

@app.get("/work")
async def work(task_id: str, input_a: int, input_b: int):
    divide.apply_async([input_a, input_b], task_id=task_id)
    return {"message": "celery start"}

@app.get("/work_result")
async def work_result(task_id: str):
    result = divide.AsyncResult(task_id)
    return {"message": result.info}

main 모듈입니다. FastAPI가 실행되는 모듈이고 총 세 개의 endpoint를 갖고 있습니다. root endpoint는 단순히 Hello World 메시지를 출력합니다. /work endpoint는 태스크를 실행하는 역할을 합니다. /work_result endpoint는 태스크 실행 결과를 확인하는 역할입니다.

celery_app.py

from celery import Celery

celery_task = Celery(
    'app',
    broker="redis://127.0.0.1:6379/0",
    backend="redis://127.0.0.1:6379/0",
    include=['celery_worker']
)

celery app을 정의하는 모듈 파일입니다. broker와 backend 설정 모두 redis를 사용하도록 했습니다. include 부분의 celery_worker는 아래 woker 모듈을 가리키는 것 입니다.

celery_worker.py

from celery_app import celery_task

@celery_task.task
def divide(x, y):
    import time
    time.sleep(5)
    return x / y

celery_worker 모듈에는 celery에서 동작할 task에 fuction에 대한 정의가 담겨 있습니다. 짧은 코드들임에도 파일을 나누어 둔 이유는 프로젝트가 커지면 대체로 모듈화 하여 파일 별로 분리를 해놓기 때문에 예제에서도 그렇게 분리를 해놨습니다. divide 함수는 단순한 나누기 함수인데 중간에 sleep(5)을 하여 비동기 태스크로 동작하게 만들었습니다.

이제 FastAPI App을 실행해보겠습니다.

uvicorn main:app

실행 후 127.0.0.1:8000/docs 로 접속합니다.

위 링크로 접속하면 이미지와 같은 swagger UI를 볼 수 있습니다. /, /work, /work_result endpoint들이 보입니다.

지금 이 상태로는 실제 celery task가 동작하지 않습니다. celery 프로세스도 별도로 띄워주어야 정상적으로 broker와 통신을 해서 celery task가 동작합니다. FastAPI 프로세스는 그대로 두고 새로운 터미널에서 아래와 같이 celery 프로세스를 동작시킵니다.

celery -A celery_app worker --loglevel=info

celery가 정상 동작하면 대략 아래와 같은 메시지를 출력하고 동작 합니다.

$ celery -A celery_app worker --loglevel=info
 
 -------------- celery@ip-172-17-0-147 v5.2.3 (dawn-chorus)
--- ***** ----- 
-- ******* ---- Linux-5.4.0-1065-aws-x86_64-with-debian-buster-sid 2022-02-09 07:38:39
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         app:0x7f99be0e9ed0
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . celery_worker.divide

[2022-02-09 07:38:39,850: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2022-02-09 07:38:39,856: INFO/MainProcess] mingle: searching for neighbors
[2022-02-09 07:38:40,866: INFO/MainProcess] mingle: all alone
[2022-02-09 07:38:40,877: INFO/MainProcess] celery@ip-172-17-0-147 ready.

이제 swagger UI를 이용해 api를 실행해보겠습니다. 위와 같이 입력하고 Execute를 합니다.

결과는 위와 같은 모습입니다. 왼쪽 터미널이 FastAPI를 실행한 것이고 오른쪽 터미널이 Celery 프로세스를 실행한 것입니다. Request가 정상적으로 들어왔고 Task도 실행 된 것을 볼 수 있습니다.

이번엔 work_result endpoint로 결과 값을 확인해보겠습니다.

앞서 봤던 결과가 message로 나오는 것을 볼 수 있습니다.

전체적인 사용 방법이 Flask에서와 특별히 다른 게 없습니다. Celery 프로세스 자체가 별도로 동작하므로 FastAPI나 Flask나 Celery 라이브러리를 가져다 동작시키는 구조입니다.

작성한 소스코드는 https://github.com/warnus/fastapi-celery-example 에도 있습니다.

Leave a Reply