Skip to content

Instantly share code, notes, and snippets.

@the-gigi
Last active August 29, 2015 14:08
Show Gist options
  • Save the-gigi/63a38b6eac7880c9a4f9 to your computer and use it in GitHub Desktop.
Save the-gigi/63a38b6eac7880c9a4f9 to your computer and use it in GitHub Desktop.
Little program to feed data to Grok on AWS
from argparse import ArgumentParser
from collections import defaultdict
import json
import os
import socket
import urllib
import dateutil.parser
from aclima.common.datetime_util import (utc_time,
datetime_to_epoch_milliseconds,
epoch_milliseconds_to_datetime)
from pprint import pprint as pp
import time
from aclima.datahub.api.client import Client
# Port that feed() connects to on the Grok instance.
GROK_PORT = 2003
def feed(ip_address, data, metric, delay=0):
    """Send readings to a custom metric on Grok in Graphite format.

    One line of the form ``<metric> <value> <timestamp>`` is sent per
    reading, and each sent line is echoed to stdout prefixed by its index.
    The custom metric name should be set up on Grok. See the following link
    for instructions on how to see your data:
    http://numenta.com/assets/pdf/grok/resources/1.3/Grok-Custom-Metrics.pdf

    :param ip_address: ip address of the Grok instance
    :param data: iterable of (value, timestamp); the timestamp is a unix
        timestamp (seconds from epoch) or None to use the current time
    :param metric: the name of the custom metric
    :param delay: wait time in seconds after each message
    :return: None
    """
    s = socket.socket()
    try:
        s.connect((ip_address, GROK_PORT))
        for i, (value, ts) in enumerate(data):
            # If no timestamp provided use current time
            if ts is None:
                ts = time.time()
            line = '{} {} {}'.format(metric, value, ts)
            payload = line + '\n'
            # Sockets carry bytes: a Python 2 str already is bytes, a text
            # string has to be encoded first.
            if not isinstance(payload, bytes):
                payload = payload.encode('utf-8')
            # sendall() keeps writing until the whole message is out;
            # send() may transmit only part of it.
            s.sendall(payload)
            print('{} {}'.format(i, line))
            time.sleep(delay)
    finally:
        # Release the connection even when connecting or sending fails.
        s.close()
def get_data(username,
             password,
             modality,
             start,
             end,
             install_site_slugs):
    """Fetch readings for one modality from the Aclima dataquery endpoint.

    :param username: user name passed to ``Client.login``
    :param password: password passed to ``Client.login``
    :param modality: value sent as the ``sensor_type`` query parameter
    :param start: value sent as ``start_time``; must be an int
    :param end: value sent as ``end_time``; must be an int
    :param install_site_slugs: iterable of strings, comma-joined into the
        ``node_location`` query parameter
    :return: the JSON-decoded response body
    """
    assert hasattr(install_site_slugs, '__iter__')
    assert isinstance(start, int)
    assert isinstance(end, int)

    client = Client()
    client.login(username, password)

    # Example of a full request URL:
    # https://api.aclima.io/v2/dataquery?node_location=us-mtv-1842-1-amazon-theater&sensor_type=co2&start_time=1415815200000&end_time=1415818800000&page=1&downsample=false
    query = {
        'sensor_type': modality,
        'node_location': ','.join(install_site_slugs),
        'start_time': start,
        'end_time': end,
        'interval': 60,
    }
    url = 'https://api.aclima.io/v2/dataquery?' + urllib.urlencode(query)

    response = client._get(url)
    assert response.ok
    return json.loads(response.content)
def options():
    """Define and parse the command-line arguments of the script.

    ``--start`` and ``--end`` are parsed as date strings and converted to
    epoch milliseconds; ``--install-site-slugs`` is split on commas into a
    list of stripped strings. User name and password default to the
    ACLIMA_API_USER / ACLIMA_API_PASSWORD environment variables.

    :return: the namespace produced by ``ArgumentParser.parse_args``
    """
    # The helper keeps its name: argparse shows it in "invalid ... value"
    # error messages.
    def ts2datetime(ts):
        parsed = dateutil.parser.parse(ts)
        return datetime_to_epoch_milliseconds(utc_time(parsed))

    def split_slugs(text):
        return [slug.strip() for slug in text.split(',')]

    env_user = os.environ.get('ACLIMA_API_USER', '')
    env_password = os.environ.get('ACLIMA_API_PASSWORD', '')

    parser = ArgumentParser(description=__doc__)
    parser.add_argument('--username', default=env_user)
    parser.add_argument('--password', default=env_password)
    parser.add_argument(
        '--grok-ip-address',
        help='the Grok AWS instance IP address',
        default='54.200.41.228',
    )
    parser.add_argument('--modality', required=True)
    for flag in ('--start', '--end'):
        parser.add_argument(flag, required=True, type=ts2datetime)
    parser.add_argument(
        '--install-site-slugs',
        type=split_slugs,
        help='comma-separated list',
        default=(),
    )
    return parser.parse_args()
def prepare_grok_data(sensor_data, start, end):
    """Group query results per metric and shift their timestamps to end now.

    Every reading is moved in time so that the query's ``start`` maps to
    "now minus the query duration"; a reading taken at ``end`` therefore
    lands in the present, and the spacing between readings is preserved.
    Metrics are named ``<sensor_type>_<sensor_mac>``.

    :param sensor_data: query results; a mapping with a 'sensors' list whose
        items carry 'sensor_type', 'sensor_mac' and a 'data' sequence of
        (epoch-milliseconds timestamp, value) pairs
    :param start: start time of query (datetime)
    :param end: end time of query (datetime)
    :return: list of (metric name, [(value, shifted unix timestamp), ...])
    """
    # total_seconds() measures the whole span. timedelta.seconds is only the
    # sub-day component, so it drops whole days and garbles negative deltas.
    fake_start = time.time() - (end - start).total_seconds()
    result = defaultdict(list)
    for sensor in sensor_data['sensors']:
        metric = '{}_{}'.format(sensor['sensor_type'], sensor['sensor_mac'])
        for ts, value in sensor['data']:
            elapsed = epoch_milliseconds_to_datetime(ts) - start
            shifted_ts = fake_start + elapsed.total_seconds()
            result[metric].append((value, shifted_ts))
    # Always hand back a list so callers can take len() and iterate again.
    return list(result.items())
def main():
    """Run the script: parse arguments, query the data, feed it to Grok.

    Progress messages are printed to stdout along the way. Each prepared
    metric stream is sent with a separate ``feed`` call.
    """
    print('Starting.')
    args = options()
    print('Parsed argument successfully.')
    pp(args)

    print('Getting data... can take awhile')
    data = get_data(
        args.username,
        args.password,
        args.modality,
        args.start,
        args.end,
        args.install_site_slugs,
    )
    # NOTE(review): len(data) is the size of the decoded JSON container,
    # which may not equal the number of individual readings.
    print('Got data: {} records'.format(len(data)))

    # The parsed arguments hold epoch milliseconds; convert for display and
    # for the time shifting done by prepare_grok_data.
    start = epoch_milliseconds_to_datetime(args.start)
    end = epoch_milliseconds_to_datetime(args.end)
    print('start: {}, end: {}'.format(start, end))

    print('Preparing Grok data...')
    grok_data = prepare_grok_data(data, start, end)
    print('Prepared {} records'.format(len(grok_data)))

    for metric_name, stream in grok_data:
        feed(args.grok_ip_address, stream, metric_name)


# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment