@jroyalty · January 2, 2014
Snippet that takes time-series data without a fixed interval and returns it bucketed by a configurable size (interval_seconds). This is abstracted from https://github.com/pcn/pyKairosDB/blob/master/pyKairosDB/graphite.py and could easily be adapted for use by an OpenTSDB reader for Graphite.
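Before the full function, here is a minimal, self-contained sketch of the same bucketing idea on synthetic data, with no KairosDB involved. bucket_average is a hypothetical helper written for illustration, assuming the input is a list of (timestamp, value) pairs like the ones KairosDB returns:

from collections import deque

def bucket_average(points, start_time, end_time, interval_seconds):
    # points: iterable of (timestamp, value) pairs, not necessarily evenly spaced.
    # Sort and drop anything outside [start_time, end_time) so the deque's head
    # always belongs to the current or a later slot.
    values = deque(sorted(p for p in points if start_time <= p[0] < end_time))
    buckets = []
    for slot_begin in range(int(start_time), int(end_time), interval_seconds):
        slot_end = slot_begin + interval_seconds
        slot_buffer = []
        # Drain every point whose timestamp falls inside [slot_begin, slot_end).
        while values and slot_begin <= values[0][0] < slot_end:
            slot_buffer.append(values.popleft()[1])
        # Average the slot's points; None marks an empty interval, as graphite expects.
        buckets.append(sum(slot_buffer) / len(slot_buffer) if slot_buffer else None)
    return buckets

# Irregularly spaced samples bucketed into 10-second slots:
points = [(3, 1.0), (4, 3.0), (17, 5.0), (31, 7.0)]
print(bucket_average(points, 0, 40, 10))  # [2.0, 5.0, None, 7.0]

The real function below does the same slotting, but first asks KairosDB itself to group and aggregate the samples server-side.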
from collections import deque

from pyKairosDB import reader  # query-building helpers, as in pyKairosDB's graphite.py


def read_absolute(conn, metric_name, start_time, end_time):
    """
    :type conn: pyKairosDB.KairosDBConnection
    :param conn: The connection to KairosDB
    :type metric_name: string
    :param metric_name: The name of the metric to query (graphite does one at a time, though KairosDB can do more)
    :type start_time: float
    :param start_time: The number of seconds since the epoch at which this query starts
    :type end_time: float
    :param end_time: The number of seconds since the epoch at which this query ends
    :rtype: tuple
    :return: 2-element tuple - ((start_time, end_time, interval), list_of_metric_values). Graphite wants
        evenly-spaced metrics, and None for any interval that doesn't have data. It infers the time for each
        update from the order and position of each value provided.

    This function returns the values being queried, in the format that the graphite-web app requires.
    """
    def cache_query():
        def cache_query_closure(query_dict):
            reader.cache_time(10, query_dict)
        return cache_query_closure

    # First pass: read only the tags, so the bucket size can be matched to the
    # coarsest retention configured for this metric.
    tags = conn.read_absolute([metric_name], start_time, end_time,
                              query_modifying_function=cache_query(),
                              only_read_tags=True)
    interval_seconds = _lowest_resolution_retention(tags, metric_name)  # defined alongside this function in graphite.py

    def modify_query():
        def modify_query_closure(query_dict):
            # Ask KairosDB to group and aggregate samples into buckets of
            # interval_seconds server-side.
            group_by = reader.default_group_by()
            group_by["range_size"] = {"value": interval_seconds, "unit": "seconds"}
            aggregator = reader.default_aggregator()
            aggregator["sampling"] = group_by["range_size"]
            reader.group_by([group_by], query_dict["metrics"][0])
            reader.aggregation([aggregator], query_dict["metrics"][0])
        return modify_query_closure

    # Now that we've gotten the tags and have set the retention time, get the data.
    content = conn.read_absolute([metric_name], start_time, end_time,
                                 query_modifying_function=modify_query())

    return_list = list()
    if len(content['queries'][0]['results']) > 0:
        # Each entry in "values" is a (timestamp, value) pair.
        value_deque = deque(content["queries"][0]["results"][0]["values"])
        # range() needs integers; the epoch timestamps arrive as floats.
        for slot_begin in range(int(start_time), int(end_time), interval_seconds):
            slot_buffer = list()
            slot_end = slot_begin + interval_seconds
            try:
                if len(value_deque) == 0:  # no data left at all
                    return_list.append(None)
                    continue
                if slot_end < value_deque[0][0]:  # we haven't caught up with the beginning of the deque
                    return_list.append(None)
                    continue
                if slot_begin > value_deque[-1][0]:  # we have nothing more of value
                    return_list.append(None)
                    continue
                while slot_begin <= value_deque[0][0] < slot_end:
                    slot_buffer.append(value_deque.popleft()[1])
            except IndexError:
                # The deque drained mid-slot; fall through so any points
                # already collected for this slot are still averaged.
                pass
            if len(slot_buffer) < 1:
                return_list.append(None)
            else:
                return_list.append(sum(slot_buffer) / len(slot_buffer))  # average of the points in this slot
    else:
        return_list = [None for n in range(int(start_time), int(end_time), interval_seconds)]
    return ((start_time, end_time, interval_seconds), return_list)
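A hypothetical invocation, to show the shape of the result graphite-web consumes. The pyKairosDB.connect() call and its defaults are assumptions about pyKairosDB's API, and the metric name is made up; read_absolute() also needs a reachable KairosDB instance:

import time
import pyKairosDB  # assumption: exposes connect(), returning a KairosDBConnection

conn = pyKairosDB.connect()  # assumption: connects to a local KairosDB
now = time.time()
(start, end, step), values = read_absolute(conn, "servers.web01.loadavg.01", now - 3600, now)
# graphite-web treats values[i] as the datapoint for the interval
# [start + i * step, start + (i + 1) * step), with None marking empty slots.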