Last active
November 8, 2022 12:54
-
-
Save yanfenglee/a1a6247ee6011d44178d736b7fa1ceb9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import matplotlib.pyplot as plt | |
from matplotlib.animation import FuncAnimation | |
import polars as pl | |
import matplotlib as mpl | |
import requests | |
from datetime import datetime, timedelta | |
import enum | |
import time | |
from playsound import playsound | |
import threading | |
################## config begin ######################
# Account token, sent as the `token` request header.
TOKEN = '<your token>'
# (low, high) bounds in mmol/L used by the rapid-change alerts.
LEVEL1 = (5, 8)
# (low, high) bounds in mmol/L used by the plain out-of-range alerts.
LEVEL2 = (4.2, 9)
# Age of the newest reading, in minutes, beyond which the no-signal alert fires.
NO_SIGNAL_TIME = 18
# Monitoring frequency in checks per second: one check every 120 seconds.
MONITOR_FREQ = 1 / 120
# Alert postponement in minutes, selected by console commands 0, 1, 2 and 3.
ALERT_DELAY = [0, 30, 60, 90]
################## config end ######################
def get_bgs(hour=3, timeout=30) -> pl.DataFrame:
    """Fetch recent glucose entries from the API and return them as a DataFrame.

    Args:
        hour: Size of the look-back window in hours. The request asks for
            ``hour * 60 / 5`` entries, i.e. one entry per five minutes
            (presumably the sensor's reporting interval -- confirm).
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the caller forever.

    Returns:
        A DataFrame with two columns: ``ts`` (naive local ``datetime`` built
        from the entry's millisecond ``date`` field) and ``bg`` (the entry's
        ``sgv`` divided by 18 and rounded to one decimal). Entries whose
        ``sgv`` is missing or falsy are dropped.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx HTTP status.
    """
    res = requests.get(
        'https://gluc.cn/api/v1/entries.json',
        headers={'token': TOKEN},
        # 'rr' is forwarded exactly as the original request did; its meaning
        # is not evident from this file.
        params={'rr': 9999999999999999, 'count': int(hour * 60 / 5)},
        timeout=timeout,
    )
    # Fail loudly on an HTTP error instead of trying to parse an error page.
    res.raise_for_status()

    # .get() tolerates entries that carry no 'sgv' key at all.
    readings = [bg for bg in res.json() if bg.get('sgv')]
    return pl.DataFrame({
        'ts': [datetime.fromtimestamp(bg['date'] / 1000) for bg in readings],
        'bg': [round(bg['sgv'] / 18, 1) for bg in readings],
    })
def animate(i):
    """Redraw the current figure from the last three hours of readings.

    Args:
        i: Frame value passed by the caller; it is not used.
    """
    readings = get_bgs(hour=3)

    # Rolling mean over a 30-minute window that starts 15 minutes before each
    # timestamp, so the window straddles the reading it belongs to.
    mean_expr = pl.col('bg').mean().alias('mean_bg')
    smoothed = (
        readings.sort('ts')
        .groupby_rolling(index_column='ts', period='30m', offset='-15m')
        .agg(mean_expr)
    )

    # Start from a blank figure, then draw raw points plus the smoothed line.
    plt.clf()
    plt.title("mumu's glucose")
    plt.ylabel('mmol/L')
    plt.scatter(readings['ts'], readings['bg'])
    plt.plot(smoothed['ts'], smoothed['mean_bg'], color='r')
    plt.grid()
    plt.tight_layout()
    plt.draw_all(True)
def draw_bgs():
    """Open the glucose chart window and keep it refreshed until it is closed."""
    # Draw once up front so the window is not empty before the first refresh.
    animate(0)
    # The animation is bound to a local so the object stays referenced while
    # the window is open. The interval is in milliseconds: one redraw a minute.
    refresher = FuncAnimation(
        plt.gcf(),
        animate,
        frames=range(1),
        interval=60 * 1000,
        repeat=True,
    )
    plt.show()
    plt.close()
class AlertType(enum.Enum):
    """Kinds of alert the monitor can raise."""

    # Glucose is low.
    LOW = 1
    # Glucose is high.
    HIGH = 2
    # Glucose is rising rapidly.
    ASCENDING = 3
    # Glucose is falling rapidly.
    DESCENDING = 4
    # No signal.
    NOVALUE = 5
    # Any other failure, e.g. a network problem.
    UNKNOWN = 6
def alert(type: AlertType):
    """Log an alert to stdout and play its sound on a background thread.

    Args:
        type: The alert to raise; it selects which sound file is played.
    """
    sound_dir = 'sounds/'
    # LOW/DESCENDING share one sound and HIGH/ASCENDING share another.
    sound_files = {
        AlertType.LOW: 's1.mp3',
        AlertType.DESCENDING: 's1.mp3',
        AlertType.HIGH: 's2.mp3',
        AlertType.ASCENDING: 's2.mp3',
        AlertType.NOVALUE: 's3.mp3',
        AlertType.UNKNOWN: 's4.mp3',
    }

    stamp = datetime.now().isoformat(sep=' ', timespec='seconds')
    print(f'{stamp} alert call: {type}\n')

    def _play():
        # The file lookup happens inside the worker thread, not in the caller.
        playsound(sound_dir + sound_files[type])

    # Play on a separate thread so the caller is not blocked by playback.
    threading.Thread(target=_play).start()
def check(df: pl.DataFrame):
    """Raise every alert that the readings in ``df`` currently warrant.

    Args:
        df: Readings with ``ts`` and ``bg`` columns. Row 0 is treated as the
            latest reading and row 3 as the reference for the trend checks
            (assumes the frame is ordered newest-first -- confirm against
            the API's ordering).

    The checks are independent, so one call may raise several alerts.
    """
    now = datetime.now()

    # With no readings at all, indexing row 0 would raise; report it as a
    # lost signal rather than letting it surface as an unrelated error.
    if df.height == 0:
        alert(AlertType.NOVALUE)
        return

    latest_bg = df[0, 'bg']

    # The trend checks compare against row 3. When fewer than four readings
    # exist they are skipped, so the threshold and no-signal checks below
    # still run instead of being aborted by an out-of-range index.
    if df.height > 3:
        change = latest_bg - df[3, 'bg']
        if latest_bg > LEVEL1[1] and change > 1.0:
            alert(AlertType.ASCENDING)
        if latest_bg < LEVEL1[0] and change < -1.0:
            alert(AlertType.DESCENDING)

    if latest_bg > LEVEL2[1]:
        alert(AlertType.HIGH)
    if latest_bg < LEVEL2[0]:
        alert(AlertType.LOW)

    # The newest reading is older than the allowed silence window.
    if now - df[0, 'ts'] > timedelta(minutes=NO_SIGNAL_TIME):
        alert(AlertType.NOVALUE)
# Minutes by which the next check is postponed. Written by the console loop,
# read on every poll by the monitor, and reset to 0 once a check has run.
delay_alert_minutes = 0


def monitor_func():
    """Poll the glucose API forever and raise alerts; meant to run in a thread.

    A check runs once ``1 / MONITOR_FREQ`` seconds plus the current
    ``delay_alert_minutes`` have passed since the previous one. The very
    first check runs immediately. Any exception from fetching or checking is
    printed and reported as an UNKNOWN alert so the loop keeps running.
    """
    global delay_alert_minutes
    # -inf makes the first elapsed-time comparison succeed right away.
    # A monotonic clock keeps the interval immune to wall-clock adjustments.
    last_check = float('-inf')
    while True:
        # Re-read the delay on every poll so a postponement entered on the
        # console takes effect without waiting for the next check.
        wait_seconds = 1 / MONITOR_FREQ + delay_alert_minutes * 60
        if time.monotonic() - last_check >= wait_seconds:
            delay_alert_minutes = 0
            try:
                check(get_bgs())
            except Exception as ex:
                # Thread boundary: report the failure and keep monitoring.
                print(ex)
                alert(AlertType.UNKNOWN)
            last_check = time.monotonic()
        # Sleep on every iteration, including the "not yet due" path, so the
        # loop polls once per second instead of spinning the CPU.
        time.sleep(1)
def get_cmd_idx():
    """Prompt on the console for a command and return it as an integer.

    Returns:
        The entered number as an ``int``, or ``None`` when the input is not
        a valid integer.

    Raises:
        SystemExit: When the user enters ``q``.
    """
    # Imported locally because the file's import block does not provide sys.
    # sys.exit is used instead of the exit() builtin, which is only installed
    # by the site module and is intended for interactive sessions.
    import sys

    prompt = '\n> 输入 4 显示最近血糖值图表\n> 输入 0,1,2,3, 分别延迟报警 0,30,60,90分钟\n'
    cmd_index = input(prompt)
    if cmd_index == 'q':
        sys.exit()
    try:
        return int(cmd_index)
    except ValueError:
        # Anything that is not an integer is reported as "no command".
        return None
def main():
    """Start the background monitor and run the console command loop.

    Commands 0-3 postpone the next check by the matching ``ALERT_DELAY``
    entry (in minutes); command 4 opens the glucose chart. Any other input
    is ignored and the prompt is shown again.
    """
    global delay_alert_minutes

    # Daemon thread: it must not keep the process alive once the console
    # loop exits.
    threading.Thread(target=monitor_func, daemon=True).start()

    while True:
        cmd_index = get_cmd_idx()
        if cmd_index in (0, 1, 2, 3):
            delay_alert_minutes = ALERT_DELAY[cmd_index]
            print('===================================')
            timenow = datetime.now().isoformat(sep=' ', timespec='seconds')
            print(f'{timenow} 设置延迟报警时间: {delay_alert_minutes} 分钟')
            print('===================================')
        elif cmd_index == 4:
            draw_bgs()


# Run only when executed as a script, so importing the module does not start
# the monitor or block on console input.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment