To try it, run:

    ./summarize-udp-async.py 29559 stats.json

then send the sample messages:

    ./send-sample-messages.py

and inspect the resulting stats.json file.
/stats.json |
#!/usr/bin/env python3 | |
"""Send sample udp messages to sanity-check the `summarize-udp` script.""" | |
import json | |
import socket | |
import time | |
#: where the sample messages go; the port must match the one given to
#: summarize-udp-async.py
host = "localhost"
port = 29559


def send_udp_message(message: bytes):
    """Send *message* to (host, port) as a single UDP datagram.

    A fresh IPv4 datagram socket is opened for every call and closed
    again on return.
    """
    address = (host, port)
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
        udp_socket.sendto(message, address)
def main():
    """Send two batches of three json sample messages, one second apart.

    Each message has integer fields a, b, c; the pause between batches
    lets the summarizer's shortest interval close in between.
    """
    batches = (
        (dict(a=1, b=2, c=3), dict(a=2, b=4, c=9), dict(a=3, b=8, c=27)),
        (dict(a=4, b=16, c=81), dict(a=5, b=32, c=243), dict(a=6, b=64, c=729)),
    )
    for batch_number, batch in enumerate(batches):
        if batch_number:
            # pause only *between* batches, never after the last one
            time.sleep(1)
        for payload in batch:
            send_udp_message(json.dumps(payload).encode())


if __name__ == "__main__":
    main()
#!/usr/bin/env python3 | |
"""Receive udp messages and write summary info to file. | |
Usage: summarize-udp <udp-port> <output-file> | |
Input message format: json object with 3 integer fields (KEYS) e.g.: | |
{"a":1, "b":2, "c":3} | |
Output file contains stats (sum,max,min correspondingly for | |
STATS_INTERVAL_SEC seconds). | |
""" | |
import asyncio | |
import json | |
import logging | |
import sys | |
import time | |
from collections import defaultdict | |
from operator import itemgetter | |
from pathlib import Path | |
from typing import DefaultDict, List, Tuple | |
#: lengths (in seconds) of the windows that stats are summarized over
STATS_INTERVAL_SEC = (1, 6)
#: json keys read from every input message, in the order values are stored
KEYS = ("a", "b", "c")
#: aggregate applied to each key's values, paired with KEYS by position
STATS = (sum, max, min)
logger = logging.getLogger("summarize-udp")
class UDPProtocol(asyncio.DatagramProtocol):
    """Call given coroutine, passing it the decoded json message."""

    def __init__(self, coro):
        # coroutine function; called with each successfully decoded message
        self.handler = coro
        # strong references to in-flight handler tasks (see datagram_received)
        self.background_tasks = set()

    def datagram_received(self, data, addr):
        """Decode *data* as UTF-8 json and schedule ``self.handler(message)``.

        Undecodable datagrams are logged and dropped.
        """
        try:
            message = json.loads(data.decode())
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            # invalid UTF-8 raises UnicodeDecodeError, which is not a
            # JSONDecodeError; it must be caught too, otherwise it propagates
            # out of this callback into the event loop instead of being logged
            logger.error(
                "Failed to decode %r from %r, reason: %s", data, addr, e
            )
        else:
            task = asyncio.create_task(self.handler(message))
            # save strong reference to the task, to avoid it disappearing
            # mid-execution (the event loop keeps only weak references)
            self.background_tasks.add(task)
            task.add_done_callback(self.background_tasks.discard)
class UDPSummarizer:
    """Receive udp messages and write summary info to file.

    The KEYS values of every accepted message are buffered once per interval
    in STATS_INTERVAL_SEC. At each interval boundary the buffered values are
    aggregated with STATS and appended to the output file as one json line.
    """

    def __init__(self, port: int, output_filename: str):
        self.port = port
        self.path = Path(output_filename)
        # interval (seconds) -> value tuples (ordered like KEYS) of the
        # messages received since the last write for that interval
        self.stats: DefaultDict[int, List[Tuple[float, ...]]] = defaultdict(
            list
        )

    async def run(self):
        """Entry point for UDPSummarizer.

        Listen for udp messages on *self.port* on this host and write
        stats to self.path until cancelled; the transport is closed on exit.
        """
        loop = asyncio.get_running_loop()
        transport, _protocol = await loop.create_datagram_endpoint(
            lambda: UDPProtocol(self.process_message),
            local_addr=("0.0.0.0", self.port),
        )
        try:
            await asyncio.gather(
                *[
                    self.write_stats(interval)
                    for interval in STATS_INTERVAL_SEC
                ]
            )
        finally:
            transport.close()

    async def process_message(self, message: dict):
        """Buffer the KEYS values of *message* for each interval in STATS_INTERVAL_SEC.

        Messages that are not json objects, lack one of KEYS, or carry a
        non-numeric value are logged and dropped: buffering them would make
        the aggregation in write_stats fail and stop every writer.
        """
        try:
            # unlike itemgetter(*KEYS), this always yields a tuple, even if
            # KEYS has a single element
            values = tuple(message[key] for key in KEYS)
        except (KeyError, TypeError):
            logger.error(
                "Ignoring message %r: expected a json object with keys %s",
                message,
                KEYS,
            )
            return
        if not all(isinstance(value, (int, float)) for value in values):
            logger.error("Ignoring message %r: values must be numbers", message)
            return
        for interval in STATS_INTERVAL_SEC:
            self.stats[interval].append(values)

    async def write_stats(self, interval: int):
        """Append stats to self.path at every *interval* seconds boundary.

        Boundaries are multiples of *interval* on the event loop clock, see:
        https://ru.stackoverflow.com/a/577378/23044. Runs until cancelled.
        """
        timer = asyncio.get_running_loop().time
        boundary = self._next_boundary(timer(), interval)
        while True:
            # a negative delay just yields to the loop once
            await asyncio.sleep(boundary - timer())
            # take this interval's buffer; new messages start a fresh list
            rows = self.stats.pop(interval, [])
            data = self._summarize(rows)
            # TODO consider async. I/O if it is a bottleneck
            with self.path.open("a", encoding="utf-8") as file:
                json.dump(
                    dict(
                        data,
                        timestamp=int(time.time()),
                        count_type=f"{interval}s",
                    ),
                    file,
                )
                file.write("\n")
            # The loop may run a timer slightly before its deadline (within
            # its clock resolution). Recomputing the delay from the clock
            # alone would then give a near-zero sleep and a spurious extra
            # record, so always move past the boundary just handled.
            boundary = self._next_boundary(timer(), interval, previous=boundary)

    @staticmethod
    def _summarize(rows, keys=None, aggregates=None):
        """Aggregate buffered message tuples into a flat stats dict.

        *rows* holds one tuple per message, ordered like *keys* (default
        KEYS). The rows are transposed into one column per key, and each
        column is reduced by its aggregate (default STATS). The result maps
        ``f"{key}_{aggregate.__name__}"`` to the aggregated value. With no
        rows, every statistic is reported as 0.
        """
        if keys is None:
            keys = KEYS
        if aggregates is None:
            aggregates = STATS
        if rows:
            # rows are per message; aggregates need all values of one key
            columns = list(zip(*rows))
        else:
            # no messages in this interval: write zero statistics anyway
            columns = [(0,)] * len(keys)
        return {
            f"{key}_{aggregate.__name__}": aggregate(column)
            for key, column, aggregate in zip(keys, columns, aggregates)
        }

    @staticmethod
    def _next_boundary(now, interval, previous=None):
        """Return the first multiple of *interval* strictly after *now*.

        If *previous* (the boundary just handled) is given, the result is
        also at least ``previous + interval``, so the same boundary is never
        handled twice. If more than one interval was missed, it skips ahead
        instead of catching up in a burst.
        """
        boundary = (now // interval + 1) * interval
        if previous is not None:
            boundary = max(boundary, previous + interval)
        return boundary
async def main():
    """Run the summarizer configured from ``sys.argv``: <udp-port> <output-file>.

    Exits with the module usage text if the argument count is wrong or the
    port is not an integer.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    if len(sys.argv) != 3:
        sys.exit(__doc__)
    try:
        port = int(sys.argv[1])
    except ValueError:
        sys.exit(__doc__)
    filename = sys.argv[2]
    summarizer = UDPSummarizer(port, filename)
    await summarizer.run()


if __name__ == "__main__":
    # Ctrl-C does not arrive as a KeyboardInterrupt inside main(). On Python
    # 3.11+ asyncio.run() cancels the main task and then raises
    # KeyboardInterrupt itself. On older versions it is typically raised
    # while the event loop waits for I/O, outside any coroutine. Either way
    # it has to be caught around asyncio.run().
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.warning("Got Ctrl-C")