Skip to content

Instantly share code, notes, and snippets.

@mitsuhiko
Created August 19, 2023 07:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mitsuhiko/aaf097181aa3ce0afdf93ff4f16ec017 to your computer and use it in GitHub Desktop.
Save mitsuhiko/aaf097181aa3ce0afdf93ff4f16ec017 to your computer and use it in GitHub Desktop.
import logging
import time
import zlib
from threading import Event, Lock, Thread
class Metric:
    """Base interface for a single aggregated metric living in one time bucket.

    Concrete subclasses must override both :meth:`add` and :meth:`flush`.
    """

    def add(self, value):
        """Fold *value* into this metric. Not implemented on the base class."""
        raise NotImplementedError()

    def flush(self):
        """Return the payload to emit for this metric. Not implemented here."""
        raise NotImplementedError()
class CounterMetric(Metric):
    """Counter metric: the running sum of every value added to its bucket."""

    def __init__(self):
        # Starts at 0.0, so integer increments accumulate into a float total.
        self.value = 0.0

    def add(self, value):
        """Add *value* to the running total."""
        self.value += value

    def flush(self):
        """Return the accumulated total."""
        return self.value
class DistributionMetric(Metric):
    """Distribution metric: keeps every value added, in insertion order."""

    def __init__(self) -> None:
        self.value = []

    def add(self, value: float):
        """Record one more sample."""
        samples = self.value
        samples.append(value)

    def flush(self):
        """Return the recorded samples.

        This is the internal list itself, not a copy.
        """
        return self.value
class SetMetric(Metric):
    """Set metric: collects distinct members and flushes them as integers."""

    def __init__(self):
        self.value = set()

    def add(self, value):
        """Add *value* to the set of members seen in this bucket."""
        self.value.add(value)

    def flush(self):
        """Return every member converted to an int.

        A ``str`` member becomes the CRC32 of its UTF-8 encoding (masked to
        32 bits). Any other member goes through ``int()``.
        """
        # NOTE(review): int() truncates floats (1.2 and 1.9 both become 1), and
        # distinct members can map to the same int, so the returned list may
        # contain duplicates. Confirm whether the consumer deduplicates.
        def as_int(member) -> int:
            if not isinstance(member, str):
                return int(member)
            return zlib.crc32(member.encode("utf-8")) & 0xFFFFFFFF

        return list(map(as_int, self.value))
# One-letter type code (as passed to Aggregator.add by Client) -> Metric class.
METRIC_TYPES = dict(
    c=CounterMetric,  # Client.incr
    d=DistributionMetric,  # Client.timing
    s=SetMetric,  # Client.set
)
class Aggregator:
    """Aggregate metrics into fixed time buckets and emit them in batches.

    Values passed to :meth:`add` are grouped by ``(bucket start, type, name,
    sorted tags)``. A daemon thread wakes every ``FLUSH_INTERVAL`` seconds and
    emits every bucket whose ``ROLLUP``-second window has fully elapsed.
    Call :meth:`close` to stop that thread and emit whatever is still pending.
    """

    # Bucket width in seconds; timestamps are floored to a multiple of it.
    ROLLUP = 10.0
    # Seconds the background thread waits between flush passes.
    FLUSH_INTERVAL = 2.0

    def __init__(self) -> None:
        # (bucket_ts, type, name, sorted tag tuple) -> Metric; guarded by _lock.
        self.buckets = {}
        self._lock = Lock()
        self._running = True
        # Set by close() so the flusher wakes immediately instead of sleeping.
        self._stop_event = Event()
        self._flusher = Thread(target=self._flush)
        self._flusher.daemon = True
        self._flusher.start()

    def _flush(self) -> None:
        """Background loop: emit completed buckets until close() is called."""
        while self._running:
            try:
                self._flush_buckets()
            except Exception:
                # Letting this escape would silently kill the thread. Buckets
                # would then pile up in memory and never be emitted again.
                logging.getLogger(__name__).exception("failed to flush metrics")
            self._stop_event.wait(self.FLUSH_INTERVAL)

    def _flush_buckets(self, force: bool = False) -> None:
        """Remove ready buckets under the lock and hand them to :meth:`_emit`.

        A bucket is ready once its window has closed, i.e. its start time is
        at least ``ROLLUP`` seconds in the past. With ``force=True`` every
        bucket is ready.
        """
        cutoff = time.time() - self.ROLLUP
        metrics = []
        with self._lock:
            ready = [key for key in self.buckets if force or key[0] <= cutoff]
            for bucket_key in ready:
                metric = self.buckets.pop(bucket_key)
                ts, ty, name, tags = bucket_key
                m = {
                    "timestamp": ts,
                    "name": name,
                    "type": ty,
                    "value": metric.flush(),
                }
                if tags:
                    m["tags"] = dict(tags)
                metrics.append(m)
        # Emit outside the lock so a slow sink never blocks add().
        if metrics:
            self._emit(metrics)

    def _emit(self, metrics):
        """Deliver a batch of serialized buckets; this implementation prints it."""
        print(metrics)

    def add(self, ty, key, value, tags, timestamp) -> None:
        """Record *value* for metric *key* in the bucket covering *timestamp*.

        :param ty: metric type code; must be a key of ``METRIC_TYPES``.
        :param key: metric name.
        :param value: value forwarded to the metric's ``add``.
        :param tags: mapping of tag names to values, or ``None``.
        :param timestamp: seconds since the epoch; ``None`` means now.
        :raises KeyError: if *ty* is unknown and its bucket does not exist yet.
        """
        if timestamp is None:
            timestamp = time.time()
        bucket_key = (
            int((timestamp // self.ROLLUP) * self.ROLLUP),
            ty,
            key,
            # Sorted tuple so equal tag dicts share a bucket and the key is hashable.
            tuple(sorted((tags or {}).items())),
        )
        with self._lock:
            metric = self.buckets.get(bucket_key)
            if metric is None:
                metric = METRIC_TYPES[ty]()
                self.buckets[bucket_key] = metric
            metric.add(value)

    def close(self, timeout=None) -> None:
        """Stop the background flusher and emit every pending bucket.

        Safe to call more than once. Must not be called from inside
        :meth:`_emit`, because that runs on the flusher thread, which cannot
        join itself.

        :param timeout: seconds to wait for the flusher thread to exit;
            ``None`` waits indefinitely.
        """
        self._running = False
        self._stop_event.set()
        self._flusher.join(timeout)
        self._flush_buckets(force=True)
class Client:
    """Front end for recording counters, timings and set members.

    Each call is forwarded to an owned :class:`Aggregator` together with the
    matching one-letter type code.
    """

    def __init__(self) -> None:
        self.aggregator = Aggregator()

    def _record(self, ty, key, value, tags, timestamp):
        # Single funnel for every recorder below.
        self.aggregator.add(ty, key, value, tags, timestamp)

    def incr(self, key, value=1, tags=None, timestamp=None) -> None:
        """Increase counter *key* by *value* (1 by default)."""
        self._record("c", key, value, tags, timestamp)

    def timing(self, key, value, tags=None, timestamp=None) -> None:
        """Record *value* as one sample of distribution *key*."""
        self._record("d", key, value, tags, timestamp)

    def set(self, key, value, tags=None, timestamp=None):
        """Add *value* as a member of set metric *key*."""
        self._record("s", key, value, tags, timestamp)
def main() -> None:
    """Run an endless demo that feeds random bursts of metrics into a Client."""
    import random

    client = Client()
    # The aggregator only reads tags, so one dict can be shared by every call.
    demo_tags = {"foo": "bar"}
    while True:
        burst_size = int(random.random() * 500)
        for _ in range(burst_size):
            client.timing(
                "foo.bar",
                random.random() * 4.7 * random.random() / 2.0,
                tags=demo_tags,
            )
            client.incr("foo.barilla", tags=demo_tags)
            client.set(
                "foo.baz",
                random.choice(["foo", "bar", "baz"]),
                tags=demo_tags,
            )
        # Short random pause between bursts.
        time.sleep(random.random() * 0.2)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment