Skip to content

Instantly share code, notes, and snippets.

@OniOni
Created August 2, 2017 16:09
Show Gist options
  • Save OniOni/e639c6b90b66bec4794b7a8c65b1d7b1 to your computer and use it in GitHub Desktop.
Save OniOni/e639c6b90b66bec4794b7a8c65b1d7b1 to your computer and use it in GitHub Desktop.
import functools
from multiprocessing import Process

import pika
# Name of the exchange that run() hands to the consumers and that produce()
# declares and publishes to.
exchange = 'test'
def _consume(exchange, match, callback):
    """Bind a fresh exclusive queue to ``exchange`` and consume from it.

    Blocks inside ``channel.start_consuming()``; the connection is closed on
    the way out if it is still open.
    """
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    try:
        channel = connection.channel()
        # NOTE(review): the ``type=`` and ``no_ack=`` keywords (and the
        # callback-first ``basic_consume`` call) follow the pika API this file
        # was written against -- confirm against the installed pika version.
        channel.exchange_declare(exchange=exchange, type='headers')

        # No queue name is given, so the name assigned by the declare call is
        # read back from the reply.
        declared = channel.queue_declare(exclusive=True)
        queue_name = declared.method.queue

        # Work on a copy so the caller's dict is not modified. 'x-match' is a
        # binding option of the headers exchange; presumably 'any' means a
        # single matching header is enough -- confirm against the broker docs.
        bind_args = dict(match)
        bind_args['x-match'] = 'any'
        channel.queue_bind(
            exchange=exchange, queue=queue_name, arguments=bind_args)

        channel.basic_consume(callback, queue=queue_name, no_ack=True)
        channel.start_consuming()
    finally:
        # Guarded so that a failure caused by a lost connection is not masked
        # by a second error from close().
        if connection.is_open:
            connection.close()


def mk_consumer(exchange, match, callback):
    """Build a zero-argument callable that runs a headers-exchange consumer.

    Args:
        exchange: Name of the exchange to declare (as type 'headers') and
            bind the consumer's queue to.
        match: Dict of binding arguments. It is left unmodified; a copy with
            ``'x-match': 'any'`` added (overriding any existing 'x-match'
            entry) is used for the binding.
        callback: Callable handed to ``channel.basic_consume``.

    Returns:
        A ``functools.partial`` that, when called with no arguments, connects
        to localhost and consumes until interrupted. Unlike a nested closure
        it can be pickled (provided ``callback`` can), so it is usable as a
        ``multiprocessing.Process`` target under the ``spawn`` start method.
    """
    return functools.partial(_consume, exchange, match, callback)
def consumer_one_callback(*a, **k):
    """Report a delivery handled by consumer 1.

    Prints the ``Consumer 1`` label on one line, then the positional-argument
    tuple and keyword-argument dict the callback was invoked with on the next.
    """
    received = ' '.join(str(part) for part in (a, k))
    print('Consumer 1', received, sep='\n')
def consumer_two_callback(*a, **k):
    """Report a delivery handled by consumer 2.

    Prints the ``Consumer 2`` label on one line, then the positional-argument
    tuple and keyword-argument dict the callback was invoked with on the next.
    """
    received = ' '.join(str(part) for part in (a, k))
    print('Consumer 2', received, sep='\n')
def run():
    """Run the two demo consumers in child processes until they exit.

    One consumer is bound with the 'email' headers and one with the 'sms'
    headers, both on the module-level ``exchange``. On KeyboardInterrupt the
    children that were actually started are terminated and joined.
    """
    # Build every Process before starting any, so the interrupt handler below
    # never refers to a name that has not been assigned yet.
    consumers = [
        Process(target=mk_consumer(
            exchange, {'type': 'email', 'channel': 'test'},
            consumer_one_callback)),
        Process(target=mk_consumer(
            exchange, {'type': 'sms', 'channel': 'test'},
            consumer_two_callback)),
    ]

    try:
        for proc in consumers:
            proc.start()
        for proc in consumers:
            proc.join()
    except KeyboardInterrupt:
        print('Cleanup...')
        # A Process has no pid until it has been started; terminate()/join()
        # are only valid on started processes.
        started = [proc for proc in consumers if proc.pid is not None]
        for proc in started:
            proc.terminate()
        for proc in started:
            proc.join()
        print('Done')
def produce(headers, message):
    """Publish one message to the module-level headers exchange.

    Opens a connection to localhost, declares ``exchange`` as type 'headers',
    publishes, and closes the connection again.

    Args:
        headers: Value passed as ``headers`` of the message's
            ``pika.BasicProperties``.
        message: Message body passed to ``channel.basic_publish``.
    """
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    try:
        channel = connection.channel()
        # NOTE(review): ``type=`` follows the pika API this file was written
        # against -- confirm against the installed pika version.
        channel.exchange_declare(exchange=exchange, type='headers')
        # Positional arguments: exchange, routing key (left empty), body,
        # properties.
        channel.basic_publish(
            exchange, '', message, pika.BasicProperties(headers=headers))
    finally:
        # Each call opens its own connection; release it even if declaring or
        # publishing fails. Guarded so a lost-connection error is not masked.
        if connection.is_open:
            connection.close()
# Start the demo consumers only when this file is executed as a script.
if __name__ == '__main__':
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment