Create a gist now

Instantly share code, notes, and snippets.

@rcarmo /0-monitor.py
Last active Oct 28, 2017

What would you like to do?
EnviroPHAT events to Azure IoT Hub, with Stream Analytics output to Azure Data Lake and PowerBI
#!/usr/bin/env python
# Minimalist environmental data logger to Azure IoT Hub
# Rui Carmo, April 2017
from envirophat import light, motion, weather, leds
from base64 import b64encode, b64decode
from hashlib import sha256
from time import time, sleep
from urllib import quote_plus, urlencode
from hmac import HMAC
from requests import Session
from json import dumps
from os import environ
from subprocess import call
from traceback import format_exc
from logging import getLogger, basicConfig, ERROR, DEBUG, INFO
# Log to a file; only records at ERROR level or above are written.
basicConfig(level=ERROR, format="%(asctime)s %(name)-12s %(levelname)-8s %(message)s", filename="/home/pi/error.log")
log = getLogger(__name__)

CONNECTION_STRING = environ["CONNECTION_STRING"]  # fail immediately if not present
# Parse the "Name=Value;Name=Value" pairs by name rather than by position, so
# reordered fields, extra fields or a trailing ";" do not break startup.
# Only the first "=" separates name from value, because base64 keys may end
# in "=" padding.
_CONNECTION_FIELDS = dict(
    part.strip().split("=", 1)
    for part in CONNECTION_STRING.split(";")
    if "=" in part
)
# A missing field raises KeyError here, i.e. still fails immediately.
HOST_NAME = _CONNECTION_FIELDS["HostName"]
DEVICE_NAME = _CONNECTION_FIELDS["DeviceId"]
DEVICE_KEY = _CONNECTION_FIELDS["SharedAccessKey"]

EVENT_FORMAT = environ.get("EVENT_FORMAT", "csv")
if EVENT_FORMAT not in ("csv", "json"):
    # collect_and_merge_data() only produces these two payloads; reject a bad
    # setting at startup instead of failing on every send attempt.
    raise ValueError("EVENT_FORMAT must be 'csv' or 'json', got %r" % EVENT_FORMAT)
EVENT_INTERVAL = int(environ.get("EVENT_INTERVAL", "1"))  # seconds between events
def generate_sas_token(uri, key, policy_name=None, expiry=3600):
    """Build a Shared Access Signature authorization header value.

    Args:
        uri: Resource URI the token is issued for (without the scheme).
        key: Base64-encoded shared access key used to sign the token.
        policy_name: Optional policy name; emitted as the ``skn`` field
            when given.
        expiry: Token lifetime in seconds, counted from now.

    Returns:
        ``"SharedAccessSignature "`` followed by the URL-encoded ``sr``,
        ``sig`` and ``se`` fields (plus ``skn`` when requested).
    """
    expires_at = str(int(time() + expiry))
    # The signed payload is the URL-quoted resource URI, a newline and the
    # expiry timestamp.
    string_to_sign = "%s\n%s" % (quote_plus(uri), expires_at)
    # HMAC operates on bytes: encode the payload and decode the signature
    # explicitly so the result is the same for byte and text strings.
    digest = HMAC(b64decode(key), string_to_sign.encode("utf-8"), sha256).digest()
    signature = b64encode(digest).decode("ascii")
    rawtoken = {
        'sr': uri,
        'sig': signature,
        'se': expires_at,
    }
    if policy_name is not None:
        rawtoken['skn'] = policy_name
    return 'SharedAccessSignature ' + urlencode(rawtoken)
def collect_environment_data():
    """Take one reading from each on-board sensor.

    The light level is read first; the LEDs are then switched on only for
    the duration of the colour reading.

    Returns:
        dict with ``lux``, ``rgb`` and ``accelerometer`` (the latter two as
        comma-joined strings), ``heading``, ``temperature`` and ``pressure``.
    """
    lux = light.light()
    leds.on()
    try:
        rgb = ",".join(map(str, light.rgb()))
    finally:
        # Never leave the LEDs lit if the colour reading fails.
        leds.off()
    return {
        "lux": lux,
        "rgb": rgb,
        "accelerometer": ",".join(map(str, motion.accelerometer())),
        "heading": motion.heading(),  # Uncalibrated
        "temperature": weather.temperature(),  # Celsius
        "pressure": weather.pressure(),  # hectoPascals
    }
def collect_wifi_data(path="/proc/net/wireless", interface="wlan0"):
    """Read the link statistics of one wireless interface.

    Args:
        path: Statistics file to parse. Each interface line is expected to
            look like ``<name>: <status> <link> <level> <noise> ...``.
        interface: Exact interface name to look for.

    Returns:
        dict with float ``link``, ``level`` and ``noise`` values, or ``None``
        when the file has no usable line for ``interface``.
    """
    with open(path, "r") as stats:
        for line in stats:
            name, sep, counters = line.partition(":")
            # Header lines carry no "<name>:" prefix; compare the name exactly
            # so that e.g. "wlan01" is not mistaken for "wlan0".
            if not sep or name.strip() != interface:
                continue
            # split() with no argument collapses runs of whitespace.
            fields = counters.split()
            if len(fields) < 4:
                continue  # truncated line, nothing to report
            _status, link, level, noise = fields[:4]
            return {
                "link": float(link),    # link quality
                "level": float(level),  # signal level
                "noise": float(noise),  # noise level
            }
    return None
def collect_and_merge_data():
    """Gather sensor and Wi-Fi readings and render them as JSON and CSV.

    Returns:
        dict with two string payloads: ``"json"`` (the merged readings as a
        JSON object) and ``"csv"`` (a header line followed by one data line).
    """
    readings = collect_environment_data()
    wifi_stats = collect_wifi_data()
    if wifi_stats:
        readings.update(wifi_stats)

    # Higher-level abstractions for JSON
    json_fields = ["lux", "rgb", "accelerometer", "heading", "temperature", "pressure", "link", "level", "noise"]
    # Lower-level counters for CSV
    csv_fields = ["lux", "r","g","b", "x", "y", "z", "heading", "temperature", "pressure", "link", "level", "noise"]

    # "rgb" and "accelerometer" are already comma-joined triples, so joining
    # the nine values below yields the thirteen columns named in csv_fields.
    # Readings that are absent become empty cells.
    row = [str(readings.get(name, "")) for name in json_fields]
    header = ",".join(csv_fields)
    return {
        "json": dumps(readings),
        "csv": header + "\n" + ",".join(row),
    }
def main():
    """Post sensor readings to the device's events endpoint until interrupted.

    Each iteration collects one payload in the configured EVENT_FORMAT, posts
    it with a freshly generated SAS token and sleeps for EVENT_INTERVAL
    seconds. Any failure is logged and followed by a reset of the wlan0
    interface before a new HTTP session is started. Ctrl-C switches the LEDs
    off and ends the loop.

    Lifecycle messages are logged with ``log.error`` so that they are
    recorded when the logger is configured at ERROR level.
    """
    log.error("Run started.")
    # The endpoint depends only on module-level configuration, so build it once.
    uri = "%s/devices/%s/messages/events?api-version=%s" % (HOST_NAME, DEVICE_NAME, "2016-11-14")
    while True:
        try:
            with Session() as s:
                while True:
                    data = collect_and_merge_data()[EVENT_FORMAT]
                    res = s.post("https://" + uri,
                                 headers={'Authorization': generate_sas_token(uri, DEVICE_KEY)},
                                 timeout=5,
                                 data=data)
                    log.debug("Data: %s" % data)
                    log.debug("Result: '%s', %d" % (res.text, res.status_code))
                    sleep(EVENT_INTERVAL)
            # Only reached if the send loop above ever ends without raising.
            log.error("Session failed.")
        except KeyboardInterrupt:
            # Exit in an orderly fashion
            leds.off()
            log.error("Interrupted.")
            break
        except Exception:
            # In case of network failure and suchlike. Catching Exception
            # (not a bare except) lets SystemExit end the process instead of
            # triggering an interface reset.
            log.error(format_exc())
            call(["sudo", "ifdown", "--force", "wlan0"])
            sleep(1)
            call(["sudo", "ifup", "wlan0"])
            log.error("Interface reset.")
            sleep(5)
    log.error("Exited.")


if __name__ == '__main__':
    main()
-- Pass-through query: copy every event from the "river" input to the "bag"
-- output, adding the enqueue time as the "time" column.
--
-- Alternative sources for the "time" column:
--   System.TimeStamp AS time,        -- naive timestamp
--   EventProcessedUtcTime AS time,   -- processed by Stream Analytics
--   EventEnqueuedUtcTime AS time,    -- from Event Hubs
--   IoTHub.EnqueuedTime AS time      -- internal IoT Hub timestamp
SELECT
    EventEnqueuedUtcTime AS time,
    lux,
    r, g, b,
    x, y, z,
    heading, temperature, pressure,
    link, level, noise
INTO
    bag
FROM
    river
-- Aggregate query: summarise the "river" input over 5-second tumbling
-- windows and write one row per window to the "pbi" output.
--
-- Group by the window only. Grouping by the event timestamp and by every
-- measured column puts each distinct reading into its own group, which makes
-- the aggregates and eventCount meaningless. "time" is the window's end
-- timestamp.
SELECT
    System.Timestamp() AS time,
    MAX(lux) AS lux,
    AVG(r) AS r, AVG(g) AS g, AVG(b) AS b,
    MAX(x) AS x, MAX(y) AS y, MAX(z) AS z,
    AVG(heading) AS heading,
    AVG(temperature) AS temperature,
    AVG(pressure) AS pressure,
    MIN(link) AS link, MIN(level) AS level, MAX(noise) AS noise,
    COUNT(*) AS [eventCount]
INTO
    pbi
FROM
    river
GROUP BY
    TumblingWindow(second, 5)
// Re-write one CSV file without its header row.
// @in and @out are not declared in this script; presumably they are supplied
// as script parameters by whatever submits the job.
@environment =
    EXTRACT time        DateTime,
            lux         double,
            r           int,
            g           int,
            b           int,
            x           double,
            y           double,
            z           double,
            heading     double,
            temperature double,
            pressure    double,
            link        double,
            level       double,
            noise       double
    FROM @in
    USING Extractors.Csv(skipFirstNRows:1);   // skip the header line

OUTPUT @environment
    TO @out
    USING Outputters.Csv(outputHeader:false); // emit data rows only
// Read every CSV file under environment/csv/2017/04/ (skipping each file's
// header line) and write four consolidated files: three thematic column
// subsets plus the full rowset.
@environment =
    EXTRACT time        DateTime,
            lux         double,
            r           int,
            g           int,
            b           int,
            x           double,
            y           double,
            z           double,
            heading     double,
            temperature double,
            pressure    double,
            link        double,
            level       double,
            noise       double
    FROM "environment/csv/2017/04/{*}.csv"
    USING Extractors.Csv(skipFirstNRows:1);

// Light level and colour, alongside temperature.
@light =
    SELECT time, lux, r, g, b, temperature
    FROM @environment;

OUTPUT @light
    TO "/consolidated/light.csv"
    USING Outputters.Csv(outputHeader:true);

// Accelerometer axes and heading.
@movement =
    SELECT time, x, y, z, heading
    FROM @environment;

OUTPUT @movement
    TO "/consolidated/movement.csv"
    USING Outputters.Csv(outputHeader:true);

// Heading, temperature, pressure and Wi-Fi link/level.
@physics =
    SELECT time, heading, temperature, pressure, link, level
    FROM @environment;

OUTPUT @physics
    TO "/consolidated/physics.csv"
    USING Outputters.Csv(outputHeader:true);

// Everything, unfiltered.
OUTPUT @environment
    TO "/consolidated/all.csv"
    USING Outputters.Csv(outputHeader:true);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment