Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
[celery: Celery + Django] Celery - Job queuing framework for python. #python #django

Overview

django+Celeryによる非同期処理について

Celery

celery/celery - github.com - Python 向けジョブ管理フレームワークの定番
Celery - Distributed Task Queue - 公式ドキュメント

django-celery-beat

celery/django-celery-beat - github.com - Django x Celery による定時ジョブ管理
Celery 4 Periodic Task in Django

django-celery-results

celery/django-celery-results - github.com - Django x Celery によるスケジューリング
Django に Celery タスクキューを導入し、遅い処理を利用者に体感させないようにする
Djangoで非同期処理 - Celery, Redisを使って - Heroku デプロイ例
Celery + Redis + Django

$ pip install redis celery django-celery-results
# project/settings.py
INSTALLED_APPS = [
    # ...
    'django_celery_results',  # ←追加
]

CELERY_BROKER_URL = 'redis://username:password@hostname:port'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
$ python manage.py migrate django_celery_results
# project/celery.py
# @todo Rename 'project' to your project name.
# @see  http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django
#
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
settings = os.getenv('DJANGO_SETTINGS_MODULE', 'project.settings')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', settings)

app = Celery('myproject')

# Using a string here means the worker does not have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(['myproject'])

# アプリ下にも tasks 作りたい場合は ... 
# app.autodiscover_tasks(['myproject', 'myapp'])
# myproject/tasks/__init__.py
#
from ..celery import app
# myapp/tasks/__init__.py なら from myproject.celery import app かな

# テスト用タスク add_numbers を作成
# タスクには @app.task() デコレータを必ず付与すること
#
@app.task()
def add_numbers(a, b):
    print('Request: {}'.format(a + b))
    """
    タスクの完了をトリガとして何かをキックしたければここ
    etc. メール送信 / Web API のコール / WebSocket ( Channels ) Push
    WebSocket へ送るなら room_group_name みたいなものを引数にうけ...
    
    from channels.layers import get_channel_layer
    from asgiref.sync import async_to_sync

    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(chat_name, {
        'type': 'chat.system_message',
        'text': 'Process was succeeded!!',
    })
    """
    return a + b

@app.task()
def twice_up(a):
    return a * 2

この @app.task() デコレータには「例外時にリトライさせる」や「成功時/失敗時のコールバック」など様々なオプションが設定可能。

celery.app.task

Launch celery worker.

Celery に限らずだが、一般的に Worker は立ち上げたプロセスのメモリ上にタスクをロードするため 当然ながらファイル変更の検知とかしない

まずは開発時に Celery 関連のタスクを書き換えた際は Worker の再起動 を必ず行うようにする。そして更に 本番へ適用する際は必ず Worker の再起動をデプロイプロセスの中に組み込む こと。この際にオンメモリで抱えているタスクが死ぬため キューイングされている処理がすべて終わってから ... などデプロイは計画的に実施する 必要がある。

# manage.py のあるディレクトリで Celery ワーカーを立ち上げ
# celery バイナリ実体は /usr/local/bin/celery とかにいる
#
# 単一処理モード - 開発中に標準出力みたい場合
# 殺すときは乱暴だけど ps aux|grep 'celery worker' → kill で
#
$ celery -A myproject worker --concurrency=1 -l debug
#
#
# 並列処理 & デタッチモード - 本番環境や Docker で RUN する場合
# pid を実行ディレクトリ直下に生成する関係で Docker などのマウント配下に
# pid ファイルがくるとファイルロックがかかってしまう
# --pidfile オプションで pid ファイルをマウント領域外に作ってやるのが無難
# また pid ファイルが存在するとき削除実行 → 起動にコケるバグがある?ため
# pid ファイルを削除してやる https://github.com/celery/celery/issues/5409
#
$ mkdir -p /root/run/celery
$ rm -rf /root/run/celery/*
$ celery -A myproject worker --detach --pidfile="/root/run/celery/%n.pid"

Task queueing.

Calling Tasks - docs.celeryproject.org

$ python manage.py shell

# 先程実装した tasks モジュールからメソッドを import
from myproject.tasks import add_numbers

# 同期実行
add_numbers(3, 4)
> Request: 7
> Out: 7

# 非同期実行 
#
# パッケージ django_celery_results をインストールし INSTALLED_APPS に加え
# django_celery_results マイグレーションを実行し CELERY_RESULT_BACKEND を
# 'django-db' にした場合、非同期処理の状態/実行結果を DB の TaskResult モデル
# で管理することになり task_id をキーとして状態や結果にアクセスできるようになる
#
# また apply_async() には link=method() や link_error=error_handler() など
# 実行後のコールバック / 例外時のエラーハンドリングコールバックを渡せる
#
add_numbers.apply_async((5, 6), countdown=10, expires=300)
> <AsyncResult: deeddee2-74a6-48ec-9f22-987417135592>

# 引数 1 つならこういうタプルを渡す
# http://bit.ly/2LBxLHi
#
twice_up.apply_async((2,), countdown=10, expires=300)  

# django_celery_results.models モジュールより TaskResult モデルを import
#
from django_celery_results.models import TaskResult

tr = TaskResult.objects.get(task_id='deeddee2-74a6-48ec-9f22-987417135592')
tr.status     # SUCCESS
tr.result     # 11
tr.date_done  # datetime(...)

send_task()

https://docs.celeryproject.org/en/stable/reference/celery.html#celery.Celery.send_task
https://stackoverflow.com/questions/61096293/difference-between-send-task-and-apply-async

  • apply_async と同じく非同期実行のメソッド
  • module import せず @app.task(name='hoge') など name で照合して実行できる
  • 引数パターン、挙動は apply_async ちゃんと同じみたい

django で model 側から task kick したいケースで、task 側に model import 書いちゃうと循環参照になる。ので、こういう感じでエイリアスで call する手段が提供されてるっぽい。

# https://docs.celeryproject.org/en/2.0-archived/userguide/executing.html#basics  
# タスクのクラスにアクセスできない場合は、send_task()を使用して名前でタスクを実行することもできます。

from myapp.celery import app

@app.task(name='add')
def add(a, b):
    return a + b
  
"""
↑ みたいなタスクがあったとして ...
"""

import celery

async_res = celery.current_app.send_task('add', args=[1, 1], countdown=10)
# <AsyncResult: 88e81865-870a-4973-8cf7-2bee8e2ec8a8>

async_res.id     # '88e81865-870a-4973-8cf7-2bee8e2ec8a8'
async_res.get()  # 10 秒後に task 実行され 2 が返る

ETA

  • countdown で秒数指定する代わりに datetime 渡すこともできる
  • こっちのが DB に estimated_at みたいなカラム入れたりして管理しやすいかな
import celery
from django.utils import timezone

# django なら timezone 使ってこんな感じかな
tomorrow = timezone.localtime() + timezone.timedelta(days=1)

# 明日実行されるやつ予約
celery.current_app.send_task('add', args=[1, 1], eta=tomorrow)

current_task

Check if in celery task

from myapp.celery import app

@app.task
def notify():
    notification()

def notification():
    from celery import current_task

    if not current_task:
        print '直接 call した'

    elif current_task.request.id is None:
        print 'celery (task) 経由だが同期 call した'
    else:
        print '非同期 task として call した'

Revoke task.

Task 取り消し : Revocation

from app.tasks import note
# [deprecated v5~] from celery.task.control import revoke
import celery
from celery.app.control import Control

t = note.apply_async(args=('do it later',), countdown=100)
Control(celery.current_app).revoke(t.id, terminate=True)
# t.revoke() でも OK です

t.state  # PENDING → 暫くすると REVOKED に ...

Auto-Reload on file change.

Autoreload doesn't reload if modules imported by the tasks module change
Django and Celery - re-loading code into Celery after a change
Celery auto reload on ANY changes
How To Auto Reload Celery Workers In Development? - watchdog 併用例
Auto-reload Celery on code changes - watchdog 併用例

--autoreload オプションは Celery 4.0 以降非推奨扱いになっている。純粋に解決リソースがないので開発が追い付いてないみたい。開発環境に限っては watchdog を併用したり、 Django にタスクとして実行させ、ファイル変更を検知して再起動をかけるやりかたもある。

時間のあるときに検証して導入してみたい。

Management of shutting down.

通常 Celery などを利用してバックグラウンドで行うような処理は 24 時間 365 日サービスが稼働している環境を理想とすることが多い。しかし どのようなプラットフォームであれ、100 % の可用性は実現できないため、何らかの「システムダウン対策」をとる必要がある 。対策方法は要件やプラットフォーム制約によるが、一般的に以下のようなものが考えられる。

  • ジョブ管理のバックエンドに Redis などを利用しデータを永続化
    • これだけでは不十分だが、インメモリの場合全てが消し飛んでしまうのでまずは必須
  • タスクの実装について「処理途中で失敗する可能性」を考慮
    • 異常終了時を検知しロールバックを実行する
    • 後から再キックできるよう冪等性を保つ
  • タスクの異常終了時のふるまいを要件に応じて設計
    • 殆どのケースで「異常検知で即落とし & ロールバック & 異常終了結果の DB などへの永続化」を行い「後から当該ジョブ ( バッチ ) が異常終了したことを検知」できて「手動で再実行」できる動線を確保するのが安牌
    • 上記に加えて「リトライ可能な状況では再実行する」仕組みを実装するのがベター
  • Celery の acks_late にてデキュー方式を「完了後にデキュー」に変更
    • Should I use retry or acks late?
    • Celery はデフォルトで Redis などのブローカーからキューを受け取る際に Acknowledge ( = 認める / 確認 ) としてブローカーのキューを削除する
      • Celery から見たらタスクが冪等で二重実行可能かどうか不明なためデフォルトでは「実行しようとしたときにキューを消す」ことにしてるみたい
    • Acknowledge を 遅延確認 にすることで キュー削除をタスク実行後に行う よう強制し、タスク実行直前にデキュー → プロセス終了でタスクをロストすることを防ぐ
  • Celery の reject_on_worker_lost でワーカー不在時時に再キューイングするように変更
    • 上記 acks_late と組み合わせて、何らかの原因で処理途中にプロセスが死んでも、ブローカーにキューがストックされた状態になるためプロセス復帰後にタスクが再実行されるようになる
    • 但し、上記はタスクが二重に走ることになるため処理は必ず冪等なもの = 二重実行を許容できるものにしなければならない
  • REMAP_SIGTERM 機能でキューの取りこぼしを抑止
  • Celery のタスクセット時にリトライポリシーを設定する
# Celery は「タスクの実行直前にブローカーからタスクをデキュー」する
# ( ブローカー = Redis など Celery のキューイングバックエンド )
# これにより「デキュー後 Celery が何らかの原因で異常終了」した場合
# Celery プロセス復旧後も当該のキューをリトライしない問題が発生する
# これは当該タスクに冪等性があるか?再実行可能なタスクか?が不明なため
#
# 以下オプションで「タスクの完了後にデキュー」するように明示できる
# これにより未完了なタスクはブローカーにキューが残ったままとなり
# 結果 Celery プロセスの復旧後に ( 有効期限内なら ) リトライできる
# 
# http://docs.celeryproject.org/en/master/reference/celery.app.task.html#celery.app.task.Task.acks_late
# http://docs.celeryproject.org/en/master/reference/celery.app.task.html#celery.app.task.Task.reject_on_worker_lost
#
@app.task(acks_late=True, reject_on_worker_lost=True)
def my_task(a):
    pass
# Docker で web / worker コンテナを構成しテストする例
#
$ docker-compose up

# ここでアプリからタスクを apply_async などでキューイング
# 時間が来る前に Celery プロセスに SIGTERM を送信し終了させる
#
$ docker-compose exec worker bash
$ ps aux | grep 'celery worker'
$ kill <process-id>

# Docker Compose のコンソールにて worker コンテナの標準出力を確認
# また、時間になってもタスクが実行されないことを確認
#
# ===
# worker: Warm shutdown (MainProcess)
# worker_1 ... Restoring 1 unacknowledged message(s)
# ===

# Celery プロセス ( worker コンテナ ) の再起動を行い
# 起動時に「キューを受け取った」ログがでるので確認
# 実際にキューがリストアされ処理が実行されることを確認
#
$ docker-compose start worker
# Heroku 環境変数はダッシュボードからセット
#
$ export REMAP_SIGTERM=SIGQUIT

Signals

Signals
Working with celery signals

  • task の起動などイベントに対して func 仕込める
  • signal の種類は task_success とか task_revoked とか

signal の callback は第一引数が sender 固定で、第二以降は task_postrun とか task_success とかで取る引数がバラバラ。なので **kwargs で受けて欲しいやつ取ったほうがバージョンアップとか対応できていいよ。って書いてあった。

引数が一貫性なさすぎてムカつくっていう issue
https://github.com/celery/celery/issues/1555

# django なら settings.py で定数設定いるかも
CELERY_SEND_EVENTS = True
import celery
from celery import Celery
from celery.worker.request import Context
from celery.signals import task_prerun, task_revoked

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

@task_prerun.connect(sender=add)
def prerun_callback(sender=None, **kwargs):
    # prerun の keyword args に task_id があるのでそっから取る
    print(f'task id {kwargs['task_id']}')

@task_success.connect(sender=add)
def success_callback(sender=None, **kwargs):
    # 殆どの callback では current_task から request 辿れるので
    # そっちから task id とってもいい
    print(f'task id {celery.current_task.request.id}')

@task_revoked.connect(sender=add, **kwargs):
    # revoked は current_task がすでに死んでるので
    # こっちから取らないとあかんらしい
    request_context: Context = kwargs['request']
    print(f'task id {request_context.id}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment