Last active
December 12, 2019 19:02
-
-
Save antnieszka/c9f841acd4ec7a03ab086f415ff34397 to your computer and use it in GitHub Desktop.
Async websocket server with a RabbitMQ consumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import itertools | |
import os | |
import random | |
import aio_pika | |
import asyncio | |
import logging | |
import websockets | |
logger = logging.getLogger(__name__) | |
class Worker:
    """Relay messages from the ``ao.model`` RabbitMQ queue to a websocket client.

    Each incoming websocket connection opens its own AMQP connection and
    registers ``process_message`` as a consumer.  Messages are forwarded to
    the most recently connected websocket (``self._websocket``); the class
    keeps a single socket reference, so it is effectively meant for one
    client at a time.
    """

    def __init__(self):
        # Most recently connected websocket, or None when no client is attached.
        self._websocket = None

    async def process_message(self, message: aio_pika.IncomingMessage):
        """Log one queue message and forward its payload to the websocket.

        The message is acknowledged when the ``message.process()`` context
        exits without an exception.
        """
        async with message.process():
            logger.info("Got message: %s", message.body)
            if self._websocket and self._websocket.open:
                # ``message.body`` is bytes: ``str()`` would send the literal
                # "b'...'" repr, so decode it to real text instead.  Invalid
                # bytes are replaced rather than raised, so a malformed
                # payload does not reject the message.
                payload = message.body.decode("utf-8", errors="replace")
                await self._websocket.send(payload)

    async def socket_handler(self, websocket, path):
        """Serve one websocket client until it disconnects.

        Sends a greeting, attaches a consumer to the ``ao.model`` queue, then
        emits a "Still alive!" heartbeat every second.  When the client goes
        away the AMQP connection (and with it the consumer) is closed.

        ``path`` is unused; it is kept because the server passes it to the
        handler.
        """
        self._websocket = websocket
        handler_id = random.randint(1, 9999)
        connection = None
        i = 0  # heartbeats completed so far; reported when the client leaves
        try:
            await websocket.send("Hello from main, this is %d" % handler_id)

            connection = await aio_pika.connect_robust(
                "amqp://{user}:{passwd}@{host}/".format(
                    user=os.getenv("RABBIT_USERNAME"),
                    passwd=os.getenv("RABBIT_PASSWORD"),
                    host=os.getenv("RABBIT_HOST"),
                )
            )
            channel = await connection.channel()
            # Maximum number of messages processed at the same time.
            await channel.set_qos(prefetch_count=100)
            queue = await channel.declare_queue("ao.model", durable=True)

            # Register the consumer exactly once.  Calling consume() on every
            # heartbeat would add a new consumer to the queue each second.
            await queue.consume(self.process_message)

            for i in itertools.count():
                await websocket.send("Still alive! [%s]" % i)
                await asyncio.sleep(1)
        except websockets.ConnectionClosed:
            logger.info(
                "Handler %s was closed after %s iterations", handler_id, i
            )
        finally:
            # Only clear the shared reference if it still points at this
            # client; a newer connection may already have replaced it.
            if self._websocket is websocket:
                self._websocket = None
            # Closing the connection also cancels the consumer, so messages
            # are no longer pulled from the queue on behalf of a dead socket.
            if connection is not None:
                await connection.close()

    def start(self, host="localhost", port=5678):
        """Start the websocket server and run the event loop forever.

        ``host`` and ``port`` default to the previously hard-coded
        ``localhost:5678``.
        """
        start_server = websockets.serve(self.socket_handler, host, port)
        loop = asyncio.get_event_loop()
        loop.run_until_complete(start_server)
        loop.run_forever()
def start_resultsocket():
    """Log startup, then build a ``Worker`` and hand control to its ``start``."""
    logger.info("Started result socket thread.")
    Worker().start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment