2014년 9월 16일 화요일

First Steps with Celery

셀러리를 설치한지도 한달은 된것 같은데 이제야 셀러리의 첫단계를 완료하다니...

머 암튼...삽질을 했으면 기록을 남겨야 하니까..

일단 출처


환경

윈도우환경으로 Broker로는 RabbitMq 를 사용합니다.

시작하겠습니다.

분산컴퓨팅에서는 다음과 같은 구조가 가장 기본입니다.

사진출처 : Rabbit MQ

P : producer
C : consumer

Producer가 메시지를 전송하면 중간에 Broker(Hello)가 메시지를 Consumer에서 전달하는 구조입니다. 

일반적으로 생각하면 Producer는 클라이언트가 되어 요구를 전달하면 중간자가 그 메시지를 적당한 서버에 전달하는 구조인거죠.. 같은 말을 두번 하는거죠? ㅎㅎ

Application

첫번째로 우리는 Celery instance가 필요합니다. 이것은 Celery application 이라고도 불리고 "app"이라고 짧게 줄여 쓸기도 합니다.

tasks.py 라는 파일을 생성하죠

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y
Celery는 broker를 RabbitMQ 또는 Redis를 사용합니다. 원하시는 Broker를 선택하시면 되는데 기본적으로 해당 문서는 RabbitMQ 기반으로 작성되어있습니다.
해당 모듈은 add 라는 두개의 숫자를 더하는 하나의 함수만 존재합니다.
Running the celery woker server
tasks.py를 만든 위치에서 cmd 창에서 다음과 같은 명령어를 입력합니다.
$ celery -A tasks worker --loglevel=info
사실 이보다 먼저 행해져야 하는것이 Broker의 실행입니다. 이문서는 RabbitMQ를 기준으로 기술하고있습니다. RabbitMQ의 실행화면

celery 실행화면

Calling the task
task를 호출하기 위해서는 delay() 메소드를 사용합니다.
>>> from tasks import add
>>> add.delay(4, 4)
delay() 메소드의 return은 AsyncResult instance 이다. 이말은 단순히 4+4의 결과물인 8 만을 반환하지는 않는다는 말이다. 다음에서 더 자세히 보자
Keeping Results
tutorial을 하는데 왜 한두달이 걸렸냐고 묻는다면 바로 이 부분에서 막혀서 한동안 진전이 없어서였다고 말하겠다. 위에서 봤지만 delay() 메소드의 return은 AsyncResult instance 이다. 다시 말해서 이는 결과값도 비동기적으로 리턴할수 있는 구조체이다. 그렇다면 클라이언트가 메시지를 던지고 브로커가 워커에게 할당을 하는 과정이 메시지를 던지는 과정이라면 워커가 결과물을 브로커에게 던지고 브로커는 클라이언트에게 다시 던지는 과정이 결과물을 받는 과정이겠다. 여기서 중요한건 브로커가 비동기적 리턴을 들고 있어야 한다는 사실이다. 이러한 결과물 저장소를 backend라고 하고 celery에서는 backend를 SQLAlchemy/Django ORM, Memcashed, Redis, AMQP(RabbitMQ) 그리고 MongoDB 그외 당신이 원하는것으로... 하지만 우리는 Rabbit MQ를 사용하겠습니다.
그렇게 하기 위해서 위에서 작성한 tasks.py 파일의 일부분을 수정합니다.
app = Celery('tasks', backend='amqp', broker='amqp://')
>>> result = add.delay(4, 4)
ready()라는 함수는 프로세싱이 끝이 났는지 아닌지를 알려주는 함수입니다.
>>> result.ready()
False
get()이라는 함수를 통해서 원하는 8 이라는 리턴을 가져올수 있습니다.
>>> result.get(timeout=1)
8
여기가 막히는 부분이었는데
보시는 바와 같이 TimeoutError : the operation timed out.
에러 메시지를 뱉어낸다.. 아우... 약올라...

이 에러메시지를 해결하는 방법은 아래와 같다.

Configuration

해당 에러메시지를 이용하여 구글링을 했을때 항상 나오는 말이 설정에서 backend를 amqp로 설정하고 task result expires를 적당한 숫자로 입력하라고 한다. 
CELERY_RESULT_BACKEND = "amqp"
CELERY_TASK_RESULT_EXPIRES = 1000
그래서 실제로 설정부분에서 시키는 대로 celeryconfig.py 파일을 만들었다.
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_TASK_RESULT_EXPIRES = 18000
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Europe/Oslo'
CELERY_ENABLE_UTC = True
원문에서 해당 파일을 만들어서 환경변수 PATH 에 놓거나 current directory 에 저장하라고 했는데 두가지 방법을 사용해 보았을때 current directory 에 넣고해야 하는것 같다. path에 넣었을때는 사실 인식을 못해서 반복해서 에러를 뱉었던것 같다.
 그리고 tasks.py 파일의 일부분을 수정한다.
app.config_from_object('celeryconfig')
그리고 다시 실행한 모습니다.



참고로 amqp에 결과물이 저장되는 형태는 pickle, json, msgpack, yaml등을 사용할수 있으니 편한것을 설치하고 설정하면 된다.

우왕.. 이제 첫걸음을 떼다니...힘듬.