Skip to content

Instantly share code, notes, and snippets.

@huogerac
Created June 23, 2019 14:05
Show Gist options
  • Save huogerac/982d98135aa56bbd60b2de59bf2bdf88 to your computer and use it in GitHub Desktop.
Save huogerac/982d98135aa56bbd60b2de59bf2bdf88 to your computer and use it in GitHub Desktop.
"""
Celery Tasks
============
This file defines the expected API.
"""
import jsonschema
from celery import Celery, subtask, group, chord
from . import settings
# Celery application shared by every task in this module; the broker and
# result backend URLs are read from the package settings.
app = Celery(
    broker=settings.BROKER_URL,
    backend=settings.RESULT_BACKEND_URL,
)
@app.task(bind=True, name='hello.double_value')
def double_value(self, number: int):
    """Return ``number`` multiplied by two.

    Raises:
        RuntimeError: if ``number`` is not an ``int`` instance.
    """
    if isinstance(number, int):
        return number * 2
    raise RuntimeError("Number is not int!")
@app.task(bind=True, name='hello.double_callback')
def double_callback(self, number: int):
    """Return ``number`` unchanged, but only when it equals 42.

    Raises:
        RuntimeError: if ``number`` differs from 42.
    """
    expected = 42
    if number == expected:
        return number
    raise RuntimeError("The number is not 42!")
def send_notification(value=None):
    """Notification hook; currently a no-op placeholder.

    Args:
        value: Payload describing what to notify about. It is accepted so
            that callers passing one positional argument do not fail with
            ``TypeError``; it is not used yet.

    Returns:
        None.
    """
    return None
@app.task(bind=True, name='hello.handle_errors')
def handle_errors(self, value):
    """Forward ``value`` to ``send_notification`` and report success.

    Returns:
        Always ``True`` once the notification call has returned.
    """
    send_notification(value)
    handled = True
    return handled
@app.task(bind=True, name='hello.add_value')
def add_value(self, number: int, original_value):
    """Return ``original_value + number``."""
    total = original_value + number
    return total
@app.task(bind=True, name='hello.sum_values')
def sum_values(self, values):
    """Return the sum of all entries in ``values``."""
    total = sum(values)
    return total
@app.task(bind=True, name='hello.is_even')
def is_even(self, number):
    """Return ``True`` when ``number`` leaves no remainder modulo 2."""
    remainder = number % 2
    return remainder == 0
@app.task(bind=True, name='hello.map_extract_text')
def get_items_and_double_them(self, seed, total):
    """Build a ``double_value.map`` signature over the generated items.

    ``get_list_of_items(seed, total)`` is called directly and its result
    is handed to ``double_value.map``; the resulting signature is
    returned without being executed here.

    Author's note: this approach works. Drawbacks: (none listed).
    """
    return double_value.map(get_list_of_items(seed, total))
@app.task
def dmap(it, callback, final=None):
    """Create one new task per item of ``it`` by cloning ``callback``.

    Args:
        it: Iterable of items; each becomes the single positional
            argument of one clone of ``callback``.
        callback: Signature (or its serialized form) to run per item.
        final: Optional signature; when truthy, the per-item group is
            used as a chord header with ``final`` as its body.

    Returns:
        ``[]`` when ``it`` produced no tasks; otherwise whatever launching
        the chord (``final`` given) or the group (no ``final``) returns.
    """
    per_item = subtask(callback)
    fan_out = group(per_item.clone([item]) for item in it)
    if not fan_out.tasks:
        return []
    if not final:
        return fan_out.delay()
    return chord(fan_out)(final)
@app.task
def get_list_of_items(seed=42, total=10):
    """Return ``total`` distinct pseudo-random integers from 1 to 99.

    Args:
        seed: Seed for the generator; the same seed yields the same list.
        total: How many numbers to draw (sampling is without replacement).

    Returns:
        A list of ``total`` unique integers taken from ``range(1, 100)``.
    """
    import random

    # A private generator produces the same sequence as seeding the module
    # but does not reseed the process-wide ``random`` state as a side effect.
    rng = random.Random(seed)
    return rng.sample(range(1, 100), total)
@app.task
def get_summary(items):
    """Summarize ``items`` as a dict with sum, min, max and mean."""
    summary = {}
    summary["sum"] = sum(items)
    summary["min"] = min(items)
    summary["max"] = max(items)
    summary["mean"] = sum(items) / len(items)
    return summary
import pytest
import mock
from hello_celery import tasks
def test_should_create_task_dynamically_1(celery_app):
    """
    Problem:
        Given a list of results from Task1,
        I want to run Task2 for each item of Task1 (using the queue).
    Solution 1:
        Create a Task3 that runs Task1 and maps its result
        onto Task2.
    """
    mapped = tasks.get_items_and_double_them(seed=42, total=8)
    signature = mapped.delay()
    result = signature.get()
    expected = [164, 30, 8, 190, 72, 64, 58, 36]
    assert result == expected
def test_should_create_task_dynamically_2(celery_app):
    """
    Problem:
        Given a list of results from Task1,
        I want to run Task2 for each item of Task1 (using the queue).
    Solution:
        Create a Task3 (dynamic map) that receives Task2 as a parameter.
    """
    produce_items = tasks.get_list_of_items.s(seed=42, total=8)
    fan_out = tasks.dmap.s(callback=tasks.double_value.s())
    signature = (produce_items | fan_out).delay()
    # Author's note: not sure why two get() calls are needed.
    # Presumably the first get() yields the result object returned by dmap
    # and the second resolves it -- TODO confirm.
    result = signature.get().get()
    expected = [164, 30, 8, 190, 72, 64, 58, 36]
    assert result == expected
def test_should_create_task_dynamically_with_chord(celery_app):
    """
    Problem:
        Given a list of results from Task1,
        I want to run Task2 for each item of Task1 (using the queue).
    """
    produce_items = tasks.get_list_of_items.s(42, 8)
    fan_out = tasks.dmap.s(
        callback=tasks.double_value.s(),
        final=tasks.get_summary.s(),
    )
    dag = (produce_items | fan_out).delay()
    result = dag.get().get()
    expected = {
        'sum': 622,
        'min': 8,
        'max': 190,
        'mean': 77.75
    }
    assert result == expected
import pytest
import mock
from hello_celery import tasks
def test_should_double_number_simple_task(celery_app):
    """Doubling 21 through ``delay()`` yields 42."""
    async_result = tasks.double_value.s(21).delay()
    assert async_result.get() == 42
def test_should_double_number_simple_task_using_queue(celery_app):
    """Doubling 21 via ``apply_async`` on queue ``q1`` yields 42."""
    doubling = tasks.double_value.s(21)
    async_result = doubling.apply_async(queue='q1')
    assert async_result.get() == 42
def test_should_double_number_another_way(celery_app):
    """Build the signature from the task's registered name instead."""
    signature = celery_app.signature(
        'hello.double_value', args=(21,), kwargs={}, queue='q1'
    )
    result = signature.delay().get()
    assert result == 42
def test_should_execute_callback(celery_app):
    """Link ``double_callback`` to ``double_value(22)`` and expect its error."""
    doubling = tasks.double_value.s(22)
    doubling.link(tasks.double_callback.s())
    with pytest.raises(Exception) as exc_info:
        assert doubling.delay().get() == 44
    message = str(exc_info.value)
    assert message == "The number is not 42!"
def test_should_execute_raise_exception(celery_app):
    """A non-int argument makes ``double_value`` fail with its error message."""
    signature = tasks.double_value.s("@")
    with pytest.raises(Exception) as e:
        # The task has to actually run; ``e.value`` only exists once the
        # ``raises`` block has exited, so the message is checked afterwards.
        signature.delay().get()
    assert str(e.value) == "Number is not int!"
def test_should_ignore_exception(celery_app):
    """A failing task triggers the linked error handler's notification."""
    with mock.patch('hello_celery.tasks.send_notification') as send_notification_mock:
        signature = tasks.double_value.s("@")
        signature.link_error(tasks.handle_errors.s())
        with pytest.raises(Exception):
            signature.delay().get()
        # ``assert_called_once()`` raises on failure and returns None, so it
        # must not be wrapped in ``assert`` (that would assert on None).
        send_notification_mock.assert_called_once()
def test_should_run_a_chain_workflow(celery_app):
    """
    CHAIN (serial tasks)

      start --> task1 --> task2

    note: task2 receives the result from task1
    """
    first = tasks.double_value.s(21)
    second = tasks.add_value.s(100)
    workflow = first | second
    assert workflow.delay().get() == 142
def test_should_run_a_group_workflow(celery_app):
    """
    GROUP (parallel tasks)

                 --- task1
               /
      start --|  --- task2
               \\
                 --- task3
    """
    from celery import group

    header = [tasks.add_value.s(number, 42) for number in (10, 20, 30)]
    result = group(header).delay().get()
    assert result == [10 + 42, 20 + 42, 30 + 42]
def test_should_run_a_chord_workflow(celery_app):
    """
    CHORD (put all together)

                 --- task1 ---
               /               \\
      start --|  --- task2 ---  --- task4 ---> result
               \\              /
                 --- task3 ---
    """
    from celery import group

    header = [tasks.add_value.s(number, 42) for number in (10, 20, 30)]
    workflow = group(header) | tasks.sum_values.s()
    result = workflow.delay().get()
    assert result == 52 + 62 + 72
def test_should_run_a_map_workflow(celery_app):
    """
    MAP (put all together)

      start --> task1(n1) --> task1(n2) --> task1(n) ...
    """
    import random

    random.seed(42)
    numbers = random.sample(range(1, 100), 10)
    expected = [
        True, False, True, False, True,
        True, False, True, True, False
    ]
    result = tasks.is_even.map(numbers).delay().get()
    assert result == expected
def test_should_use_all_celery_components(celery_app):
    """
    chain( task   group           callback

             /--- double ---
      add --|               \\ -- sum
             \\-- double --/
    """
    from celery import group

    doubles = group([
        tasks.double_value.s(),
        tasks.double_value.s(),
    ])
    workflow = tasks.add_value.s(100, 50) | doubles | tasks.sum_values.s()
    result = workflow.delay().get()
    assert result == 300 + 300
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment