Skip to content

Instantly share code, notes, and snippets.

@zed
Last active March 18, 2024 18:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zed/07e8951236341480c40c4e69757c98e3 to your computer and use it in GitHub Desktop.
Save zed/07e8951236341480c40c4e69757c98e3 to your computer and use it in GitHub Desktop.
Receive UDP messages and write summary info to a file

To try it, run:

./summarize-udp-async.py 29559 stats.json

Then, from another terminal, send sample messages:

./send-sample-messages.py

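Inspect the stats.json file: each line is a JSON object with the aggregated stats (a_sum, b_max, c_min) for one 1-second or 6-second interval.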

#!/usr/bin/env python3
"""Send sample udp messages to sanity-check the `summarize-udp` script."""
import json
import socket
import time
#: destination of the sample messages; must match the summarizer's <udp-port>
host = "localhost"
port = 29559
def send_udp_message(message: bytes):
    """Send *message* as one UDP datagram to the module-level (host, port)."""
    # a fresh socket per datagram; the context manager closes it afterwards
    with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as udp_socket:
        udp_socket.sendto(message, (host, port))
def main():
    """Send six JSON sample messages {"a": n, "b": 2**n, "c": 3**n}, n = 1..6.

    The first three messages are paced one second apart; the last three are
    sent back to back, so they land in the same reporting interval.
    """
    for n in range(1, 7):
        payload = {"a": n, "b": 2 ** n, "c": 3 ** n}
        send_udp_message(json.dumps(payload).encode())
        if n <= 3:
            time.sleep(1)
if __name__ == "__main__":
    # only when executed as a script, not when imported
    main()
#!/usr/bin/env python3
"""Receive udp messages and write summary info to file.
Usage: summarize-udp <udp-port> <output-file>
Input message format: json object with 3 integer fields (KEYS) e.g.:
{"a":1, "b":2, "c":3}
Output file contains stats (sum,max,min correspondingly for
STATS_INTERVAL_SEC seconds).
"""
import asyncio
import json
import logging
import sys
import time
from collections import defaultdict
from operator import itemgetter
from pathlib import Path
from typing import DefaultDict, List, Tuple
#: lengths (in seconds) of the intervals that stats are summarized over
STATS_INTERVAL_SEC = (1, 6)
#: keys that are read from every input json message
KEYS = ("a", "b", "c")
#: aggregate applied to each key in KEYS, position by position
STATS = (sum, max, min)
#: logger named after the script (see the usage line in the module docstring)
logger = logging.getLogger("summarize-udp")
class UDPProtocol(asyncio.DatagramProtocol):
    """Call given coroutine, passing it the decoded json message.

    Each datagram is decoded as UTF-8 JSON. Undecodable datagrams are logged
    and dropped. Every decoded message is handled in its own background task.
    """

    def __init__(self, coro):
        # coroutine function called as ``coro(message)`` for each message
        self.handler = coro
        # strong references to in-flight handler tasks, to avoid them
        # disappearing mid-execution
        self.background_tasks = set()

    def datagram_received(self, data, addr):
        try:
            message = json.loads(data.decode())
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            # invalid UTF-8 fails in data.decode(), before json sees the data
            logger.error(
                "Failed to decode %r from %r, reason: %s", data, addr, e
            )
        else:
            task = asyncio.create_task(self.handler(message))
            self.background_tasks.add(task)
            task.add_done_callback(self._on_task_done)

    def _on_task_done(self, task):
        """Forget a finished handler task and log its exception, if any."""
        self.background_tasks.discard(task)
        # retrieving the exception here reports it immediately instead of as
        # "Task exception was never retrieved" at garbage-collection time
        if not task.cancelled() and task.exception() is not None:
            logger.error("Message handler failed", exc_info=task.exception())
class UDPSummarizer:
    """Receive udp messages and write summary info to file."""

    def __init__(self, port: int, output_filename: str):
        self.port = port
        self.path = Path(output_filename)
        # interval (seconds) -> rows received since the last write for that
        # interval; one tuple per message, values ordered as in KEYS
        self.stats: DefaultDict[int, List[Tuple[int, ...]]] = defaultdict(list)

    async def run(self):
        """Entry point for UDPSummarizer.

        Listen for udp messages on *self.port* on this host and write
        stats to self.path.
        """
        loop = asyncio.get_running_loop()
        transport, _protocol = await loop.create_datagram_endpoint(
            lambda: UDPProtocol(self.process_message),
            local_addr=("0.0.0.0", self.port),
        )
        try:
            await asyncio.gather(
                *[self.write_stats(interval) for interval in STATS_INTERVAL_SEC]
            )
        finally:
            transport.close()

    async def process_message(self, message: dict):
        """Record the KEYS values of *message* once per interval in STATS_INTERVAL_SEC.

        Messages that are not JSON objects with a numeric value for every key in
        KEYS are logged and ignored. Otherwise a non-numeric value would make the
        aggregates in write_stats raise, which ends the gather() in run().
        """
        if not isinstance(message, dict) or not all(key in message for key in KEYS):
            logger.error("Ignoring message without all keys %s: %r", KEYS, message)
            return
        # a tuple even when KEYS has a single key (unlike itemgetter)
        values = tuple(message[key] for key in KEYS)
        if not all(isinstance(value, (int, float)) for value in values):
            logger.error("Ignoring message with non-numeric values: %r", message)
            return
        for interval in STATS_INTERVAL_SEC:
            self.stats[interval].append(values)

    @staticmethod
    def _aggregate(rows: List[Tuple[int, ...]], keys: Tuple[str, ...], aggregates) -> dict:
        """Map ``f"{key}_{aggregate.__name__}"`` to the aggregate of that key's values.

        *rows* holds one tuple per message, ordered as *keys*. zip(*rows)
        transposes them into one column per key, so each aggregate sees every
        value of its own key. With no rows, every aggregate is taken over (0,).
        """
        columns = list(zip(*rows)) if rows else [(0,)] * len(keys)
        return {
            f"{key}_{aggregate.__name__}": aggregate(column)
            for key, column, aggregate in zip(keys, columns, aggregates)
        }

    async def write_stats(self, interval: int):
        """Write stats at *interval* seconds boundary.

        Forever: sleep until the next multiple of *interval* on the loop clock,
        then append one JSON line with the aggregates of the messages received
        since the previous write (zeros if there were none).
        """
        loop = asyncio.get_running_loop()
        timer = loop.time
        while True:
            # write stats at "interval" boundary according to timer(), see:
            # https://ru.stackoverflow.com/a/577378/23044
            await asyncio.sleep(interval - timer() % interval)
            # detach this interval's rows; the next message starts a new list
            rows = self.stats.pop(interval, [])
            data = self._aggregate(rows, KEYS, STATS)
            # TODO consider async. I/O if it is a bottleneck
            with self.path.open("a", encoding="utf-8") as file:
                json.dump(
                    dict(
                        data,
                        timestamp=int(time.time()),
                        count_type=f"{interval}s",
                    ),
                    file,
                )
                file.write("\n")
async def main():
    """Configure logging, parse the command line and run the summarizer.

    Exits with the usage text (the module docstring) when the argument count
    is wrong or the port is not an integer in 0..65535.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    if len(sys.argv) != 3:
        sys.exit(__doc__)
    port_arg, filename = sys.argv[1:]
    try:
        port = int(port_arg)
    except ValueError:
        sys.exit(__doc__)
    if not 0 <= port <= 65535:
        sys.exit(__doc__)
    summarizer = UDPSummarizer(port, filename)
    try:
        await summarizer.run()
    except KeyboardInterrupt:
        # may not be reached: on Python 3.11+ asyncio.run() turns Ctrl-C into
        # cancellation of this task and re-raises KeyboardInterrupt to its caller
        logger.warning("Got Ctrl-C")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # on Python 3.11+ Ctrl-C surfaces here: asyncio.run() cancels the main
        # task and re-raises KeyboardInterrupt, so report it without a traceback
        logger.warning("Got Ctrl-C")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment