Last active
October 8, 2015 12:02
-
-
Save diresi/ae2595b5355e48e9917c to your computer and use it in GitHub Desktop.
celery multithreaded client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Start the worker with: `celery -A tasks worker`
import threading | |
import celery, celery.backends.rpc | |
from kombu.common import oid_from | |
# AMQP URL of the message broker used by both the worker and the client threads.
broker = "amqp://guest:guest@172.17.0.3//"

# Dotted path to the result backend class: the RPC subclass defined in this
# module, which is expected to be importable as ``tasks``.
backend = "tasks.RPC"
class RPC(celery.backends.rpc.RPCBackend):
    """RPC result backend whose reply queue follows the application's OID.

    Both properties are evaluated on every access, so the values track
    whatever ``self.app.oid`` returns at that moment (with ``CTApp`` that is
    a per-thread identifier).
    NOTE(review): presumably this replaces a value the base class computes
    once per backend -- confirm against the installed Celery version.
    """

    @property
    def oid(self):
        """Return the owning application's OID."""
        return self.app.oid

    @property
    def binding(self):
        """Return a reply queue named after, and routed by, the current OID.

        The queue is bound to ``self.exchange`` and declared non-durable and
        auto-delete.
        """
        reply_queue = self.oid
        return self.Queue(
            reply_queue,
            self.exchange,
            reply_queue,
            durable=False,
            auto_delete=True,
        )
# Per-thread storage for the lazily generated application OID.
tlocs = threading.local()


class CTApp(celery.Celery):
    """Celery application that reports a separate OID for every thread."""

    @property
    def oid(self):
        """Return the calling thread's OID, generating it on first access.

        NOTE(review): the cache lives in the module-level ``tlocs`` and is
        keyed by thread only, so every ``CTApp`` instance used from one
        thread reports the OID generated for the first one -- fine for the
        single module-level ``app``; confirm before creating more.
        """
        if not hasattr(tlocs, "oid"):
            # First access from this thread: derive an OID and remember it.
            tlocs.oid = oid_from(self)
        return tlocs.oid
def mk_app():
    """Create the ``tasks`` application from the module-level broker/backend."""
    settings = {"broker": broker, "backend": backend}
    return CTApp("tasks", **settings)


# Single application instance shared by the worker and all client threads.
app = mk_app()
@app.task
def echo(msg):
    """Task that returns *msg* prefixed with ``"You said: "``."""
    template = "You said: {}"
    return template.format(msg)
def current():
    """Return the name of the calling thread as a ``str``."""
    # current_thread() is the supported spelling (available since Python 2.6);
    # the camelCase currentThread() alias is deprecated on modern Python 3.
    return str(threading.current_thread().name)
def func():
    """Run one echo round trip for the calling thread and verify the reply.

    Sends the calling thread's name through the ``echo`` task, blocks until
    the result arrives, and compares it with the value produced by calling
    ``echo`` locally. On success the thread name and reply are printed.

    Raises:
        AssertionError: if the reply differs from the locally computed value
            (i.e. it is not the answer to what this thread sent).
    """
    name = current()
    reply = echo.delay(name).get()
    if reply != echo(name):
        # Single-argument print(...) calls emit the same text on Python 2
        # and Python 3, unlike the print statement.
        print("FAILED: " + repr((name, reply)))
        # Raise explicitly: a bare `assert False` is removed under -O, which
        # would let the thread carry on after a mismatch.
        raise AssertionError("unexpected reply: " + repr((name, reply)))
    print("%s %s" % (name, reply))
def funcfunc():
    """Thread body: call ``func`` repeatedly, forever.

    The loop has no exit condition; it only ends if ``func`` raises.
    """
    while True:
        func()
def start(name):
    """Start a thread called *name* that runs ``funcfunc`` and return it."""
    worker = threading.Thread(name=name, target=funcfunc)
    worker.start()
    return worker
if __name__ == "__main__":
    # Launch five client threads, named "A" through "E".
    for thread_name in "ABCDE":
        start(thread_name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment