@osmanmesutozcan
Forked from oozliuoo/Readme.md
Last active March 19, 2018 12:56
CeleryPipelineDemo

Quick demo of implementing pipelines via Celery

How to use

  1. Install Celery and Redis
  2. Start Redis locally
  3. Run three workers named worker1@localhost, worker2@localhost, and worker3@localhost (a startup sketch follows this list)
  4. Run main.py
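
A minimal sketch of starting one of the named workers programmatically (a hypothetical helper, not part of the original gist); it assumes tasks.py is importable from the working directory and a Celery version whose app.worker_main accepts this argv form. The equivalent CLI invocation would be along the lines of celery -A tasks worker -l info -n worker1@localhost.

# start_worker.py (hypothetical helper script)
# Starts a single worker under the hostname given on the command line,
# e.g.: python start_worker.py worker1@localhost
import sys
from tasks import app

if __name__ == "__main__":
    hostname = sys.argv[1] if len(sys.argv) > 1 else "worker1@localhost"
    app.worker_main(argv=["worker", "--loglevel=info", "-n", hostname])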

Improvements made to the original version

  • Added the ability to ensure CPU-bound tasks run only one at a time, even when multiple workers are running on the same instance (implemented via the shared lock in util.py).
config.json

{
    "BROKER": "redis://localhost:6379/0",
    "BACKEND": "redis://localhost:6379/1",
    "TASK_CREATE_MISSING_QUEUE": true
}
main.py

import os
import json
import time

from tasks import add, minus, mult
from kombu import Queue
from celery import Celery, chain

CONFIG = json.load(open('./config.json'))

BROKER = CONFIG["BROKER"]
BACKEND = CONFIG["BACKEND"]
TASK_CREATE_MISSING_QUEUE = CONFIG["TASK_CREATE_MISSING_QUEUE"]

WORKERS = {
    "WORKER1": "worker1@localhost",
    "WORKER2": "worker2@localhost",
    "WORKER3": "worker3@localhost",  # needed by the (commented-out) do_job3 example
}

HASHES = {
    "HASH1": "feature_hash1",
    "HASH2": "feature_hash2",
    "HASH3": "feature_hash3",  # needed by the (commented-out) do_job3 example
}
class CeleryApp:

    @staticmethod
    def connect_workers():
        '''Set up the Celery object and connect to the broker.

        Returns:
            Celery instance
        '''
        return Celery(
            'worker.tasks',
            broker=BROKER,
            backend=BACKEND,
            task_create_missing_queues=TASK_CREATE_MISSING_QUEUE,
            task_queues=()
        )
class MainApp:

    def __init__(self, celery_app):
        '''
        This class accepts a Celery app object (decorator pattern),
        which is used to dispatch the jobs below.

        @param {Celery} celery_app - the Celery app object decorating this class
        '''
        self._celery_app = celery_app

        if self._celery_app.conf.task_queues is None:
            self._celery_app.conf.task_queues = ()

    def do_job1(self, feature_hash):
        '''
        Mocks doing a specific job; it picks a worker based on some policy,
        which is simply hardcoded here.

        This job computes (2 + 2) * 3 - 6 == 6 in the first queue via WORKER1.

        @param {string} feature_hash - the feature hash representing a series of tasks
        '''
        self._configure_routing(feature_hash, WORKERS["WORKER1"])
        # print("job1: %s" % self._celery_app.control.inspect().active())

        job = chain(
            add.s(2, 2).set(queue=feature_hash, routing_key=feature_hash),
            mult.s(3).set(queue=feature_hash, routing_key=feature_hash),
            minus.s(6).set(queue=feature_hash, routing_key=feature_hash)
        )
        job.delay()
    def do_job2(self, feature_hash):
        '''
        Mocks doing a specific job; it picks a worker based on some policy,
        which is simply hardcoded here.

        This job computes 2 * 5 * 3 - 10 + 18 == 38, routed via WORKER1.

        @param {string} feature_hash - the feature hash representing a series of tasks
        '''
        self._configure_routing(feature_hash, WORKERS["WORKER1"])
        # print("job2: %s" % self._celery_app.control.inspect().active())

        job = chain(
            mult.s(2, 5).set(queue=feature_hash, routing_key=feature_hash),
            mult.s(3).set(queue=feature_hash, routing_key=feature_hash),
            minus.s(10).set(queue=feature_hash, routing_key=feature_hash),
            add.s(18).set(queue=feature_hash, routing_key=feature_hash),
        )
        job.delay()

    def do_job3(self, feature_hash):
        '''
        Mocks doing a specific job; it picks a worker based on some policy,
        which is simply hardcoded here.

        This job computes 2 * 4 == 8 as a single long CPU-bound step, in the
        third queue via WORKER3.

        @param {string} feature_hash - the feature hash representing a series of tasks
        '''
        self._configure_routing(feature_hash, WORKERS["WORKER3"])
        # print("job3: %s" % self._celery_app.control.inspect().active())

        job = chain(
            mult.s(2, 4).set(queue=feature_hash, routing_key=feature_hash),  # simulates a CPU-bound step (10 s sleep in _mult)
        )
        job.delay()
    def do_job4(self, feature_hash):
        '''
        Mocks doing a specific job; it picks a worker based on some policy,
        which is simply hardcoded here.

        This job computes (2 * 4 - 10) * 7 == -14, but in the first queue via WORKER2.

        @param {string} feature_hash - the feature hash representing a series of tasks
        '''
        self._configure_routing(feature_hash, WORKERS["WORKER2"])
        # print("job4: %s" % self._celery_app.control.inspect().active())

        job = chain(
            mult.s(2, 4).set(queue=feature_hash, routing_key=feature_hash),
            minus.s(10).set(queue=feature_hash, routing_key=feature_hash),
            mult.s(7).set(queue=feature_hash, routing_key=feature_hash)
        )
        job.delay()

    def _configure_routing(self, feature_hash, worker):
        '''
        Configures routing at runtime: declares a new queue for the feature hash
        and assigns a worker to consume it.

        @param {string} feature_hash - the feature hash representing a series of tasks
        @param {string} worker - name (host) of the worker that will consume the new queue
        '''
        self._celery_app.conf.task_queues += (
            Queue(feature_hash, routing_key=feature_hash),
        )
        self._celery_app.control.add_consumer(
            feature_hash,
            reply=True,
            routing_key=feature_hash,
            destination=[worker]
        )
celery_app = CeleryApp.connect_workers()
main_app = MainApp(celery_app)
main_app.do_job1(HASHES["HASH1"])
main_app.do_job2(HASHES["HASH1"])
main_app.do_job4(HASHES["HASH1"])
# main_app.do_job3(HASHES["HASH3"])
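
After submitting the jobs, one way to confirm that the dynamic routing took effect is to ask each worker which queues it is consuming. A minimal sketch (not part of the original gist), assuming the workers listed in the README are running and reachable:

# Inspect which queues each worker is consuming after the add_consumer() calls.
# inspect().active_queues() may return None if no worker replies in time.
queues = celery_app.control.inspect().active_queues() or {}
for worker_name, worker_queues in queues.items():
    print(worker_name, [q["name"] for q in worker_queues])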
tasks.py

import time

from celery import Celery
from util import execute_task_with_lock

app = Celery('worker.tasks', backend='redis://localhost:6379/1', broker='redis://localhost:6379/0')


@app.task(bind=True)
def add(self, x, y):
    # The original also looked up the worker's active tasks here; the result was
    # unused, and inspect().active() can block or return None inside a task, so
    # the lookup is left commented out:
    # current_worker_hostname = self.request.hostname
    # tasks_in_current_worker = app.control.inspect().active()[current_worker_hostname]
    return execute_task_with_lock(_add, "add", x, y)


def _add(x, y):
    time.sleep(5)
    return x + y


@app.task(bind=True)
def mult(self, x, y):
    # (same unused worker lookup as in add(), omitted here)
    return execute_task_with_lock(_mult, "mult", x, y)


def _mult(x, y):
    time.sleep(10)
    return x * y


@app.task(bind=True)
def minus(self, x, y):
    # (same unused worker lookup as in add(), omitted here)
    return execute_task_with_lock(_minus, "minus", x, y)


def _minus(x, y):
    time.sleep(15)
    return x - y
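
For a quick sanity check without going through the broker, a task can also be executed eagerly in the current process; a minimal sketch (not part of the original gist), assuming tasks.py is importable:

# Run the add task locally instead of sending it to a worker.
# apply() executes in-process and still goes through execute_task_with_lock.
from tasks import add

result = add.apply(args=(2, 2))
print(result.get())  # -> 4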
util.py

import time
from multiprocessing import Manager
from contextlib import contextmanager

manager = Manager()
proc_lock = manager.Lock()
proc_dict = manager.dict()


class LockTimeout(Exception):
    pass


class TaskAlreadyRunning(Exception):
    '''One of the workers already acquired the lock for this task.'''
    pass


@contextmanager
def ProcLock(lock_id, timeout=180):
    # Values stored in a Manager dict are copies: mutating a nested dict in place
    # is not propagated back, so the whole value is reassigned every time the
    # lock state changes.
    if proc_dict.get(lock_id) is None:
        with proc_lock:
            proc_dict[lock_id] = {
                'last_seen': round(time.time()),
                'has_been_acquired': False
            }

    # If the holder timed out, reset the lock and raise an error.
    if round(time.time()) - proc_dict[lock_id]['last_seen'] > timeout:
        with proc_lock:
            proc_dict[lock_id] = None
        raise LockTimeout

    if proc_dict[lock_id]['has_been_acquired']:
        raise TaskAlreadyRunning

    # Acquire the lock.
    with proc_lock:
        proc_dict[lock_id] = {
            'last_seen': round(time.time()),
            'has_been_acquired': True
        }

    try:
        yield
    finally:
        # The task is complete (or raised); release the lock either way.
        with proc_lock:
            proc_dict[lock_id] = {
                'last_seen': round(time.time()),
                'has_been_acquired': False
            }


def execute_task_with_lock(proc, task_name, *args, timeout=180, **kwargs):
    result = None
    while True:
        try:
            with ProcLock(task_name, timeout=timeout):
                result = proc(*args, **kwargs)
            break
        except TaskAlreadyRunning:
            print("waiting for task (%s) to complete." % task_name)
            time.sleep(1)
        except LockTimeout:
            print("lock timeout! Acquiring lock for task (%s)" % task_name)
    return result
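
A toy illustration of the locking behaviour (not part of the original gist): two threads in one process compete for the same task lock, and the second caller waits until the first releases it. The file name demo_lock.py and the helper slow_step are made up for this example.

# demo_lock.py (hypothetical): two concurrent callers serialized by the task lock.
import time
import threading
from util import execute_task_with_lock

def slow_step(label):
    time.sleep(3)
    return label

def run(label):
    print(execute_task_with_lock(slow_step, "demo_task", label))

if __name__ == "__main__":
    t1 = threading.Thread(target=run, args=("first",))
    t2 = threading.Thread(target=run, args=("second",))
    t1.start(); t2.start()
    t1.join(); t2.join()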