Last active
August 29, 2015 14:08
-
-
Save the-gigi/63a38b6eac7880c9a4f9 to your computer and use it in GitHub Desktop.
Little program to feed data to Grok on AWS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from argparse import ArgumentParser | |
from collections import defaultdict | |
import json | |
import os | |
import socket | |
import urllib | |
import dateutil.parser | |
from aclima.common.datetime_util import (utc_time, | |
datetime_to_epoch_milliseconds, | |
epoch_milliseconds_to_datetime) | |
from pprint import pprint as pp | |
import time | |
from aclima.datahub.api.client import Client | |
# TCP port used to reach Grok; feed() sends Graphite-format lines to it
GROK_PORT = 2003


def feed(ip_address, data, metric, delay=0):
    """Send readings to a custom metric on Grok in Graphite format

    Each reading should have unix timestamp (seconds from epoch) or None;
    a None timestamp is replaced by the current time when the reading is sent.
    The custom metric name should be setup on Grok.

    See the following link for instructions how to see your data:
    http://numenta.com/assets/pdf/grok/resources/1.3/Grok-Custom-Metrics.pdf

    :param ip_address: ip address of the Grok instance
    :param data: iterable of (value, timestamp)
    :param metric: the name of the custom metric
    :param delay: wait time in seconds between consecutive messages
    :return: None
    """
    s = socket.socket()
    try:
        s.connect((ip_address, GROK_PORT))
        for i, (value, ts) in enumerate(data):
            # If no timestamp provided use current time
            if ts is None:
                ts = time.time()
            line = '{} {} {}'.format(metric, value, ts)
            payload = line + '\n'
            # Sockets carry bytes; a text string must be encoded first
            if not isinstance(payload, bytes):
                payload = payload.encode('utf-8')
            # sendall() keeps writing until the whole message is out, whereas
            # send() may transmit only a prefix and silently drop the rest
            s.sendall(payload)
            # Echo progress: the reading's index followed by the line sent
            print('{} {}'.format(i, line))
            time.sleep(delay)
    finally:
        # Always release the connection, even if connecting or sending failed
        s.close()
def get_data(username,
             password,
             modality,
             start,
             end,
             install_site_slugs):
    """Fetch sensor readings from the Aclima dataquery API

    Logs in through ``Client`` and issues a GET on
    https://api.aclima.io/v2/dataquery with the query parameters
    sensor_type, node_location, start_time, end_time and interval=60.

    :param username: API user name passed to ``Client.login``
    :param password: API password passed to ``Client.login``
    :param modality: value sent as the ``sensor_type`` query parameter
    :param start: start of the query window as an int (main() passes epoch
        milliseconds)
    :param end: end of the query window as an int (main() passes epoch
        milliseconds)
    :param install_site_slugs: iterable of site slugs; they are joined with
        commas into the ``node_location`` query parameter
    :return: the JSON-decoded response body
    :raises AssertionError: if an argument has the wrong type or the
        response is not ok
    """
    # Explicit raises instead of ``assert`` statements: asserts are removed
    # under ``python -O``, which would let bad input or a failed request
    # through. AssertionError is kept so the failure type is unchanged.
    # A bare string is rejected explicitly: on Python 3 ``str`` has __iter__
    # and would otherwise be comma-joined character by character.
    if (isinstance(install_site_slugs, str) or
            not hasattr(install_site_slugs, '__iter__')):
        raise AssertionError(
            'install_site_slugs must be an iterable of slugs, got {!r}'
            .format(install_site_slugs))
    if not isinstance(start, int):
        raise AssertionError('start must be an int, got {!r}'.format(start))
    if not isinstance(end, int):
        raise AssertionError('end must be an int, got {!r}'.format(end))

    # urlencode lives in ``urllib`` on Python 2 and ``urllib.parse`` on 3
    try:
        from urllib import urlencode
    except ImportError:
        from urllib.parse import urlencode

    cli = Client()
    cli.login(username, password)

    # Example query:
    # https://api.aclima.io/v2/dataquery?node_location=us-mtv-1842-1-amazon-theater&sensor_type=co2&start_time=1415815200000&end_time=1415818800000&page=1&downsample=false
    base_url = 'https://api.aclima.io/v2/dataquery?'
    params = dict(sensor_type=modality,
                  node_location=','.join(install_site_slugs),
                  start_time=start,
                  end_time=end,
                  interval=60)
    url = base_url + urlencode(params)

    response = cli._get(url)
    if not response.ok:
        raise AssertionError('data query failed for {}'.format(url))
    return json.loads(response.content)
def options():
    """Define the command-line interface and parse the process arguments

    --username and --password default to the ACLIMA_API_USER and
    ACLIMA_API_PASSWORD environment variables (empty string when unset).
    --modality, --start and --end are required; --start and --end are
    converted from a date string to epoch milliseconds.
    --install-site-slugs is a comma-separated list (default: empty tuple).

    :return: the namespace produced by ``ArgumentParser.parse_args``
    """
    def ts2datetime(ts):
        # Despite the name, the result is epoch milliseconds, not a datetime
        parsed = dateutil.parser.parse(ts)
        return datetime_to_epoch_milliseconds(utc_time(parsed))

    env = os.environ
    parser = ArgumentParser(description=__doc__)

    # Credentials
    parser.add_argument('--username', default=env.get('ACLIMA_API_USER', ''))
    parser.add_argument('--password',
                        default=env.get('ACLIMA_API_PASSWORD', ''))

    # Where to send the data
    parser.add_argument('--grok-ip-address',
                        default='54.200.41.228',
                        help='the Grok AWS instance IP address')

    # What to query
    parser.add_argument('--modality', required=True)
    parser.add_argument('--start', type=ts2datetime, required=True)
    parser.add_argument('--end', type=ts2datetime, required=True)
    parser.add_argument('--install-site-slugs',
                        default=(),
                        help='comma-separated list',
                        type=lambda x: [s.strip() for s in x.split(',')])

    parsed_args = parser.parse_args()
    return parsed_args
def prepare_grok_data(sensor_data, start, end):
    """Extract the requested field from the query results and stretch time

    The time is stretched back, so the last record is in the present:
    every reading keeps its offset from ``start``, but the window is moved
    so that it ends at the current time.

    :param sensor_data: query results; a dict whose 'sensors' entry is a
        list of dicts with 'sensor_type', 'sensor_mac' and 'data' keys, where
        'data' is a sequence of (epoch-milliseconds timestamp, value) pairs
    :param start: start time of query (datetime)
    :param end: end time of query (datetime)
    :return: list of (metric_name, [(value, unix_timestamp), ...]) pairs,
        ready to feed to grok
    """
    # total_seconds() rather than .seconds: .seconds is only the seconds
    # component of the delta, so it drops whole days and misreports negative
    # deltas
    window = (end - start).total_seconds()
    fake_start = time.time() - window

    result = defaultdict(list)
    for s in sensor_data['sensors']:
        # One Grok metric per (sensor type, sensor MAC) pair
        metric = '{}_{}'.format(s['sensor_type'], s['sensor_mac'])
        for ts, value in s['data']:
            elapsed = epoch_milliseconds_to_datetime(ts) - start
            result[metric].append((value,
                                   fake_start + elapsed.total_seconds()))

    # A real list so callers can take len() and iterate more than once
    return list(result.items())
def main():
    """Run the script: parse arguments, query the data, feed it to Grok

    Progress is printed along the way. Each metric returned by
    prepare_grok_data() is sent with its own feed() call.
    """
    print('Starting.')
    o = options()
    print('Parsed argument successfully.')
    pp(o)

    print('Getting data... can take awhile')
    data = get_data(o.username,
                    o.password,
                    o.modality,
                    o.start,
                    o.end,
                    o.install_site_slugs)
    # Count the readings of every sensor; len(data) would only count the
    # top-level keys of the response dict
    fetched = sum(len(s['data']) for s in data['sensors'])
    print('Got data: {} records'.format(fetched))

    start = epoch_milliseconds_to_datetime(o.start)
    end = epoch_milliseconds_to_datetime(o.end)
    print('start: {}, end: {}'.format(start, end))

    print('Preparing Grok data...')
    grok_data = prepare_grok_data(data, start, end)
    # grok_data holds one entry per metric, so sum the stream lengths to
    # report readings rather than metrics
    prepared = sum(len(stream) for _, stream in grok_data)
    print('Prepared {} records'.format(prepared))

    for metric_name, stream in grok_data:
        feed(o.grok_ip_address, stream, metric_name)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment