Created
March 14, 2012 00:28
-
-
Save samv/2032976 to your computer and use it in GitHub Desktop.
Program which runs a slave Celery worker in a background thread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ rm celery*; nosetests -s -v tests/test_simplified.py
test_01_task (test_simplified.TestAsync) ... foo_our_task is: <@task: test.task>
assigned!
thread is: <Thread(Thread-1, initial)>
Worker ('worker0') running!
thread is: <Thread(Thread-1, started 4456366080)>
our_task is this time: <@task: test.task>
celery@worker0 has started.
result is :<AsyncResult: 7e42d3b4-0be5-4f48-8c7d-b305a78591a8>
We're in foo_our_task! Returning 'foo bar'
woot! got: 'foo bar'
We're in foo_our_task! Returning 'bzr-blah'
woot! got: 'bzr-blah'
Broadcasting shutdown to ['worker0']
Done!
Joining Thread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import absolute_import | |
from unittest2 import TestCase | |
import threading | |
# Module-wide counter from which OurWorker builds a unique hostname
# ("worker0", "worker1", ...) for each instance; OurWorker.__init__
# increments it.  No ``global`` statement is needed at this level: an
# assignment at module scope already binds a module global.
worker_num = 0
class OurWorker(object):
    """A Celery worker with a unique hostname that can be shut down remotely.

    ``celery`` is the app object whose ``Worker`` factory is used by
    ``run``; ``broadcast`` is the callable that ``close`` uses to send the
    "shutdown" command addressed to this worker's hostname.
    """

    def __init__(self, celery, broadcast):
        """Store the collaborators and claim the next "workerN" hostname."""
        global worker_num
        self.celery = celery
        self.broadcast = broadcast
        # Hostnames are numbered from the module-level counter, which is
        # bumped so that the next instance gets a different name.
        self.hostname = "worker{0}".format(worker_num)
        worker_num = worker_num + 1

    def run(self):
        """Run the worker; returns once ``run_worker()`` returns."""
        # print is given one parenthesised argument throughout, which
        # behaves the same as a statement and as a function call.
        print("Worker (%s) running!" % repr(self.hostname))
        instance = self.celery.Worker(hostname=self.hostname,
                                      pool_cls="threads")
        instance.run_worker()
        print("Worker finished!")

    def close(self):
        """Broadcast a "shutdown" command addressed only to this worker."""
        shown_name = repr(self.hostname)
        print("Broadcasting shutdown to [{0}]".format(shown_name))
        self.broadcast("shutdown", destination=[self.hostname])
class TestAsync(TestCase):
    """This performs all relatively fast tests that can be done using Celery.

    Every test gets its own worker: ``setUp`` starts an ``OurWorker`` on a
    background thread, and ``tearDown`` broadcasts a shutdown to it and
    joins that thread.
    """

    def setUp(self):
        """Configure Celery, register the test task and start the worker."""
        global our_task

        # SQLite files (through the sqlalchemy transport / database
        # backend) hold both the message queue and the task results.
        settings = {
            'BROKER_TRANSPORT': 'sqlalchemy',
            'BROKER_HOST': 'sqlite:///celeryqueue.sqlite',
            'CELERY_RESULT_BACKEND': 'database',
            'CELERY_RESULT_DBURI': 'sqlite:///celeryresults.sqlite',
        }

        from celery import Celery
        app = Celery(set_as_current=True)
        app.conf.update(**settings)

        # NOTE(review): the celery.task imports happen only after the app
        # has been created with set_as_current=True; presumably the order
        # matters -- confirm before hoisting them to the top of the file.
        from celery.task.control import broadcast
        from celery.task import task

        @task(name="test.task", ignore_result=False)
        def foo_our_task(firstarg, secondarg, baz=" "):
            joined = "{0}{baz}{1}".format(firstarg, secondarg, baz=baz)
            print("We're in foo_our_task! Returning '%s'" % joined)
            return joined

        print("foo_our_task is: " + repr(foo_our_task))
        # Published as a module global so the test methods can reach it.
        our_task = foo_our_task
        print("assigned!")

        worker = OurWorker(app, broadcast)

        def foo():
            worker.run()

        runner = threading.Thread(target=foo)
        self.thread = runner
        self.worker = worker
        # The thread is printed before and after start() to show its state.
        print("thread is: " + repr(runner))
        runner.start()
        print("thread is: " + repr(runner))

    def tearDown(self):
        """Ask the worker to shut down, then wait for its thread to end."""
        self.worker.close()
        print("Done!")
        self.worker = None

        print("Joining Thread")
        pending = self.thread
        pending.join()
        self.thread = None

    def test_01_task(self):
        """Send the task twice and check the results that come back."""
        print("our_task is this time: " + repr(our_task))

        # Default separator.
        first = our_task.delay("foo", "bar")
        print("result is :" + repr(first))
        first_value = first.get()
        print("woot! got: " + repr(first_value))
        self.assertEqual(first_value, "foo bar", "test direct our_task")

        # Explicit separator passed as a keyword argument.
        second_value = our_task.delay("bzr", "blah", baz="-").get()
        print("woot! got: " + repr(second_value))
        self.assertEqual(second_value, "bzr-blah", "test direct our_task II")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment