Skip to content

Instantly share code, notes, and snippets.

@Sovetnikov
Created February 20, 2017 09:30
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Sovetnikov/a7ad982fc77e8dfbc528bfc20fcf3b1e to your computer and use it in GitHub Desktop.
Save Sovetnikov/a7ad982fc77e8dfbc528bfc20fcf3b1e to your computer and use it in GitHub Desktop.
import os
import subprocess
import unittest
from datetime import timedelta
from random import SystemRandom
from celery import Celery
from kombu import Exchange
from kombu import Queue
from shared.celery.channel_task import MessageTask
def ABSOLUTE_PATH(x):
    """Return the absolute, normalized path of *x* relative to this file's directory.

    *x* is joined onto the directory that contains this module and the
    result is passed through ``os.path.abspath``, so ``..`` components are
    collapsed (e.g. ``ABSOLUTE_PATH('../..')`` is two directories up).
    """
    here = os.path.abspath(os.path.dirname(__file__))
    return os.path.abspath(os.path.join(here, x))
# Celery application shared by the test case below and by the worker process
# that the test case launches from this same file.
app = Celery('channel_test')
app.autodiscover_tasks(('shared',))

# --- Serialization -------------------------------------------------------
# NOTE(review): pickle is enabled for tasks and results; that is presumably
# acceptable only because the broker is a trusted test instance - confirm.
app.conf.update(
    CELERY_TASK_SERIALIZER='pickle',
    CELERY_RESULT_SERIALIZER='pickle',
    CELERY_ACCEPT_CONTENT=['json', 'pickle'],
)

# --- Broker, result backend, queue and timezone --------------------------
# The broker URL carries placeholder credentials; adjust for the local setup.
app.conf.update(
    BROKER_URL='amqp://user:password@localhost:5672/vhost',
    CELERY_RESULT_BACKEND='rpc://',
    CELERY_TASK_RESULT_EXPIRES=timedelta(weeks=12),
    CELERY_DEFAULT_QUEUE='celery-channel-test',
    CELERY_TIMEZONE='Europe/Moscow',
)

# --- Error reporting and logging -----------------------------------------
app.conf.update(
    ADMINS=(
        ('', 'e@mail.com'),
    ),
    CELERY_SEND_TASK_ERROR_EMAILS=False,
    CELERYD_LOG_COLOR=False,
)

# --- Task execution and worker behaviour ---------------------------------
app.conf.update(
    CELERY_ALWAYS_EAGER=False,
    CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
    CELERYD_MAX_TASKS_PER_CHILD=1,
    CELERYD_PREFETCH_MULTIPLIER=1,
)
@app.task(bind=True, base=MessageTask)
def channel_task_test(self, value):
    """Send ``{'a': value}`` via ``self.send`` and return ``1``.

    ``send`` is presumably provided by the ``MessageTask`` base class; how the
    message is transported is not visible in this file. ``TestChannelTask``
    reads the message back with ``result.recv()``.
    """
    payload = {'a': value}
    self.send(payload)
    return 1
if __name__ == '__main__':
    # Run as a script: delegate to Celery's command-line entry point.
    # TestChannelTask.setUp starts its worker by executing this file with
    # "worker" arguments, which end up here.
    from celery.bin.celery import main as run_celery_cli

    run_celery_cli()
class TestChannelTask(unittest.TestCase):
    """Integration test for ``channel_task_test`` against a live Celery worker.

    Every test starts a worker in a child process by running this same file
    as a script, so the broker configured on ``app`` must be reachable.
    """

    def setUp(self):
        """Start a Celery worker for this module in a child process."""
        import sys

        # The command is an argument list: with shell=False a single command
        # string is taken as the program name on POSIX and the launch fails.
        cmd = [
            # Same interpreter (and virtualenv) as the test run, not whatever
            # "python" happens to be first on PATH.
            sys.executable or 'python',
            # Made absolute because the child runs in a different directory.
            os.path.abspath(__file__),
            '--hostname=test-channel@host',
            '-A', __name__,
            'worker',
            '-l', 'info',
            '--concurrency=2',
            '-Ofair',
        ]
        # cwd= starts the child two directories above this file without
        # changing the test process's own working directory, so nothing has
        # to be restored if Popen raises.
        self.celery_process = subprocess.Popen(
            cmd,
            cwd=ABSOLUTE_PATH('../..'),
            shell=False,
        )

    def tearDown(self):
        """Kill the worker process and reap it."""
        self.celery_process.kill()
        # Collect the exit status so the killed child does not linger as a
        # zombie for the remainder of the test run.
        self.celery_process.wait()

    def test_channel_task(self):
        """Each task must deliver exactly one message carrying its own value."""
        rng = SystemRandom()
        # Two rounds of ten tasks each.
        for _round in range(2):
            # Maps the value sent to the result handle of the task sent it.
            tasks = {}
            for _ in range(10):
                value = rng.randint(0, 999999999)
                tasks[value] = channel_task_test.delay(value)
            for value, result in tasks.items():
                # recv() is not a stock Celery result method visible here;
                # presumably it comes from the MessageTask machinery and
                # returns the list of messages sent by the task - confirm.
                message = result.recv()
                self.assertEqual(len(message), 1)
                self.assertEqual(message[0]['a'], value)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment