Skip to content

Instantly share code, notes, and snippets.

@oozliuoo
Last active December 19, 2017 02:15
Show Gist options
  • Save oozliuoo/97edef57b6987b94fe5c10800960a54c to your computer and use it in GitHub Desktop.
Save oozliuoo/97edef57b6987b94fe5c10800960a54c to your computer and use it in GitHub Desktop.
CeleryPipelineDemo

A quick demo of implementing task pipelines with Celery.

How to use

  1. Install Redis and the Python packages Celery, redis, and redlock
  2. Start Redis locally (on the default port 6379)
  3. Run three Celery workers named worker1@localhost, worker2@localhost, and worker3@localhost
  4. Run main.py
{
"BROKER": "redis://localhost:6379/0",
"BACKEND": "redis://localhost:6379/1",
"TASK_CREATE_MISSING_QUEUE": true
}
import os
import json
import time
from tasks import add, minus, mult
from kombu import Queue
from celery import Celery, chain
# Connection settings shared with the workers, read from config.json in the
# current working directory. The file is closed as soon as it has been parsed.
with open('./config.json', encoding='utf-8') as _config_file:
    CONFIG = json.load(_config_file)
BROKER = CONFIG["BROKER"]
BACKEND = CONFIG["BACKEND"]
TASK_CREATE_MISSING_QUEUE = CONFIG["TASK_CREATE_MISSING_QUEUE"]

# Worker node names (name@host) that jobs can be routed to.
WORKERS = {
    "WORKER1": "worker1@localhost",
    "WORKER2": "worker2@localhost",
    "WORKER3": "worker3@localhost",
}

# Feature hashes; each one is used as both a queue name and a routing key.
HASHES = {
    "HASH1": "feature_hash1",
    "HASH2": "feature_hash2",
    "HASH3": "feature_hash3",
}
class CeleryApp:
    """Namespace for creating the Celery client used by main.py."""

    @staticmethod
    def connect_workers():
        """Create the Celery client bound to the configured broker/backend.

        The initial ``task_queues`` is empty. Queues are added at runtime by
        whoever routes jobs, and ``task_create_missing_queues`` follows the
        TASK_CREATE_MISSING_QUEUE setting.

        Returns:
            Celery instance
        """
        settings = {
            "broker": BROKER,
            "backend": BACKEND,
            "task_create_missing_queues": TASK_CREATE_MISSING_QUEUE,
            "task_queues": (),
        }
        return Celery('worker.tasks', **settings)
class MainApp:
    """Dispatch demo Celery pipelines onto per-feature queues.

    Each ``do_job*`` method hardcodes a worker-selection "policy". It
    declares a queue named after ``feature_hash``, asks the chosen worker to
    consume from it, and sends a chain of arithmetic tasks to that queue.
    """

    def __init__(self, celery_app):
        """
        Wrap the Celery application used for queue declaration and worker
        control.

        @param {Celery} celery_app - the Celery instance (for example the one
            returned by CeleryApp.connect_workers) used to handle jobs
        """
        self._celery_app = celery_app

    @staticmethod
    def _on_queue(signature, feature_hash):
        """Return ``signature`` routed to the queue/routing key ``feature_hash``."""
        return signature.set(queue=feature_hash, routing_key=feature_hash)

    def do_job1(self, feature_hash):
        """
        Mock job consumed by WORKER1: computes (2 + 2) * 3 - 6 == 6.

        In a chain each task receives the previous result as its first
        argument.

        @param {string} feature_hash - queue name / routing key for the tasks
        @returns the result handle returned by ``chain.delay()``
        """
        self._configure_routing(feature_hash, WORKERS["WORKER1"])
        job = chain(
            self._on_queue(add.s(2, 2), feature_hash),
            self._on_queue(mult.s(3), feature_hash),
            self._on_queue(minus.s(6), feature_hash),
        )
        return job.delay()

    def do_job2(self, feature_hash):
        """
        Mock job consumed by WORKER2: computes 2 * 5 * 3 - 10 + 18 == 38.

        @param {string} feature_hash - queue name / routing key for the tasks
        @returns the result handle returned by ``chain.delay()``
        """
        self._configure_routing(feature_hash, WORKERS["WORKER2"])
        job = chain(
            self._on_queue(mult.s(2, 5), feature_hash),
            self._on_queue(mult.s(3), feature_hash),
            self._on_queue(minus.s(10), feature_hash),
            self._on_queue(add.s(18), feature_hash),
        )
        return job.delay()

    def do_job3(self, feature_hash):
        """
        Mock job consumed by WORKER3: computes (2 * 4 - 10) * 7 == -14.

        @param {string} feature_hash - queue name / routing key for the tasks
        @returns the result handle returned by ``chain.delay()``
        """
        self._configure_routing(feature_hash, WORKERS["WORKER3"])
        job = chain(
            self._on_queue(mult.s(2, 4), feature_hash),
            self._on_queue(minus.s(10), feature_hash),
            self._on_queue(mult.s(7), feature_hash),
        )
        return job.delay()

    def do_job4(self, feature_hash):
        """
        Mock job consumed by WORKER2: computes (2 * 4 - 10) * 7 == -14.

        Unlike do_job3, it attaches WORKER2 as the consumer, so it can be
        used to add a second worker to an already-used queue.

        @param {string} feature_hash - queue name / routing key for the tasks
        @returns the result handle returned by ``chain.delay()``
        """
        self._configure_routing(feature_hash, WORKERS["WORKER2"])
        job = chain(
            self._on_queue(mult.s(2, 4), feature_hash),
            self._on_queue(minus.s(10), feature_hash),
            self._on_queue(mult.s(7), feature_hash),
        )
        return job.delay()

    def _configure_routing(self, feature_hash, worker):
        """
        Declare the queue for ``feature_hash`` at runtime and tell ``worker``
        to start consuming from it.

        A queue is appended to ``conf.task_queues`` only if none with that
        name is declared yet. Reusing a feature hash therefore does not pile
        up duplicate declarations.

        @param {string} feature_hash - name and routing key of the queue
        @param {string} worker - name (host) of the worker that will consume
            the queue, e.g. "worker1@localhost"
        """
        conf = self._celery_app.conf
        declared = tuple(conf.task_queues or ())
        if all(queue.name != feature_hash for queue in declared):
            conf.task_queues = declared + (
                Queue(feature_hash, routing_key=feature_hash),
            )
        self._celery_app.control.add_consumer(
            feature_hash,
            reply=True,
            routing_key=feature_hash,
            destination=[worker],
        )
# Build the Celery client and hand it to the job dispatcher.
celery_app = CeleryApp.connect_workers()
main_app = MainApp(celery_app)

# Both pipelines are sent with the first feature hash, i.e. to the same queue.
for _dispatch in (main_app.do_job1, main_app.do_job2):
    _dispatch(HASHES["HASH1"])

# Other scenarios to try:
# main_app.do_job3(HASHES["HASH3"])
# main_app.do_job4(HASHES["HASH1"])
import time
import redis
from redlock import RedLock
from celery import Celery
from celery import task
# Worker-side Celery app: broker on Redis db 0, result backend on Redis db 1.
app = Celery(
    'worker.tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
)

# Lock names passed to execute_task_with_lock, one per operation.
ADD_LOCK_KEY, MULT_LOCK_KEY, MINUS_LOCK_KEY = "add_lock", "mult_lock", "minus_lock"

# Redis key suffixes per task (not referenced in the visible code).
ADD_REDIS_KEY_SUFFIX, MULT_REDIS_KEY_SUFFIX, MINUS_REDIS_KEY_SUFFIX = (
    ".tasks.add",
    ".tasks.mult",
    ".tasks.minus",
)
@app.task(bind=True)
def add(self, x, y):
    """
    Celery task: add two numbers, serialized through the "add" lock.

    ``bind=True`` makes Celery pass the task instance as ``self``. It is not
    needed here but is kept so the task's signature stays unchanged.

    Arguments:
        x {int} -- First param to be added
        y {int} -- Second param to be added

    Returns:
        int -- sum of the two params
    """
    # No per-task worker inspection: a broadcast inspect() call is slow and
    # its result would be unused here.
    return execute_task_with_lock(_add, "add", ADD_LOCK_KEY, x, y)
def _add(x, y):
"""The acttual function doing add
Sleep for 5 secs, and then add up the two params
Arguments:
x {int} -- First param to be added
y {int} -- Second param to be added
Returns:
int -- sum of the two params
"""
time.sleep(5)
return x + y
@app.task(bind=True)
def mult(self, x, y):
    """
    Celery task: multiply two numbers, serialized through the "mult" lock.

    ``bind=True`` makes Celery pass the task instance as ``self``. It is not
    needed here but is kept so the task's signature stays unchanged.

    Arguments:
        x {int} -- First param to be multiplied
        y {int} -- Second param to be multiplied

    Returns:
        int -- product of the two params
    """
    # No per-task worker inspection: a broadcast inspect() call is slow and
    # its result would be unused here.
    return execute_task_with_lock(_mult, "mult", MULT_LOCK_KEY, x, y)
def _mult(x, y):
"""The acttual function doing mult
Sleep for 10 secs, and then multiply the two params
Arguments:
x {int} -- First param to be multed
y {int} -- Second param to be multed
Returns:
int -- mult of the two params
"""
time.sleep(10)
return x * y
@app.task(bind=True)
def minus(self, x, y):
    """
    Celery task: subtract ``y`` from ``x``, serialized through the "minus" lock.

    ``bind=True`` makes Celery pass the task instance as ``self``. It is not
    needed here but is kept so the task's signature stays unchanged.

    Arguments:
        x {int} -- First param, to be subtracted from
        y {int} -- Second param, the amount to subtract

    Returns:
        int -- difference of the two params
    """
    # No per-task worker inspection: a broadcast inspect() call is slow and
    # its result would be unused here.
    return execute_task_with_lock(_minus, "minus", MINUS_LOCK_KEY, x, y)
def _minus(x, y):
"""The acttual function doing minus
Sleep for 15 secs, and then minus the two params
Arguments:
x {int} -- First param to be minus from
y {int} -- Second param to be used to minus
Returns:
int -- difference of the two params
"""
time.sleep(15)
return x - y
def execute_task_with_lock(proc, task_name, lock_key, x, y):
    """Execute ``proc(x, y)`` while holding the RedLock named ``lock_key``.

    Only one holder of ``lock_key`` runs at a time. If the lock cannot be
    acquired, print a wait message, sleep one second and try again, until
    the lock is obtained.

    Arguments:
        proc {Function} -- Actual function performing the task
        task_name {string} -- Name of the task (used in the wait message)
        lock_key {string} -- Lock key for the task
        x {int} -- First argument of the task
        y {int} -- Second argument of the task

    Returns:
        int -- result of ``proc(x, y)``

    Raises:
        Any exception raised by ``proc`` propagates; it is not retried.
    """
    # Imported locally so the module's top-level imports stay unchanged.
    from redlock import RedLockError

    while True:
        try:
            with RedLock(lock_key):
                return proc(x, y)
        except RedLockError:
            # Retry only failed lock acquisition. Errors from proc itself must
            # surface instead of re-running the task forever.
            print("waiting for the previous %s to complete, wait for 1 sec" % task_name)
            time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment