Skip to content

Instantly share code, notes, and snippets.

@antnieszka
Last active December 12, 2019 19:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save antnieszka/c9f841acd4ec7a03ab086f415ff34397 to your computer and use it in GitHub Desktop.
Save antnieszka/c9f841acd4ec7a03ab086f415ff34397 to your computer and use it in GitHub Desktop.
Async websocket server with rabbit consumer
#!/usr/bin/env python3
import itertools
import os
import random
import aio_pika
import asyncio
import logging
import websockets
logger = logging.getLogger(__name__)
class Worker:
def __init__(self):
self._websocket = None
async def process_message(self, message: aio_pika.IncomingMessage):
async with message.process():
logger.info("Got message: %s" % message.body)
if self._websocket and self._websocket.open:
await self._websocket.send(str(message.body))
async def socket_handler(self, websocket, path):
self._websocket = websocket
handler_id = random.randint(1, 9999)
await websocket.send("Hello from main, this is %d" % handler_id)
connection = await aio_pika.connect_robust(
"amqp://{user}:{passwd}@{host}/".format(
user=os.getenv("RABBIT_USERNAME"),
passwd=os.getenv("RABBIT_PASSWORD"),
host=os.getenv("RABBIT_HOST"),
)
)
# Creating channel
channel = await connection.channel()
# Maximum message count which will be
# processing at the same time.
await channel.set_qos(prefetch_count=100)
# Declaring queue
queue = await channel.declare_queue("ao.model", durable=True)
# await queue.consume(process_message)
for i in itertools.count():
try:
await websocket.send("Still alive! [%s]" % i)
await queue.consume(self.process_message)
except websockets.ConnectionClosed:
logger.info(
"Handler {} was closed after {} iterations".format(handler_id, i)
)
self._websocket = None
return
else:
await asyncio.sleep(1)
def start(self):
start_server = websockets.serve(self.socket_handler, "localhost", 5678)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
def start_resultsocket():
logger.info("Started result socket thread.")
worker = Worker()
worker.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment