Skip to content

Instantly share code, notes, and snippets.

@diresi
Last active October 8, 2015 12:02
Show Gist options
  • Save diresi/ae2595b5355e48e9917c to your computer and use it in GitHub Desktop.
Save diresi/ae2595b5355e48e9917c to your computer and use it in GitHub Desktop.
celery multithreaded client
# start the worker with: `celery -A tasks worker`
import threading
import celery, celery.backends.rpc
from kombu.common import oid_from
# AMQP broker URL handed to the Celery app in mk_app(); uses the guest/guest
# credentials against a fixed IP.
# NOTE(review): 172.17.0.3 looks like a local container address -- adjust for
# your own environment.
broker = "amqp://guest:guest@172.17.0.3//"
# Result backend given as a dotted "module.Class" string; it names the RPC
# class defined in this file.
# NOTE(review): this presumably requires the file to be importable as
# ``tasks`` (matching ``celery -A tasks worker``) -- confirm.
backend = "tasks.RPC"
class RPC(celery.backends.rpc.RPCBackend):
    """RPC result backend that takes its ``oid`` from the owning app.

    Both properties are evaluated on every access, so whatever ``app.oid``
    returns at call time is what names the queue.
    """

    @property
    def oid(self):
        """Return the oid reported by the app this backend belongs to."""
        owner = self.app
        return owner.oid

    @property
    def binding(self):
        """Return a non-durable, auto-deleting Queue named after ``oid``.

        ``oid`` is passed both as the first and as the third positional
        argument of ``self.Queue`` (presumably queue name and routing key --
        confirm against the kombu Queue signature).
        """
        # Read the attributes in the same order as the positional arguments.
        first = self.oid
        exchange = self.exchange
        third = self.oid
        return self.Queue(first, exchange, third,
                          auto_delete=True, durable=False)
# Thread-local storage: an attribute set here by one thread is not visible
# to any other thread.
tlocs = threading.local()


class CTApp(celery.Celery):
    """Celery app whose ``oid`` is looked up in thread-local storage.

    The cache lives in the module-level ``tlocs`` object, so the value is
    per thread (and shared by every CTApp instance used from that thread).
    """

    @property
    def oid(self):
        """Return the calling thread's oid, creating it on first access."""
        if not hasattr(tlocs, "oid"):
            # First access from this thread: derive an oid and remember it.
            tlocs.oid = oid_from(self)
        return tlocs.oid
def mk_app():
    """Build a CTApp named "tasks" using the module-level broker and backend."""
    settings = {"broker": broker, "backend": backend}
    return CTApp("tasks", **settings)


# Module-level app instance that tasks in this file are registered on.
app = mk_app()
@app.task
def echo(msg):
    """Return *msg* formatted into the reply template "You said: {}"."""
    reply = "You said: {}".format(msg)
    return reply
def current():
    """Return the name of the calling thread as a ``str``.

    Uses ``threading.current_thread()`` rather than the camelCase
    ``currentThread()`` alias, which is deprecated and emits a
    DeprecationWarning on recent Python 3 releases.
    """
    return str(threading.current_thread().name)
def func():
    """Run one round trip through the worker and verify the reply.

    Sends the calling thread's name to the ``echo`` task, blocks until the
    result arrives, and compares it with the value a direct local call to
    ``echo`` produces. On success the thread name and the reply are printed.

    Raises:
        AssertionError: if the remote result differs from the local one
            (i.e. this thread received a reply that was not meant for it).
    """
    t = current()
    r = echo.delay(t).get()
    if r != echo(t):
        # Single-argument print(...) produces the same output on Python 2
        # and Python 3, unlike the print statement.
        print("FAILED: " + repr((t, r)))
        # Raise explicitly: a bare ``assert False`` is stripped under -O,
        # which would silently disable the check.
        raise AssertionError("unexpected result for thread %r: %r" % (t, r))
    print("%s %s" % (t, r))
def funcfunc():
    """Call func() in an endless loop.

    There is no exit condition; the loop only ends if func() raises.
    """
    while 1:
        func()
def start(name):
    """Create a thread called *name* running funcfunc(), start it, return it."""
    worker = threading.Thread(name=name, target=funcfunc)
    worker.start()
    return worker
if __name__ == "__main__":
    # Launch five client threads named "A" through "E".
    for label in "ABCDE":
        start(label)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment