Skip to content

Instantly share code, notes, and snippets.

@micktwomey
Last active June 25, 2018 17:32
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save micktwomey/3fe69397255e489e1e2b to your computer and use it in GitHub Desktop.
Overcomplicated Raspberry Pi + Sense Hat + Firehose + Lambda + Hosted Graphite Thermometer
{
"DeliveryStreamDescription": {
"HasMoreDestinations": false,
"VersionId": "1",
"CreateTimestamp": 1445015966.474,
"DeliveryStreamARN": "arn:aws:firehose:...deliverystream/pi-sense-hat",
"DeliveryStreamStatus": "ACTIVE",
"DeliveryStreamName": "pi-sense-hat",
"Destinations": [
{
"DestinationId": "destinationId-000000000001",
"S3DestinationDescription": {
"RoleARN": "arn:aws:iam::...role/firehouse_pi_sense_hat",
"Prefix": "sense-hat/redpi/",
"BufferingHints": {
"IntervalInSeconds": 300,
"SizeInMBs": 5
},
"EncryptionConfiguration": {
"NoEncryptionConfig": "NoEncryption"
},
"CompressionFormat": "GZIP",
"BucketARN": "arn:aws:s3:::..."
}
}
]
}
}
"""Records readings from the sense hat
"""
import datetime
import json
import logging
import os
import time
import boto3
import sense_hat
def read_hat_sensors(hat):
    """Take one snapshot of every Sense HAT sensor.

    Returns a flat dict: scalar sensors keyed by name, and the three
    vector sensors (accelerometer, gyroscope, orientation) flattened
    into dotted keys such as 'orientation.pitch'.
    """
    snapshot = {
        "datetime": datetime.datetime.utcnow().isoformat(),
        "timestamp": time.time(),
        "temperature": hat.temperature,
        "pressure": hat.pressure,
        "humidity": hat.humidity,
        "compass": hat.get_compass(),
    }
    vector_getters = {
        "accelerometer": hat.get_accelerometer,
        "gyroscope": hat.get_gyroscope,
        "orientation": hat.get_orientation,
    }
    for sensor_name, getter in vector_getters.items():
        for axis, value in getter().items():
            snapshot["%s.%s" % (sensor_name, axis)] = value
    return snapshot
def write_reading(reading, firehose_client):
    """Push one sensor reading to the 'pi-sense-hat' Firehose stream.

    If the push fails for any reason, fall back to appending the reading
    to an hourly JSON file under hat_readings/ so no data is lost while
    offline.  The hourly file is read back and rewritten whole each time.

    reading: JSON-serializable dict of sensor values.
    firehose_client: a boto3 firehose client (anything with put_record).
    Returns None.
    """
    logging.info("Pushing to firehose")
    try:
        response = firehose_client.put_record(
            DeliveryStreamName='pi-sense-hat',
            Record={
                'Data': json.dumps(reading)
            }
        )
        logging.info(response)
        return
    except Exception:
        logging.exception("Problem pushing to firehose")
    # Fallback path: batch readings into one JSON file per UTC hour.
    logging.info("Writing to disk")
    filename = "hat_readings/reading_{now:%Y-%m-%d-%H}.json".format(now=datetime.datetime.utcnow())
    # Only create the directory when missing; the original blanket
    # `except OSError: pass` also hid real failures like EACCES.
    directory = os.path.dirname(filename)
    if not os.path.isdir(directory):
        os.mkdir(directory)
    logging.info(filename)
    try:
        with open(filename, "r") as fp:
            readings = json.load(fp)
    except Exception:
        # Missing or corrupt hourly file: start a fresh batch.
        readings = []
    readings.append(reading)
    with open(filename, "w") as fp:
        json.dump(readings, fp)
    logging.info("Written to disk")
def main():
    """Entry point: sample the Sense HAT every 30 seconds, forever."""
    logging.basicConfig(level=logging.DEBUG)
    client = boto3.client('firehose', region_name="us-east-1")
    hat = sense_hat.SenseHat()
    # Rotate the LED matrix/orientation for how this Pi is mounted.
    hat.set_rotation(180)
    while True:
        current = read_hat_sensors(hat)
        logging.info(current)
        write_reading(current, client)
        time.sleep(30)


if __name__ == '__main__':
    main()
{
"CodeSha256": "Alvk72dXY4jyeXnBnEjQ72h6vt3IE9z5ZeEWJXuwjiQ=",
"FunctionName": "process_s3_sensehat_record",
"CodeSize": 3596,
"MemorySize": 128,
"FunctionArn": "arn:aws:lambda:eu-west-1:...:function:process_s3_sensehat_record",
"Version": "$LATEST",
"Role": "arn:aws:iam::...:role/lambda_sense_hat",
"Timeout": 10,
"LastModified": "2015-10-20T12:38:06.687+0000",
"Handler": "lambda_process_s3_record.lambda_handler",
"Runtime": "python2.7",
"Description": "Pull S3 JSON records from sense hat and upload to hosted graphite"
}
import gzip
import json
import logging
import socket
import time
import boto3
import iso8601
# Verbose logging for the Lambda execution environment; the print below
# fires once per container cold start.
logging.basicConfig(level=logging.DEBUG)
print('Loading function')
def send_metric(connection, metric, value, timestamp):
    """Send one metric over an established Hosted Graphite TCP connection.

    Wire format is the plaintext Carbon protocol:
    "<api_key>.<metric> <value> <timestamp>\n" (timestamp truncated to int).

    connection: a connected socket (anything with sendall).
    """
    api_key = "<insert your key here>"
    line = "{}.{} {} {}\n".format(api_key, metric, value, int(timestamp)).encode('utf-8')
    print("Sending: {!r}".format(line))
    # sendall() loops until the whole payload is written; plain send()
    # may transmit only part of the buffer and silently drop the rest.
    connection.sendall(line)
def parse_date_to_timestamp(dt):
    """Convert an ISO 8601 datetime string to a Unix timestamp.

    NOTE: time.mktime interprets the timetuple in local time.
    """
    parsed = iso8601.parse_date(dt)
    return time.mktime(parsed.timetuple())
def stream_json_objects(fp):
    """Yield each JSON object from a file containing concatenated objects.

    The original implementation counted raw '{'/'}' characters, which
    miscounts braces appearing inside JSON string values and rejected
    whitespace between objects.  Using json.JSONDecoder.raw_decode
    handles both correctly.  Raises ValueError on trailing garbage.

    fp: file-like object; bytes content is decoded as UTF-8.
    """
    text = fp.read()
    if isinstance(text, bytes):
        text = text.decode('utf-8')
    decoder = json.JSONDecoder()
    pos = 0
    end = len(text)
    while pos < end:
        # Skip any whitespace separating consecutive objects.
        while pos < end and text[pos].isspace():
            pos += 1
        if pos >= end:
            break
        obj, pos = decoder.raw_decode(text, pos)
        yield obj
def lambda_handler(event, context):
    """Lambda entry point: fetch one gzipped batch of sense-hat JSON
    records from S3 and forward every sensor field to Hosted Graphite.

    event: standard S3 put-notification event; only the first record is
    used.  Exceptions are logged and re-raised so Lambda marks the
    invocation as failed.
    """
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']
    try:
        s3 = boto3.client('s3')
        response = s3.get_object(Bucket=bucket, Key=key)
        print("CONTENT TYPE: " + response['ContentType'])
        # gzip / zlib in python 2 doesn't play ball easily, just write the file out
        with open('/tmp/tmp.gz', 'wb') as fp:
            fp.write(response['Body'].read())
        with gzip.open('/tmp/tmp.gz') as fp:
            conn = socket.create_connection(("<your tcp endpoint>", 2003))
            try:
                for record in stream_json_objects(fp):
                    when = parse_date_to_timestamp(record['datetime'])
                    # Loop over `field`, not `key`, to avoid shadowing the
                    # S3 object key.  BUG FIX: the original tested
                    # `value in ('timestamp', 'datetime')` — comparing the
                    # VALUE — so these non-metric fields were never skipped.
                    for field, value in record.items():
                        if field in ('timestamp', 'datetime'):
                            continue
                        send_metric(conn, 'sensehat.redpi.{}'.format(field), value, when)
            finally:
                # Always release the socket, even on a mid-stream failure.
                conn.close()
    except Exception:
        logging.exception("Uh oh")
        raise
if __name__ == '__main__':
    # Local testing aid: invoke the handler with a synthetic S3 event
    # built from bucket/key command-line arguments.
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument("bucket")
    cli.add_argument("key")
    options = cli.parse_args()
    fake_event = {
        'Records': [{
            's3': {
                'bucket': {'name': options.bucket},
                'object': {'key': options.key},
            }
        }]
    }
    result = lambda_handler(fake_event, None)
    print(result)
# Build and deploy the Lambda deployment package.
# NOTE: recipe lines below MUST begin with a tab — the pasted source had
# lost its indentation, which makes a Makefile invalid.
ZIPFILE=lambda_process_s3_records.zip
SOURCES=iso8601.py lambda_process_s3_record.py

all: $(ZIPFILE)

# Rebuild the zip from scratch whenever a source file changes.
$(ZIPFILE): $(SOURCES)
	rm -f $@
	zip $@ $^

# Upload the freshly built zip over the existing Lambda function's code.
.PHONY: upload
upload: $(ZIPFILE)
	aws lambda update-function-code --function-name process_s3_sensehat_record --zip-file fileb://$<
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment