It's much better to simply implement the `on_success` / `on_failure` callbacks.
import dramatiq
@dramatiq.actor
def identity(x):
    """Return ``x`` unchanged.

    Registered as a dramatiq actor; it is the task whose outcome is
    reported through the success/failure callbacks.
    """
    return x
@dramatiq.actor
def print_result(message_data, result):
    """Print the result of a message that completed successfully.

    Args:
        message_data: Mapping describing the finished message; only its
            ``"message_id"`` key is read here.
        result: The value produced by the finished message, interpolated
            into the printed line as-is.
    """
    print(f"The result of message {message_data['message_id']} was {result}.")
@dramatiq.actor
def print_error(message_data, exception_data):
    """Print a short report for a message that failed.

    Args:
        message_data: Mapping describing the failed message; only its
            ``"message_id"`` key is read here.
        exception_data: Mapping describing the failure; its ``"type"`` and
            ``"message"`` keys are read here.
    """
    print(f"Message {message_data['message_id']} failed:")
    print(f" * type: {exception_data['type']}")
    # !r shows the message's repr, so quotes and escapes stay visible.
    print(f" * message: {exception_data['message']!r}")
if __name__ == "__main__":
    # Send identity(42), passing the two callback actors as the
    # on_success / on_failure options so the outcome gets printed.
    identity.send_with_options(
        args=(42,),
        on_failure=print_error,
        on_success=print_result,
    )
This example is taken verbatim from the Dramatiq cookbook: https://dramatiq.io/cookbook.html