This code does roughly the same thing as this example, but in a more optimized and flexible way, because:
- it does not open channels for each node on both client and server. In the example there were 3 queues and each vorker would open all 3 channels that it never used. This is achieved by separating the brokers into Listener and Sender (Routed).
- it supports dynamic sending to the specific node, the code from the example only supported predefined queues.
- Added more convenient aliases to remove the duplication of
with_broker(broker).with_labels(node=node_name)
on each task call.
It also breaks compatibility with taskiq_aio_pika a bit, since it uses a different Exchange type and uses the node name as routing_key.
But despite the pros it doesn't support some taskiq features and is more complex to use. For example worker will not be able to call context.requeue
because listener broker has no write channel.
The startup is almost the