Skip to content

Instantly share code, notes, and snippets.

@gdyrrahitis
Last active October 16, 2020 22:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gdyrrahitis/c763fae2978370f05f061c8089eb1db6 to your computer and use it in GitHub Desktop.
Save gdyrrahitis/c763fae2978370f05f061c8089eb1db6 to your computer and use it in GitHub Desktop.
import rabbitpy
def publish(channel):
body = "Hello world!"
message = rabbitpy.Message(channel, body, {'content_type': 'text/plain'})
delivered = message.publish('rabbitpy-tests', 'ha-routing-key', mandatory=True)
if delivered:
print('Message delivered')
else:
print('Delivery failed')
if __name__ == '__main__':
# I have created a new admin user "test" with password "test"
# The below is the IP of the proxy
url = "amqp://test:test@192.168.0.221:5672/"
try:
with rabbitpy.Connection(url) as connection:
with connection.channel() as channel:
# enabled publisher confirms
channel.enable_publisher_confirms()
publish(channel)
except rabbitpy.exceptions.MessageReturnedException as ex:
print "Failed to publish %s" % ex
except KeyboardInterrupt:
print('exiting...')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment