Celery로 해보는 비동기 분산처리 기초

2020년 6월 22일 수정

이 글은 PythonCelery를 이용해 비동기 분산처리 방법을 아주 간단히 정리한다.

설치

Celery는 보통의 패키지처럼 pip로 그냥 설치하면 된다.

pip install celery

브로커(Broker) 준비

셀러리는 태스크의 요청과 응답을 중간에서 도와주는 브로커가 필요하다. RabbitMQ나 Redis 등 다양한 브로커가 있으니 지원되는 것을 미리 준비해놓자. 이 글에서는 redis가 redis 라는 호스트 이름의 6379 포트에서 돌아가고 있다고 가정한다.

태스크 작성

우선 당연하겠지만 모듈을 import 해야한다.

from celery import Celery

이제 셀러리 인스턴스를 만들어서 브로커 세팅을 해야한다.

app = Celery('mytask',
             broker='redis://redis:6379/0')

이제 남은 일은 태스크를 코딩하는 일이다. 아래의 코드를 추가로 구현한다.

@app.task
def print_task(message):
    print(f'print_task: {message}')

위의 코드들이 순서대로 app.py 라는 파일에 몽땅 작성되었다고 가정하자.

워커(worker) 띄우기

아래 명령으로 워커를 띄울 수 있다.

celery -A app worker

백그라운드 데몬이 아니라 일반 프로세스처럼 돌아가는데, 요즘은 Docker로 돌리는 경우도 많기 때문에 굳이 백그라운드 데몬으로 돌릴 이유는 없을 것 같기도 하다.

태스크(task) 호출하기

이제 원격으로 태스크를 호출해보자. 아래는 Python 인터프리터로 호출해 보는 예제다.

>>> from app import print_task
>>> print_task.delay('Hello, World!')
<AsyncResult: ...>

이렇게 하면 현재 떠있는 워커(worker)에서 해당 태스크가 실행되는 것을 볼 수 있다. 즉 워커 로그에 아래와 같은 로그가 표시되면 성공이다.

print_task: Hello, World!

결과 받아보기

위의 내용은 단순 프로시져를 구동하기엔 나쁘지 않지만, 일반 함수들 같은 경우 반환되는 결과를 받아야만 한다. 이런 반환값을 받기 위해서는 우선 백엔드(backend) 설정이 추가로 필요하다.

여러 방법이 있겠지만 여기서는 브로커로 사용하는 redis를 재활용해보자.

app = Celery('mytask',
             broker='redis://redis:6379/0',
             backend='redis://redis:6379/0')

app 인스턴스를 위처럼 생성하도록 수정했다.

이제 아래와 같은 태스크를 추가로 구현해보자.

@app.task
def add(a, b):
    return a + b

단순하지만 목적이 완벽한 코드다.

add 태스크를 원격으로 호출해보자. 역시 Python 인터프리터로 호출한 예다.

>>> from app import add
>>> r = add.delay(1, 2)
>>> r.get(timeout=10)
3

AsyncResult 타입에서 제공하는 get 메서드를 이용하면 응답을 받을 수 있다. 물론 타임아웃이 발생한다면 워커가 내려갔거나 브로커나 백엔드에 문제가 생겼을 것이다.

마무리

이 글은 정말 기초만 겉핥기 하는 수준의 글이다. Celery 자체가 태스크 큐 관리 정책 등 다양한 분산처리를 위한 관리 기능을 제공하기 때문에 이 외에 많은 부분을 알아야 할 것 같다. 다행히도 제법 많이 쓰이는 도구라 인터넷에서 많은 정보를 찾을 수 있으니 참고해보자.