Skip to content

Instantly share code, notes, and snippets.

@odra
Last active November 7, 2019 23:03
Show Gist options
  • Save odra/52d96fbf5abf2693ce5bd5737509e172 to your computer and use it in GitHub Desktop.
Save odra/52d96fbf5abf2693ce5bd5737509e172 to your computer and use it in GitHub Desktop.
celery stuff
import time
from celery import group
import sources as app
def wait_fn(fn, poll_interval=1):
    """Block until an async result is ready, then print its value.

    Args:
        fn: A result handle exposing ``ready()`` and ``get()`` (in this
            script, the object returned by ``app.greet.delay(...)``).
        poll_interval: Seconds to sleep between readiness checks.

    Returns:
        None. The result value is printed, not returned.
    """
    # Sleep *before* re-checking readiness, so no extra interval is spent
    # sleeping after the result has already become ready.
    while not fn.ready():
        print('Waiting for function')
        time.sleep(poll_interval)
    print(fn.get())
def pfn(*args):
    """Greet every positional argument through a single Celery group.

    One ``app.greet`` signature is built per argument, the signatures are
    combined into a group, the group is dispatched, and the call blocks on
    ``.get()`` to return the collected results.
    """
    signatures = (app.greet.s(name) for name in args)
    dispatched = group(signatures)()
    return dispatched.get()
def get_data():
    """Run the 'title', 'description' and 'link' queries as one group.

    Each query is submitted with ``None`` as its node argument. Returns
    whatever ``.get()`` on the dispatched group yields.
    """
    fields = ('title', 'description', 'link')
    job = group(*(app.query.s(None, field) for field in fields))
    return job().get()
# Submit a single greeting task and poll until its result has been printed.
greeting_result = app.greet.delay('me')
wait_fn(greeting_result)

# Fetch the three query results and print them one per line.
title, desc, link = get_data()
print(f'{title}\n{desc}\n{link}')
import time
import inspect
from celery import Celery
from celery.bin import worker
class Manager:
    """Thin wrapper around a Celery app that tracks task modules.

    Every function registered through :meth:`task` has a dotted module path,
    derived from its source file, recorded in ``includes``. :meth:`run`
    assigns that list to the Celery ``include`` setting before starting a
    worker.
    """

    def __init__(self, name, **kwargs):
        """Create the manager and its underlying Celery application.

        Args:
            name: Label stored on the manager as ``self.name``.
            **kwargs: Forwarded unchanged to the ``Celery`` constructor
                (e.g. ``broker`` and ``backend``).
        """
        self.includes = []  # dotted module paths collected by task()
        self.name = name
        # NOTE(review): the Celery app is always created as 'worker';
        # ``name`` is only stored, never passed on -- confirm this is intended.
        self.q = Celery('worker', **kwargs)

    @staticmethod
    def _module_path(fn):
        """Return a dotted path built from *fn*'s source file, or None.

        Only a trailing ``.py`` suffix is removed, so directory names that
        merely contain ``.py`` are left intact.
        """
        source = inspect.getsourcefile(fn)
        if source is None:
            # No source file could be determined; nothing to record.
            return None
        if source.endswith('.py'):
            source = source[:-len('.py')]
        # NOTE(review): an absolute source path produces a dotted name with a
        # leading dot, which is presumably not importable -- verify how the
        # worker script is launched.
        return source.replace('/', '.')

    def _remember_module(self, fn):
        """Record *fn*'s module path in ``includes`` once."""
        path = self._module_path(fn)
        if path is not None and path not in self.includes:
            self.includes.append(path)

    def task(self, *args, **kwargs):
        """Register a task with the Celery app and remember its module.

        Works both as a bare decorator (``@mgr.task``) and as a decorator
        factory (``@mgr.task(option=value)``).

        Args:
            *args: Either the function being decorated, or nothing when
                called with options only.
            **kwargs: Options forwarded to the Celery app's ``task``.

        Returns:
            Whatever the Celery app's ``task`` returns for the bare form, or
            a decorator producing that value for the factory form.
        """
        if args and callable(args[0]):
            self._remember_module(args[0])
            return self.q.task(*args, **kwargs)

        # Called with options only: the function arrives later, so the
        # module bookkeeping is deferred until the decorator is applied.
        celery_decorator = self.q.task(*args, **kwargs)

        def register(fn):
            self._remember_module(fn)
            return celery_decorator(fn)

        return register

    def run(self):
        """Configure the recorded modules and start a worker at INFO level."""
        self.q.conf.include = self.includes
        # NOTE(review): relies on the ``worker`` command class exposed by
        # ``celery.bin.worker`` -- confirm it exists in the installed Celery.
        w = worker.worker(app=self.q)
        w.run(loglevel='INFO')
# Module-level manager used by the task decorators below; the same local
# Redis database serves as both message broker and result backend.
mgr = Manager(
    'sources',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)
@mgr.task
def greet(name):
    """Sleep for five seconds, then return a greeting for *name*."""
    time.sleep(5)
    greeting = f'Hello, {name}!'
    return greeting
@mgr.task
def query(node, querystr):
    """Return the literal string ``'node'``; both arguments are ignored."""
    # NOTE(review): looks like a placeholder -- neither ``node`` nor
    # ``querystr`` is used. Confirm the intended behaviour.
    placeholder = 'node'
    return placeholder
# Start the worker only when this file is executed directly, not on import.
if __name__ == '__main__':
    mgr.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment