Created
January 2, 2014 21:48
-
-
Save jroyalty/8227589 to your computer and use it in GitHub Desktop.
Snippet that takes time-series data without a fixed interval and returns it bucketed by a configurable size (interval_seconds). This is abstracted from: https://github.com/pcn/pyKairosDB/blob/master/pyKairosDB/graphite.py ... This would be easily adapted for use by an OpenTSDB reader for Graphite.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def read_absolute(conn, metric_name, start_time, end_time):
    """Read a metric from KairosDB and re-bucket it for graphite-web.

    :type conn: pyKairosDB.KairosDBConnection
    :param conn: The connection to KairosDB
    :type metric_name: string
    :param metric_name: The name of the metric to query (graphite does one at a time, though KairosDB can do more)
    :type start_time: float
    :param start_time: The float representing the number of seconds since the epoch that this query starts at.
    :type end_time: float
    :param end_time: The float representing the number of seconds since the epoch that this query ends at.
    :rtype: tuple
    :return: 2-element tuple - ((start_time, end_time, interval), list_of_metric_values). Graphite wants evenly-spaced metrics,
        and None for any interval that doesn't have data. It infers the time for each update by the order and place of each
        value provided.

    This function returns the values being queried, in the format that the graphite-web app requires.

    NOTE(review): range() below requires integer start/end/interval, so despite the
    docstring these are effectively expected to be ints - confirm with callers.
    """
    def cache_query():
        # Closure passed to the tags-only read: ask KairosDB to cache results for 10s.
        def cache_query_closure(query_dict):
            reader.cache_time(10, query_dict)
        return cache_query_closure

    # First pass: read only tags so we can derive the coarsest retention interval.
    tags = conn.read_absolute([metric_name], start_time, end_time,
                              query_modifying_function=cache_query(),
                              only_read_tags=True)
    interval_seconds = _lowest_resolution_retention(tags, metric_name)

    def modify_query():
        # Closure that groups and aggregates server-side into interval_seconds buckets.
        def modify_query_closure(query_dict):
            group_by = reader.default_group_by()
            group_by["range_size"] = {"value": interval_seconds, "unit": "seconds"}
            aggregator = reader.default_aggregator()
            aggregator["sampling"] = group_by["range_size"]
            reader.group_by([group_by], query_dict["metrics"][0])
            reader.aggregation([aggregator], query_dict["metrics"][0])
        return modify_query_closure

    # Second pass: now that we've gotten the tags and set the retention time, get data.
    content = conn.read_absolute([metric_name], start_time, end_time,
                                 query_modifying_function=modify_query())
    return_list = list()
    if content['queries'][0]['results']:
        # values are (timestamp, value) pairs in ascending time order; a deque
        # gives O(1) popleft as we consume them slot by slot.
        value_deque = deque(content["queries"][0]["results"][0]["values"])
        for slot_begin in range(start_time, end_time, interval_seconds):
            slot_end = slot_begin + interval_seconds
            # Check emptiness BEFORE indexing the deque (the original indexed
            # first, raised IndexError, and then appended None twice per slot).
            if (not value_deque
                    or slot_end < value_deque[0][0]      # haven't caught up with the data yet
                    or slot_begin > value_deque[-1][0]): # nothing more of value
                return_list.append(None)
                continue
            slot_buffer = list()
            while value_deque and slot_begin <= value_deque[0][0] < slot_end:
                slot_buffer.append(value_deque.popleft()[1])
            if slot_buffer:
                # Average the points that fell into this slot.
                return_list.append(sum(slot_buffer) / len(slot_buffer))
            else:
                return_list.append(None)
    else:
        # No results at all: one None per expected slot.
        return_list = [None for _ in range(start_time, end_time, interval_seconds)]
    return ((start_time, end_time, interval_seconds), return_list)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment