Skip to content

Instantly share code, notes, and snippets.

@aiexz
Created February 12, 2023 11:33
Show Gist options
  • Save aiexz/7f93fd6b91fe10d280dee6d6395bdbd4 to your computer and use it in GitHub Desktop.
Save aiexz/7f93fd6b91fe10d280dee6d6395bdbd4 to your computer and use it in GitHub Desktop.
import asyncio
import ssl
import aio_pika
from aio_pika.abc import SSLOptions
async def main() -> None:
connection = await aio_pika.connect_robust(
host="127.0.0.1",
port=5671,
auth='external',
ssl=True,
ssl_options=SSLOptions(
cafile="cacert.pem",
certfile="cert.pem",
keyfile="key.pem",
no_verify_ssl=ssl.CERT_REQUIRED,
),
)
async with connection:
routing_key = "aio-pika-ssl"
channel = await connection.channel()
await channel.declare_queue(routing_key, auto_delete=True)
await channel.default_exchange.publish(
aio_pika.Message(body="Hello {}".format(routing_key).encode()),
routing_key=routing_key,
)
print(" [x] Sent 'Hello {}' to {}".format(routing_key, routing_key))
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment