Skip to content

Instantly share code, notes, and snippets.

@buhii
Last active December 20, 2022 08:07
Show Gist options
  • Save buhii/dfd29f8677c3e6785895 to your computer and use it in GitHub Desktop.
Save buhii/dfd29f8677c3e6785895 to your computer and use it in GitHub Desktop.
elasticsearch scripted metric aggregation
def event_usage(self, task_id, since, until, interval="1m"):
interval_ms = 60000 # 1 minute
quantize = lambda t: (t // interval_ms) * interval_ms
q_since, q_until = quantize(since), quantize(until)
# --- scripts ---
quantize_closure = "quantize = { it.intdiv(interval_ms) * interval_ms }\n"
script = {}
script['init'] = """
for (t = since; t <= until; t += interval_ms) {
_time_buckets[t] = 0.0
}
"""
script['map'] = quantize_closure + """
t_init = doc['time-init'].value
t_finish = doc['time-finish'].value
for (t = quantize(t_init); t <= t_finish; t += interval_ms) {
if (_time_buckets.containsKey(quantize(t))) {
s = quantize(t) > t_init ? quantize(t) : t_init
e = quantize(t + interval_ms) > t_finish ? t_finish : quantize(t + interval_ms)
_time_buckets[quantize(t)] += (e - s) / interval_ms
}
}
"""
script['combine'] = """
return _time_buckets
"""
script['reduce'] = """
_final_buckets = new HashMap()
for (_time_buckets in _aggs) {
for (e in _time_buckets) {
time_key = e.key.toString()
if (_final_buckets.containsKey(time_key)) {
_final_buckets[time_key] += e.value
} else {
_final_buckets[time_key] = e.value
}
}
}
return _final_buckets
"""
# --- query ---
aggs = {
"result": {
"scripted_metric": {
"init_script": script['init'],
"map_script": script['map'],
"combine_script": script['combine'],
"reduce_script": script['reduce'],
"params": {
"_time_buckets" : {},
"interval_ms": interval_ms,
"since": q_since,
"until": q_until
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment