Skip to content

Instantly share code, notes, and snippets.

@Kludex
Created January 3, 2021 14:18
Show Gist options
  • Save Kludex/2aefbd50a240f0b11c439b51a48c8d7f to your computer and use it in GitHub Desktop.
Save Kludex/2aefbd50a240f0b11c439b51a48c8d7f to your computer and use it in GitHub Desktop.
Async init
import asyncio
from typing import Type
from aio_pika import RobustConnection, connect_robust
from aio_pika.connection import ConnectionType
from aio_pika.types import TimeoutType
class RabbitMQClient:
def __init__(
self,
url: str = None,
*,
host: str = "localhost",
port: int = 5672,
login: str = "guest",
password: str = "guest",
virtualhost: str = "/",
ssl: bool = False,
loop: asyncio.AbstractEventLoop = None,
ssl_options: dict = None,
timeout: TimeoutType = None,
connection_class: Type[ConnectionType] = RobustConnection,
client_properties: dict = None,
**kwargs
):
self.url = url
self.host = host
self.port = port
self.login = login
self.password = password
self.virtualhost = virtualhost
self.ssl = ssl
self.loop = loop
self.ssl_options = ssl_options
self.timeout = timeout
self.connection_class = connection_class
self.client_properties = client_properties
self.kwargs = kwargs
async def async_init(self):
self.connection = await connect_robust(
self.url,
host=self.host,
port=self.port,
login=self.login,
password=self.password,
virtualhost=self.virtualhost,
ssl=self.ssl,
loop=self.loop,
ssl_options=self.ssl_options,
timeout=self.timeout,
connection_class=self.connection_class,
client_properties=self.client_properties,
**self.kwargs
)
return self.connection
def __await__(self):
return self.async_init().__await__()
async def main():
rabbit = await RabbitMQClient(host="localhost", port=5672)
print(rabbit)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment