Last active
April 24, 2017 20:36
-
-
Save mgranberry/c1c0ba83915e904114cc70df12c0fc66 to your computer and use it in GitHub Desktop.
socketio blood glucose fetch for openaps
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import print_function | |
import json | |
import signal | |
import subprocess | |
import sys | |
from datetime import datetime | |
from os import environ, utime | |
from threading import Timer | |
from time import time | |
from socketIO_client import BaseNamespace, SocketIO | |
from watchdog.events import FileSystemEventHandler | |
from watchdog.observers import Observer | |
NIGHTSCOUT_HOST = environ['NIGHTSCOUT_HOST'] | |
API_SECRET = environ['API_SECRET'] | |
MILLIS_PER_SECOND = 1000. | |
wait_count = 0 | |
# from https://gist.github.com/walkermatt/2871026 | |
def debounce(wait): | |
""" Decorator that will postpone a functions | |
execution until after wait seconds | |
have elapsed since the last time it was invoked. """ | |
def decorator(fn): | |
def debounced(*args, **kwargs): | |
def call_it(): | |
fn(*args, **kwargs) | |
try: | |
debounced.t.cancel() | |
except AttributeError: | |
pass | |
debounced.t = Timer(wait, call_it) | |
debounced.t.start() | |
return debounced | |
return decorator | |
def timestamp_hours_ago(hours): | |
return time() - hours * 60 * 60 * MILLIS_PER_SECOND | |
def write_glucose_json(entries): | |
print("writing monitor/glucose.json") | |
if entries: | |
entry_str = json.dumps(entries) | |
latest_time = entries[0]['date'] / MILLIS_PER_SECOND | |
with open('monitor/glucose.json', 'w') as glucose_file: | |
glucose_file.write(entry_str) | |
utime('monitor/glucose.json', (latest_time, latest_time)) | |
print("Glucose data updated with timestamp {}".format( | |
datetime.fromtimestamp(latest_time).isoformat())) | |
def write_treatment_json(filename, treatments): | |
if treatments: | |
treatment_str = json.dumps(treatments) | |
latest_time = treatments[0]['mills'] / MILLIS_PER_SECOND | |
with open(filename, 'w') as treatment_file: | |
treatment_file.write(treatment_str) | |
utime(filename, (latest_time, latest_time)) | |
print("Treatment ({}) data updated with timestamp {}".format( | |
filename, | |
datetime.fromtimestamp(latest_time).isoformat() | |
)) | |
class MonitorEventHandler(FileSystemEventHandler): | |
def __init__(self, io, treatment_map): | |
self.io = io | |
self.treatment_map = treatment_map | |
super(MonitorEventHandler, self).__init__() | |
def update_battery_status(self): | |
"""sudo ~/src/EdisonVoltage/voltage json batteryVoltage battery > | |
monitor/edison-battery.json""" | |
@debounce(10) | |
def invoke_reconcile_treatments(self): | |
print("invoke_reconcile_treatments") | |
formatted_treatments = subprocess.check_output([ | |
"mm-format-ns-treatments", | |
"monitor/pumphistory-zoned.json", | |
"settings/model.json"]) | |
treatments_json = json.loads(formatted_treatments) | |
for treatment in treatments_json: | |
treatment_key = (treatment['eventType'], treatment['timestamp']) | |
if treatment_key not in self.treatment_map: | |
print(treatment_key, "being added.") | |
for key, value in treatment.items(): | |
if isinstance(value, str): | |
if value.isdigit(): | |
treatment[key] = int(value) | |
else: | |
try: | |
treatment[key] = float(value) | |
except ValueError: | |
pass | |
if treatment.get("duration") is not None: | |
treatment["duration"] = int(treatment["duration"]) | |
self.io.emit( | |
'dbAdd', {"collection": "treatments", "data": treatment}) | |
print(treatment_key, "added.") | |
@debounce(25) | |
def invoke_meal_json(self): | |
print("invoke_meal_json") | |
subprocess.call(["openaps", "report", "invoke", "monitor/meal.json"]) | |
@debounce(15) | |
def invoke_settings_profile_json(self): | |
print("invoke_settings_profile_json") | |
subprocess.call(['openaps', 'report', 'invoke', | |
'settings/profile.json']) | |
@debounce(10) | |
def invoke_ns_status(self): | |
print("invoke_ns_status") | |
subprocess.call( | |
["openaps", "battery-status"]) | |
status = subprocess.check_output(["/usr/local/bin/ns-status", | |
"monitor/clock-zoned.json", | |
"monitor/iob.json", | |
"enact/suggested.json", | |
"enact/enacted.json", | |
"monitor/battery.json", | |
"monitor/reservoir.json", | |
"monitor/status.json", | |
"--uploader", | |
"monitor/edison-battery.json"]) | |
if "iob" in status: | |
self.io.emit('dbAdd', {"collection": "devicestatus", | |
"data": json.loads(status)}) | |
def on_modified(self, event): | |
if event.src_path == './.git': | |
return | |
if event.event_type == 'modified': | |
if event.src_path in {"./settings/profile.json", | |
"./monitor/carbhistory.json", | |
"./monitor/clock-zoned.json", | |
"./monitor/pumphistory-zoned.json", | |
"./settings/basal_profile.json", | |
"./monitor/glucose.json"}: | |
print("queueing invoke_meal_json because {}".format( | |
event.src_path)) | |
self.invoke_meal_json() | |
if event.src_path in {"./settings/bg_targets.json", | |
"./preferences.json", | |
"./settings/settings.json", | |
"./settings/basal_profile.json", | |
"./settings/carb_ratios.json", | |
"./settings/temptargets.json", | |
"./settings/model.json", | |
"./settings/autotune.json", | |
"./settings/insulin_sensitivities.json"}: | |
print("queueing invoke_settings_profile_json because {}".format( | |
event.src_path)) | |
self.invoke_settings_profile_json() | |
if event.src_path in {"./monitor/clock-zoned.json", | |
"./monitor/iob.json", | |
"./enact/suggested.json", | |
"./enact/enacted.json", | |
"./monitor/battery.json", | |
"./monitor/reservoir.json", | |
"./monitor/status.json"}: | |
print("queueing invoke_ns_status because {}".format( | |
event.src_path)) | |
self.invoke_ns_status() | |
if event.src_path == "./monitor/pumphistory-zoned.json": | |
print("queueing invoke_reconcile_treatments") | |
self.invoke_reconcile_treatments() | |
class WatchdogTimer(Exception): | |
def __init__(self, timeout): | |
self.timeout = timeout | |
def __enter__(self): | |
signal.signal(signal.SIGALRM, self.handler) | |
signal.alarm(self.timeout) | |
def __exit__(self, *args): | |
signal.alarm(0) | |
def handler(self, *args): | |
print("alarm fired") | |
raise self | |
class NightscoutNamespace(BaseNamespace): | |
def __init__(self, io, path): | |
print("__init__", io, path) | |
self.io = io | |
self.websocket_entries = [] | |
self.websocket_treatments = {} | |
self.websocket_temp_targets = [] | |
self.websocket_meal_carbs = [] | |
self.observer = None | |
super(NightscoutNamespace, self).__init__(io, path) | |
def on_error(self, data): | |
print("Error {}, restarting.".format(data)) | |
sys.stdout.flush() | |
sys.exit() | |
def on_close(self): | |
print("Close, restarting.") | |
sys.stdout.flush() | |
sys.exit() | |
def on_dataUpdate(self, data): | |
try: | |
global wait_count | |
wait_count = 0 | |
print("Incoming ({}) received at {}.".format( | |
data.keys(), datetime.now().isoformat() | |
)) | |
sys.stdout.flush() | |
timestamp_1d = timestamp_hours_ago(24) | |
if 'sgvs' in data: | |
for entry in data.get('sgvs', ()): | |
self.websocket_entries.append({ | |
'direction': entry.get('direction'), | |
'date': entry.get('mills'), | |
'dateString': datetime.fromtimestamp( | |
entry['mills'] / MILLIS_PER_SECOND, | |
tz=None).isoformat() + "-0500", | |
'sgv': entry.get('mgdl'), | |
'device': entry.get('device'), | |
'rssi': entry.get('rssi'), | |
'filtered': entry.get('filtered'), | |
'unfiltered': entry.get('unfiltered'), | |
'noise': entry.get('noise'), | |
'type': 'sgv', | |
'glucose': entry.get('mgdl') | |
}) | |
# websocket_entries.append(entry) | |
# print websocket_entries[-1] | |
websocket_entries = [ | |
entry for entry in self.websocket_entries if | |
entry.get('date') > timestamp_1d | |
] | |
websocket_entries.sort( | |
key=lambda k: k.get('date'), | |
reverse=True) | |
write_glucose_json(websocket_entries) | |
if 'treatments' in data: | |
for treatment in data.get('treatments', ()): | |
if treatment.get('action', None) and treatment['action'] \ | |
== 'remove': | |
print("removing treatment {}".format(treatment)) | |
removed_treatments = [ | |
removed_treatment for | |
removed_treatment in | |
self.websocket_treatments.values() if | |
removed_treatment['_id'] == | |
treatment['_id']] | |
for removed_treatment in removed_treatments: | |
try: | |
del self.websocket_treatments[( | |
removed_treatment['eventType'], | |
treatment.get( | |
'timestamp', | |
treatment.get( | |
'created_at', | |
None | |
) | |
))] | |
except KeyError: | |
pass | |
else: | |
self.websocket_treatments[(treatment['eventType'], | |
treatment.get( | |
'timestamp', | |
treatment.get( | |
'created_at', | |
None | |
)))] = treatment | |
self.handle_temp_targets() | |
self.handle_meal_carbs() | |
except Exception as e: | |
print(e, e.message, e.args) | |
import traceback | |
traceback.print_exc() | |
sys.exit() | |
def handle_meal_carbs(self): | |
new_websocket_meal_carbs = [ | |
treatment for treatment in self.websocket_treatments.values() if | |
(treatment.get('carbs') or treatment.get('insulin')) and | |
treatment.get('mills') >= timestamp_hours_ago(24) | |
] | |
new_websocket_meal_carbs.sort( | |
key=lambda t: t.get('mills'), | |
reverse=True) | |
if self.websocket_meal_carbs != new_websocket_meal_carbs: | |
self.websocket_meal_carbs = new_websocket_meal_carbs | |
write_treatment_json( | |
'monitor/carbhistory.json', | |
self.websocket_meal_carbs) | |
def handle_temp_targets(self): | |
new_websocket_temp_targets = [ | |
treatment for treatment in self.websocket_treatments.values() if | |
'Target' in treatment.get('eventType', '') and | |
treatment.get('mills') >= timestamp_hours_ago(6) | |
] | |
new_websocket_temp_targets.sort( | |
key=lambda t: t.get('mills'), | |
reverse=True) | |
if new_websocket_temp_targets != self.websocket_temp_targets: | |
self.websocket_temp_targets = new_websocket_temp_targets | |
write_treatment_json( | |
'settings/temptargets.json', | |
self.websocket_temp_targets) | |
def on_connect(self, *args): | |
self.io.emit('authorize', { | |
'client': 'openaps-ws', | |
'secret': API_SECRET, | |
'history': 24, | |
'from': timestamp_hours_ago(24), | |
'status': False | |
}) | |
path = '.' | |
self.observer = Observer() | |
self.observer.schedule( | |
MonitorEventHandler(self.io, self.websocket_treatments), | |
path, | |
recursive=True) | |
self.observer.start() | |
super(NightscoutNamespace, self).on_connect() | |
def on_disconnect(self): | |
global wait_count | |
wait_count += 8 | |
print( | |
"Disconnected, aborting at {}." | |
.format(datetime.now().isoformat())) | |
sys.stdout.flush() | |
if self.observer: | |
self.observer.stop() | |
sys.exit() | |
def main(): | |
with SocketIO( | |
NIGHTSCOUT_HOST, | |
Namespace=NightscoutNamespace, | |
wait_for_connection=False) as socketIO: | |
global wait_count | |
while wait_count < 10: | |
print("waiting at {}".format(datetime.now())) | |
sys.stdout.flush() | |
with WatchdogTimer(120): | |
socketIO.wait(60) | |
wait_count += 1 | |
sys.stdout.flush() | |
print( | |
"Waited too long for data. Exiting at {}.".format(datetime.now())) | |
if __name__ == '__main__': | |
main() |
Needs to use logging etc. This wasn't intended to be more than a proof-of-concept.
Upload events, status in response to data changes.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Edited to use environment variables for NIGHTSCOUT_HOST and API_SECRET.