Skip to content

Instantly share code, notes, and snippets.

@rcarmo
Last active November 21, 2018 12:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rcarmo/28d3b50b3693a03b82c828f50c60d21c to your computer and use it in GitHub Desktop.
Save rcarmo/28d3b50b3693a03b82c828f50c60d21c to your computer and use it in GitHub Desktop.
EnviroPHAT events to Azure IoT Hub, with Stream Analytics output to Azure Data Lake and PowerBI
#!/usr/bin/env python
# Minimalist environmental data logger to Azure IoT Hub
# Rui Carmo, November 2018
from envirophat import light, motion, weather, leds
from base64 import b64encode, b64decode
from hashlib import sha256
from time import time, sleep
from urllib import quote_plus, urlencode
from hmac import HMAC
from requests import Session
from json import dumps
from os import environ
from subprocess import call
from traceback import format_exc
from logging import getLogger, basicConfig, ERROR, DEBUG, INFO
basicConfig(level=DEBUG, format="%(asctime)s %(name)-12s %(levelname)-8s %(message)s", filename="/home/pi/error.log")
log = getLogger(__name__)
CONNECTION_STRING = environ["CONNECTION_STRING"] # fail immediately if not present
# NOTE: This assumes specific field ordering in the connection string
HOST_NAME, DEVICE_NAME, DEVICE_KEY = [part[part.index('=') + 1:] for part in CONNECTION_STRING.split(";")]
EVENT_FORMAT = environ.get("EVENT_FORMAT", "flat")
EVENT_INTERVAL = int(environ.get("EVENT_INTERVAL", "1"))
def generate_sas_token(uri, key, policy_name=None, expiry=3600):
ttl = time() + expiry
sign_key = "%s\n%d" % ((quote_plus(uri)), int(ttl))
signature = b64encode(HMAC(b64decode(key), sign_key, sha256).digest())
rawtoken = {
'sr' : uri,
'sig': signature,
'se' : str(int(ttl))
}
if policy_name is not None:
rawtoken['skn'] = policy_name
return 'SharedAccessSignature ' + urlencode(rawtoken)
def collect_environment_data():
lux = light.light()
leds.on()
rgb = ",".join(map(str,light.rgb()))
leds.off()
return {
"lux": lux,
"rgb": rgb,
"accelerometer": ",".join(map(str,motion.accelerometer())),
"heading": motion.heading(), # Uncalibrated
"temperature": weather.temperature(), # Celsius
"pressure": weather.pressure() # hectoPascals
}
def collect_wifi_data():
with open("/proc/net/wireless", "r") as stats:
for line in stats.readlines():
if "wlan0" in line:
_, _, link, level, noise, _ = filter(len,line.strip().split(" "))[:6]
return {
"link": float(link), # link quality
"level": float(level), # signal level
"noise": float(noise) # noise level
}
def collect_and_merge_data():
environ = collect_environment_data()
wifi = collect_wifi_data()
if wifi:
environ.update(wifi)
values = []
# Higher-level abstractions for nested JSON
json_fields = ["lux", "rgb", "accelerometer", "heading", "temperature", "pressure", "link", "level", "noise"]
# Lower-level counters for CSV and flat JSON
csv_fields = ["lux", "r","g","b", "x", "y", "z", "heading", "temperature", "pressure", "link", "level", "noise"]
for field in json_fields:
values.append(environ.get(field,""))
csv_values = ",".join(map(str,values)).split(",")
return {
"json": dumps(environ),
"csv": ",".join(csv_fields) + "\n" + ",".join(csv_values),
"flat": dumps({x: csv_values[csv_fields.index(x)] for x in csv_fields})
}
def main():
log.debug("Run started.")
while(True):
try:
with Session() as s:
while(True):
data = collect_and_merge_data()[EVENT_FORMAT]
uri = "%s/devices/%s/messages/events?api-version=%s" % (HOST_NAME, DEVICE_NAME, "2016-11-14")
headers = {'Authorization': generate_sas_token(uri, DEVICE_KEY)}
if EVENT_FORMAT in ['json', 'flat']:
headers['Content-Type'] = "application/json"
res = s.post("https://" + uri,
headers = headers,
timeout = 5,
data = data)
log.debug("Data: %s" % data)
log.debug("Result: '%s', %d" % (res.text, res.status_code))
sleep(EVENT_INTERVAL)
log.error("Session failed.")
except KeyboardInterrupt:
# Exit in an orderly fashion
leds.off()
log.error("Interrupted.")
break
except Exception as e:
log.error(e)
# In case of network failure and suchlike
leds.on()
log.error(format_exc())
call(["sudo", "ifdown", "--force", "wlan0"])
sleep(1)
call(["sudo", "ifup", "wlan0"])
log.error("Interface reset.")
sleep(5)
leds.off()
pass
log.error("Exited.")
if __name__ == '__main__':
main()
-- System.TimeStamp AS time, -- naive timestamp
-- EventProcessedUtcTime AS time, -- processed by Stream Analytics
-- EventEnqueuedUtcTime AS time, -- from Event Hubs
-- IoTHub.EnqueuedTime AS time -- internal IoT Hub timestamp
SELECT
EventEnqueuedUtcTime AS time,
lux, r, g, b, x, y, z, heading, temperature, pressure, link, level, noise
INTO
bag
FROM
river
SELECT
EventEnqueuedUtcTime AS time,
MAX(lux) as lux, AVG(r) as r, AVG(g) as g, AVG(b) as b, MAX(x) as x, MAX(y) as y, MAX(z) as z,
AVG(heading) as heading, AVG(temperature) as temperature, AVG(pressure) as pressure, MIN(link) as link, MIN(level) as level, MAX(noise) as noise,
COUNT(*) AS[eventCount]
INTO
pbi
FROM
river
GROUP BY
EventEnqueuedUtcTime,
lux, r, g, b, x, y, z, heading, temperature, pressure, link, level, noise,
TumblingWindow(second, 5)
@environment =
EXTRACT
time DateTime,
lux double,
r int,
g int,
b int,
x double,
y double,
z double,
heading double,
temperature double,
pressure double,
link double,
level double,
noise double
FROM @in
USING Extractors.Csv(skipFirstNRows:1);
OUTPUT @environment
TO @out
USING Outputters.Csv(outputHeader:false);
@environment =
EXTRACT
time DateTime,
lux double,
r int,
g int,
b int,
x double,
y double,
z double,
heading double,
temperature double,
pressure double,
link double,
level double,
noise double
FROM "environment/csv/2017/04/{*}.csv"
USING Extractors.Csv(skipFirstNRows:1);
@light =
SELECT time, lux, r, g, b, temperature
FROM @environment;
@movement =
SELECT time, x, y, z, heading
FROM @environment;
@physics =
SELECT time, heading, temperature, pressure, link, level
FROM @environment;
OUTPUT @light
TO "/consolidated/light.csv"
USING Outputters.Csv(outputHeader:true);
OUTPUT @movement
TO "/consolidated/movement.csv"
USING Outputters.Csv(outputHeader:true);
OUTPUT @physics
TO "/consolidated/physics.csv"
USING Outputters.Csv(outputHeader:true);
OUTPUT @environment
TO "/consolidated/all.csv"
USING Outputters.Csv(outputHeader:true);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment