Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nurettin/14e077f88b8c1edb22bc810bdbbac49e to your computer and use it in GitHub Desktop.
Save nurettin/14e077f88b8c1edb22bc810bdbbac49e to your computer and use it in GitHub Desktop.
from asyncio.tasks import Task
from typing import Dict
import asyncio
from uuid import uuid4
class PiQueConnection:
def __init__(self, pique: 'PiQue', uid: str, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
self.pique = pique
self.uid = uid
self.reader = reader
self.writer = writer
self.sub_tasks: Dict[str, Task] = {}
print(f"Client({self.uid}) connected.")
async def recv(self):
while True:
bytes = await self.reader.readline()
if bytes == b"":
self.pique.clients.pop(self.uid)
for name, task in list(self.sub_tasks.items()):
task.cancel()
self.sub_tasks.pop(name, None)
print(f"Client({self.uid}) disconnected.")
break
data = bytes.decode()
print(f"Client({self.uid}) -> {data}")
if data.startswith("CREATE_QUEUE "):
_, name = data.split()
self.pique.create_queue(name=name)
elif data.startswith("QUEUE "):
_, name, message = data.split()
self.pique.queue(name=name, message=message)
elif data.startswith("SUB "):
_, name = data.split()
task = self.sub_tasks.get(name)
if task is None:
queue = self.pique.queues.get(name)
if queue is not None:
self.sub_tasks[name] = self.pique.loop.create_task(
self.sub(name, queue))
elif data.startswith("UNSUB "):
_, name = data.split()
task = self.sub_tasks.get(name)
if task is not None:
task.cancel()
self.sub_tasks.pop(name, None)
async def sub(self, name: str, queue: asyncio.Queue):
while True:
data = await queue.get()
await self.send(message=f"QUEUE {name} {data}")
async def send(self, message: str):
print(f"Client({self.uid}) <- {message}")
self.writer.write(message.encode())
await self.writer.drain()
class PiQue:
def __init__(self, host: str, port: str, loop=None):
self.host = host
self.port = port
self.loop = asyncio.get_event_loop() if loop is None else loop
self.queues: Dict[str, asyncio.Queue] = {}
self.clients: Dict[str, PiQueConnection] = {}
def run(self):
print(f"Starting PiQue v3.141592 {self.host}:{self.port}")
self.loop.create_task(asyncio.start_server(client_connected_cb=self.client_connected_cb,
host=self.host, port=self.port, loop=self.loop))
self.loop.create_task(self.ping())
self.loop.run_forever()
def client_connected_cb(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
new_client = PiQueConnection(pique=self, uid=str(
uuid4()), reader=reader, writer=writer)
self.clients[new_client.uid] = new_client
self.loop.create_task(new_client.recv())
async def ping(self):
while True:
for _, client in list(self.clients.items()):
await client.send("ping")
await asyncio.sleep(30)
def create_queue(self, name: str):
queue = self.queues.get(name)
if queue is None:
self.queues[name] = asyncio.Queue()
def queue(self, name: str, message: str):
queue = self.queues.get(name)
if queue is not None:
queue.put_nowait(message)
if __name__ == "__main__":
pique = PiQue(host="127.0.0.1", port="31415")
try:
pique.run()
except KeyboardInterrupt:
print("Bye.")
@nurettin
Copy link
Author

nurettin commented Jan 8, 2021

tab 1: telnet localhost 31415

CREATE_QUEUE omg

tab 2: telnet localhost 31415

SUB omg

tab 1:

QUEUE omg lol

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment