Skip to content

Instantly share code, notes, and snippets.

@danthelion
Created June 27, 2022 07:17
Show Gist options
  • Save danthelion/f9ad39e426ce803f1305e69b18e304f7 to your computer and use it in GitHub Desktop.
Save danthelion/f9ad39e426ce803f1305e69b18e304f7 to your computer and use it in GitHub Desktop.
def new_messages(interval="1m"):
results = engine.execute(f"SELECT count(*) FROM changes_by_server_{interval}")
return None if results.fetchone()[0] == 0 else True
async def event_generator(interval: str):
if new_messages(interval=interval):
print(f"New messages in {interval}")
connection = engine.raw_connection()
with connection.cursor() as cur:
cur.execute(f"DECLARE c CURSOR FOR TAIL changes_by_server_{interval}")
cur.execute("FETCH ALL c")
for row in cur:
yield row
await asyncio.sleep(MESSAGE_STREAM_DELAY)
@app.websocket("/wikidata/{interval}")
async def websocket_endpoint(websocket: WebSocket, interval: str):
await manager.connect(websocket)
print(f"Connected to {interval}")
try:
while True:
async for data in event_generator(interval=interval):
print(f"Sending {data}")
payload = {
"server_name": data[2],
"count": data[3],
}
await websocket.send_json(payload)
except WebSocketDisconnect:
manager.disconnect(websocket)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment