Skip to content

Instantly share code, notes, and snippets.

@ficapy
Last active December 12, 2016 06:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ficapy/5e1930b410a774adc2a14c8554ec5a90 to your computer and use it in GitHub Desktop.
Save ficapy/5e1930b410a774adc2a14c8554ec5a90 to your computer and use it in GitHub Desktop.
celery+sqlalchemy architecture demo
  1. 启动celery

    celery -A base_celery worker --loglevel=info 
    
  2. 关闭celery(所有任务将丢失不会重启)

    pkill -9 -f 'celery worker'
    
  3. 调用任务

    python call.py
    
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Ficapy
# Create: '12/11/16'
import celeryconfig
from celery import Celery
from celery.signals import worker_init
from sqlalchemy import create_engine
from model import db_session, sqlalchemy_db
app = Celery('Demo')
app.config_from_object(celeryconfig)
@worker_init.connect
def initialize_session(**kwargs):
    """Re-bind the shared ``db_session`` to a newly created engine.

    Registered on Celery's ``worker_init`` signal.  The keyword arguments
    delivered with the signal are accepted but not used.
    """
    db_session.configure(bind=create_engine(sqlalchemy_db))
#!/usr/bin/env python
#-*- coding: utf-8 -*-
# Author: Ficapy
# Create: '12/11/16'
# Driver script: enqueue 10000 ``add1`` tasks, then poll once per second,
# printing the number of rows in ``demo`` and the state of every task, until
# all tasks have finished.
import time

from task1 import add1
from model import db_session, Demo

# Celery states after which a task result will not change any more.
FINISHED_STATES = ('SUCCESS', 'FAILURE', 'REVOKED')

# Keep the AsyncResult handles so the task states can be polled below.
task1_list = [add1.delay() for _ in range(10000)]

while True:
    print(db_session.query(Demo).count())
    time.sleep(1)
    # Read each status exactly once per round: every ``.status`` access is a
    # result-backend lookup, and a single snapshot keeps the three uses below
    # consistent with each other.
    statuses = [result.status for result in task1_list]
    print(statuses)
    print(sum(status != 'SUCCESS' for status in statuses))
    # Stop once every task has reached a final state.  Waiting for SUCCESS
    # only would loop forever as soon as one task ends in FAILURE.
    if all(status in FINISHED_STATES for status in statuses):
        break
#!/usr/bin/env python
#-*- coding: utf-8 -*-
# Author: Ficapy
# Create: '12/11/16'
# http://docs.celeryproject.org/en/latest/userguide/configuration.html
# --- Transport: AMQP broker, Redis result backend ---------------------------
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
BROKER_CONNECTION_MAX_RETRIES = 5
CELERY_RESULT_BACKEND = 'redis://localhost:6379/10'

# --- Serialization: JSON only ------------------------------------------------
CELERY_TASK_SERIALIZER = CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']

# --- Worker pool -------------------------------------------------------------
CELERYD_CONCURRENCY = 10
CELERYD_MAX_TASKS_PER_CHILD = 100  # exit automatically after executing 100 tasks

# --- Modules imported by the worker so their tasks get registered -----------
CELERY_IMPORTS = ('task1',)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Ficapy
# Create: '12/11/16'
from sqlalchemy import create_engine, Column, BIGINT
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base
# Database URL (PostgreSQL via psycopg2) used by the engine created in this
# module and by the one created in base_celery.initialize_session.
# NOTE(review): user/pwd/db_name look like placeholders — confirm before running.
sqlalchemy_db = 'postgresql+psycopg2://user:pwd@127.0.0.1/db_name'
# Declarative base class that the ORM model below inherits from.
Base = declarative_base()
class Demo(Base):
    """ORM model mapped to the ``demo`` table."""

    __tablename__ = 'demo'

    # Primary key; task1.add1 fills it with a random integer.
    id = Column(BIGINT, primary_key=True)
    # Integer tag for the row; task1.add1 stores 1 here.
    task = Column(BIGINT)
# Import-time setup; the statement order matters (the metadata must be bound
# before create_all() is called without an explicit engine).
# Engine with SQL echo disabled and pool_recycle set to 3600 seconds.
uline_engine = create_engine(sqlalchemy_db, pool_recycle=3600, echo=False)
Base.metadata.bind = uline_engine
# checkfirst=True: only tables that do not exist yet are created.
Base.metadata.create_all(checkfirst=True)
# Shared session registry used by the tasks and by call.py.  Note that
# base_celery.initialize_session re-binds it to another engine on worker start.
db_session = scoped_session(sessionmaker(bind=uline_engine))
from base_celery import app
from model import db_session as session, Demo
from random import randint
@app.task(default_retry_delay=1, max_retries=1)
def add1():
    """Insert one ``Demo`` row with a random id and ``task=1``.

    If adding or committing the row raises, the session is rolled back and
    the task is re-submitted through ``add1.retry`` (the decorator allows
    one retry with a 1 second delay).

    Raises:
        Whatever ``add1.retry(exc=e)`` raises: the retry signal while a
        retry is still allowed, otherwise the original exception.
    """
    try:
        session.add(Demo(id=randint(1, 100000000000000), task=1))
        session.commit()
    except Exception as e:
        # The scoped session is reused by every task this worker process
        # runs.  Without a rollback, a failed commit would leave it in a
        # failed-transaction state and break the tasks that follow.
        session.rollback()
        raise add1.retry(exc=e)
    finally:
        # Discard the session (and release its connection) so each task run
        # starts from a clean one instead of accumulating state.
        session.remove()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment