Skip to content

Instantly share code, notes, and snippets.

@tonholis
Last active November 10, 2023 15:44
Show Gist options
  • Save tonholis/89e707f70d190bd1c28e0813dbcb3a6c to your computer and use it in GitHub Desktop.
Save tonholis/89e707f70d190bd1c28e0813dbcb3a6c to your computer and use it in GitHub Desktop.
Python consumer that works with MassTransit
import sys
import pika
import json
import re
import logging
import logging.handlers
# Exchange this script declares and binds its queue to.
PUBLISHER_EXCHANGE_NAME = 'DoSomeJob'
# Queue this script declares and consumes from.
CONSUMER_QUEUE_NAME = 'python_consumer_queue'

# Root logger setup: everything from DEBUG upwards goes to stdout.
logging.basicConfig(
    handlers=[logging.StreamHandler(stream=sys.stdout)],
    format="%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s",
    level=logging.DEBUG,
)
logger = logging.getLogger()
def callback(ch, method, props, body):
    """Handle one delivery: parse the request and publish a MassTransit reply.

    Args:
        ch: Channel the message arrived on; used to publish the response and
            to acknowledge the delivery.
        method: Delivery metadata; only ``delivery_tag`` is read.
        props: Message properties; ``correlation_id`` is logged and echoed
            back on the response.
        body: Raw message bytes, decoded as UTF-8 and parsed as JSON. The
            parsed object must contain ``responseAddress``.

    Any exception raised while handling the message is logged together with
    its traceback and then swallowed. The delivery is acknowledged whether or
    not handling succeeded, so a failing message is not redelivered.
    """
    # Lazy %-style arguments: a formatting problem can no longer raise here
    # and take the consumer down before the message is even handled.
    logging.info("Message received - CorrelationId=%s", props.correlation_id)
    logging.debug("Message contents:\n'%s'", body)

    try:
        request = json.loads(body.decode('UTF-8'))

        # Do some stuff
        result = 'some string result... It could be an object'

        exchange = getExchangeName(request['responseAddress'])
        logging.debug("Exchange extracted from responseAddress request: '%s'", exchange)

        responseBody = createMassTransitResponse(result, request)
        logging.debug("Response body for MassTransit:\n'%s'", responseBody)

        # Respond to the initiator exchange extracted from request['responseAddress']
        ch.basic_publish(exchange,
                         routing_key=exchange,
                         properties=pika.BasicProperties(correlation_id=props.correlation_id),
                         body=responseBody)
    except Exception as e:
        # logging.exception records the message at ERROR level and appends
        # the active traceback, so no separate traceback module is needed.
        logging.exception("error %s", e)

    # Acknowledge unconditionally (best effort): failed messages are dropped
    # rather than redelivered.
    ch.basic_ack(delivery_tag=method.delivery_tag)
    logging.info("Message handling complete")
def getExchangeName(address):
    """Extract the exchange name from an address URI.

    The name is the run of word characters, dots and hyphens that sits
    between a ``/`` and the ``?`` that starts the query string, e.g.
    ``rabbitmq://localhost/my-exchange?durable=false`` -> ``my-exchange``.

    Args:
        address: Address URI string, such as a request's ``responseAddress``.

    Returns:
        The extracted exchange name.

    Raises:
        ValueError: If ``address`` contains no ``/<name>?`` segment.
    """
    # Raw string avoids invalid-escape warnings; '.' and '-' are accepted in
    # addition to \w because broker entity names may contain them.
    match = re.search(r'/([\w.-]+)\?', address)
    if match is None:
        raise ValueError(
            "Could not extract an exchange name from address: {0!r}".format(address))
    return match.group(1)
# Builds the reply envelope in the shape MassTransit expects. More info http://masstransit-project.com/MassTransit/advanced/interoperability.html
def createMassTransitResponse(result, requestBody):
    """Serialize a response envelope for the given request.

    Args:
        result: Value placed under ``message``; must be JSON-serializable.
        requestBody: Parsed request envelope. The keys ``requestId``,
            ``correlationId``, ``conversationId``, ``sourceAddress`` and
            ``destinationAddress`` are read by subscript, so a missing key
            raises ``KeyError`` for a dict.

    Returns:
        The envelope as a JSON string.
    """
    # (response key, request key) pairs, in output order. The initiator is
    # the request's correlation id, and the two addresses swap roles.
    copied_fields = (
        ('requestId', 'requestId'),
        ('correlationId', 'correlationId'),
        ('conversationId', 'conversationId'),
        ('initiatorId', 'correlationId'),
        ('sourceAddress', 'destinationAddress'),
        ('destinationAddress', 'sourceAddress'),
    )
    envelope = {target: requestBody[source] for target, source in copied_fields}

    envelope['messageType'] = [
        'urn:message:INTERFACE_NAMESPACE_HERE:YOUR_CSHARP_INTERFACE_NAME'
    ]
    # The `message` value/object must implement the interface named in messageType
    envelope['message'] = result

    return json.dumps(envelope)
# Connect to the broker on localhost and open the channel used for everything below.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Make sure that we have an Exchange created the same way as MassTransit creates one
channel.exchange_declare(exchange=PUBLISHER_EXCHANGE_NAME, exchange_type='fanout', durable=True)

# Non-durable, auto-delete queue bound to the publisher exchange.
channel.queue_declare(queue=CONSUMER_QUEUE_NAME,
                      exclusive=False,
                      auto_delete=True,
                      durable=False)
channel.queue_bind(queue=CONSUMER_QUEUE_NAME, exchange=PUBLISHER_EXCHANGE_NAME)

# pika 1.0 changed the basic_consume signature to
# (queue, on_message_callback, ...); older releases take the callback first.
# Try the current keyword form and fall back to the legacy call so the script
# runs on both.
try:
    channel.basic_consume(queue=CONSUMER_QUEUE_NAME, on_message_callback=callback)
except TypeError:
    channel.basic_consume(callback, queue=CONSUMER_QUEUE_NAME)

try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl-C: stop the consumer loop cleanly instead of dying with a traceback.
    channel.stop_consuming()
finally:
    # Always release the broker connection when the consumer loop ends.
    if connection.is_open:
        connection.close()
@TolstochenkoDaniil
Copy link

Your post helped a lot! Just changed response schema to fit masstransit producer interface ♻️
Much thanks! 👍

@tonholis
Copy link
Author

I'm glad it helped you. That was my first attempt at making a Python script that gets messages from a MassTransit-based app. I'm not a Python expert, BTW...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment