Skip to content

Instantly share code, notes, and snippets.

@samv
Created March 14, 2012 00:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samv/2032976 to your computer and use it in GitHub Desktop.
Save samv/2032976 to your computer and use it in GitHub Desktop.
Program which uses a slave celery
$ rm celery*; nosetests -s -v tests/test_simplified.py
test_01_task (test_simplified.TestAsync) ... foo_our_task is: <@task: test.task>
assigned!
thread is: <Thread(Thread-1, initial)>
Worker ('worker0') running!
thread is: <Thread(Thread-1, started 4456366080)>
our_task is this time: <@task: test.task>
celery@worker0 has started.
result is :<AsyncResult: 7e42d3b4-0be5-4f48-8c7d-b305a78591a8>
We're in foo_our_task! Returning 'foo bar'
woot! got: 'foo bar'
We're in foo_our_task! Returning 'bzr-blah'
woot! got: 'bzr-blah'
Broadcasting shutdown to ['worker0']
Done!
Joining Thread
from __future__ import absolute_import
from unittest2 import TestCase
import threading
global worker_num
worker_num = 0
class OurWorker(object):
def __init__(self, celery, broadcast):
self.celery = celery
global worker_num
self.hostname = "worker{0}".format(worker_num)
worker_num += 1
self.broadcast = broadcast
def run(self):
print "Worker (%s) running!" % repr(self.hostname)
self.celery.Worker(hostname=self.hostname, pool_cls="threads").run_worker()
print "Worker finished!"
def close(self):
print "Broadcasting shutdown to [{0}]".format(repr(self.hostname))
self.broadcast("shutdown", destination=[self.hostname])
class TestAsync(TestCase):
"""This performs all relatively fast tests that can be done using Celery.
"""
def setUp(self):
celery_conf = {
'CELERY_RESULT_DBURI': 'sqlite:///celeryresults.sqlite',
'BROKER_TRANSPORT': 'sqlalchemy',
'CELERY_RESULT_BACKEND': 'database',
'BROKER_HOST': 'sqlite:///celeryqueue.sqlite',
}
from celery import Celery
celery = Celery(set_as_current=True)
celery.conf.update(**celery_conf)
from celery.task.control import broadcast
global our_task
from celery.task import task
@task(name="test.task", ignore_result=False)
def foo_our_task(firstarg, secondarg, baz=" "):
rv = "{0}{baz}{1}".format(firstarg, secondarg, baz=baz)
print "We're in foo_our_task! Returning '%s'" % rv
return rv
print "foo_our_task is: "+repr(foo_our_task)
our_task = foo_our_task
print "assigned!"
worker = OurWorker(celery, broadcast)
def foo():
worker.run()
self.thread = threading.Thread(target=foo)
#celery.Worker().run_worker()
self.worker = worker
print "thread is: "+repr(self.thread)
self.thread.start()
print "thread is: "+repr(self.thread)
#self.celery = celery
def tearDown(self):
self.worker.close()
print "Done!"
self.worker = None
print "Joining Thread"
self.thread.join()
self.thread = None
def test_01_task(self):
global our_task
print "our_task is this time: "+repr(our_task)
result = our_task.delay("foo", "bar")
print "result is :" + repr(result)
got = result.get()
print "woot! got: " + repr(got)
self.assertEqual(got, "foo bar", "test direct our_task")
result = our_task.delay("bzr", "blah", baz="-")
got = result.get()
print "woot! got: " + repr(got)
self.assertEqual(got, "bzr-blah", "test direct our_task II")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment