|
#!/usr/bin/env python3 |
|
import sys |
|
import argparse |
|
import os |
|
import sqlite3 |
|
try: |
|
from lxml import etree as ET |
|
except ImportError: |
|
import xml.etree.ElementTree as ET |
|
from collections import defaultdict |
|
from datetime import datetime, timedelta |
|
|
|
# Values of the ``type`` attribute on <Record> elements that build_db() keeps.
SLEEP_TYPE = "HKCategoryTypeIdentifierSleepAnalysis"
HRV_TYPE = "HKQuantityTypeIdentifierHeartRateVariabilitySDNN"
RHR_TYPE = "HKQuantityTypeIdentifierRestingHeartRate"
RESP_TYPE = "HKQuantityTypeIdentifierRespiratoryRate"
VO2_TYPE = "HKQuantityTypeIdentifierVO2Max"
STEPS_TYPE = "HKQuantityTypeIdentifierStepCount"
ENERGY_TYPE = "HKQuantityTypeIdentifierActiveEnergyBurned"
EXERCISE_TYPE = "HKQuantityTypeIdentifierAppleExerciseTime"
STAND_TYPE = "HKCategoryTypeIdentifierAppleStandHour"

# Quantity record type -> name of the SQLite cache table holding its
# (startDate, value) rows.  Sleep and stand records get their own tables.
QUANTITY_TABLES = {
    HRV_TYPE: "hrv",
    RHR_TYPE: "rhr",
    RESP_TYPE: "resp",
    VO2_TYPE: "vo2",
    STEPS_TYPE: "steps",
    ENERGY_TYPE: "energy",
    EXERCISE_TYPE: "exercise",
}

# Threshold (in minutes) that find_cycles() compares a merged REM block against.
MIN_REM_CYCLE_MIN = 3  # minimum total REM to count as a cycle boundary
|
|
|
|
|
def main():
    """Command-line entry point: print a sleep-cycle report per night.

    Loads the export through load_all_sleep_records(), selects the nights to
    report, prints each night with print_cycles() and finishes with the median
    and P90 cycle duration over every cycle that was printed.

    Night selection:
      * no flags          -> every night with sleep data
      * ``--from A``        -> night A only
      * ``--from A --to B`` -> nights A..B inclusive
      * ``--to B``          -> every night up to and including B
    When a selection was requested but matches nothing, a
    "No sleep data for ..." line is printed instead of exiting silently.
    """

    def iso_date(text):
        # Nights are keyed and compared as "YYYY-MM-DD" strings, so reject
        # anything that is not a real date and normalise e.g. "2024-1-5".
        try:
            return datetime.strptime(text, "%Y-%m-%d").strftime("%Y-%m-%d")
        except ValueError:
            raise argparse.ArgumentTypeError(
                f"invalid date {text!r}, expected YYYY-MM-DD"
            ) from None

    def fmt(minutes):
        hours, mins = divmod(minutes, 60)
        return f"{hours}h {mins:02d}m"

    parser = argparse.ArgumentParser()
    parser.add_argument("export", metavar="export.xml")
    parser.add_argument("--from", dest="date_from", metavar="DATE", type=iso_date)
    parser.add_argument("--to", dest="date_to", metavar="DATE", type=iso_date)
    parser.add_argument("--source", help="Filter by source name (e.g. \"Vlad's Apple Watch\")")
    args = parser.parse_args()

    records_by_date, metrics = load_all_sleep_records(args.export, args.source)

    # Resolve the inclusive [first, last] bounds; None means "unbounded".
    if args.date_from:
        first, last = args.date_from, args.date_to or args.date_from
    else:
        first, last = None, args.date_to

    dates = [
        d for d in sorted(records_by_date)
        if (first is None or first <= d) and (last is None or d <= last)
    ]

    if not dates and last is not None:
        if first is None:
            wanted = f"nights up to {last}"
        elif first == last:
            wanted = first
        else:
            wanted = f"{first} to {last}"
        print(f"No sleep data for {wanted}")

    all_cycle_durations = []
    for date in dates:
        cycles = print_cycles(date, records_by_date[date], metrics)
        all_cycle_durations.extend(dur for _, _, dur, _ in cycles)

    if all_cycle_durations:
        all_cycle_durations.sort()
        n = len(all_cycle_durations)
        # Nearest-rank style picks: upper median and the 90th percentile,
        # clamped so the index never runs past the last element.
        p90 = all_cycle_durations[min(int(n * 0.9), n - 1)]
        median = all_cycle_durations[n // 2]
        print(f"Cycle duration ({n} cycles): median {fmt(median)}, P90 {fmt(p90)}")
|
|
|
|
|
def db_path(xml_path):
    """Return the path of the SQLite cache that belongs to *xml_path*.

    Normally the extension is swapped for ``.db`` (``export.xml`` ->
    ``export.db``).  If the input itself already ends in ``.db`` the suffix is
    appended instead, so the cache path can never equal the input path; the
    loader deletes and rebuilds the cache file, which must not hit the input.
    """
    stem, ext = os.path.splitext(xml_path)
    if ext.lower() == ".db":
        return os.fspath(xml_path) + ".db"
    return stem + ".db"
|
|
|
|
|
def build_db(xml_path, path):
    """Parse the Health export at *xml_path* into a SQLite cache at *path*.

    The whole XML document is parsed into memory and the ``Record`` elements
    directly under the root are scanned:

      * sleep-analysis records (except "InBed" ones) go to table ``sleep``
        with the stage name reduced to e.g. ``Core``/``Deep``/``REM``/``Awake``;
      * records whose type is in QUANTITY_TABLES go to the matching table as
        ``(startDate, value)``; steps/energy/exercise are kept only when the
        source name contains "Watch";
      * stand-hour records go to table ``stand``.

    All rows are collected before the database file is touched, and a failure
    while writing removes the partial file.  Otherwise a half-built cache
    (tables present, rows missing) would look valid to the loader, which only
    checks the file's mtime and table names.

    Raises whatever the XML parser, ``float()`` or sqlite3 raise; *path* is
    expected not to exist yet (the tables are created unconditionally).
    """
    print(f"Building cache {path} ...", file=sys.stderr)
    root = ET.parse(xml_path).getroot()

    sleep_rows = []
    quantity_rows = {t: [] for t in QUANTITY_TABLES.values()}
    stand_rows = []
    for r in root.findall("Record"):
        rtype = r.get("type")
        if rtype == SLEEP_TYPE:
            value = r.get("value", "")
            if "InBed" in value:
                continue
            # Strip the longer prefix first so "…AsleepCore" becomes "Core"
            # while "…Awake" (no "Asleep" infix) becomes "Awake".
            stage = (
                value
                .replace("HKCategoryValueSleepAnalysisAsleep", "")
                .replace("HKCategoryValueSleepAnalysis", "")
            )
            sleep_rows.append(
                (r.get("startDate"), r.get("endDate"), stage, r.get("sourceName", ""))
            )
        elif rtype in QUANTITY_TABLES:
            tname = QUANTITY_TABLES[rtype]
            src = r.get("sourceName", "")
            # NOTE(review): presumably avoids double counting activity that a
            # phone also records - confirm.
            if tname in ("steps", "energy", "exercise") and "Watch" not in src:
                continue
            quantity_rows[tname].append(
                (r.get("startDate"), float(r.get("value", 0)))
            )
        elif rtype == STAND_TYPE:
            stand_rows.append((r.get("startDate"), r.get("value", "")))

    con = sqlite3.connect(path)
    try:
        con.execute("""
            CREATE TABLE sleep (
                startDate TEXT,
                endDate TEXT,
                stage TEXT,
                source TEXT
            )
        """)
        # Table names come from the QUANTITY_TABLES constant, never from
        # input, so interpolating them into SQL is safe.
        for tname in QUANTITY_TABLES.values():
            con.execute(f"CREATE TABLE {tname} (startDate TEXT, value REAL)")
        con.execute("CREATE TABLE stand (startDate TEXT, value TEXT)")

        con.executemany("INSERT INTO sleep VALUES (?,?,?,?)", sleep_rows)
        con.execute("CREATE INDEX idx_end ON sleep (endDate)")
        for tname, rows in quantity_rows.items():
            con.executemany(f"INSERT INTO {tname} VALUES (?,?)", rows)
            con.execute(f"CREATE INDEX idx_{tname}_start ON {tname} (startDate)")
        con.executemany("INSERT INTO stand VALUES (?,?)", stand_rows)
        con.execute("CREATE INDEX idx_stand_start ON stand (startDate)")
        con.commit()
    except BaseException:
        # Close first (required before deleting on some platforms), then drop
        # the partial cache so the next run rebuilds it from scratch.
        con.close()
        if os.path.exists(path):
            os.remove(path)
        raise
    con.close()
|
|
|
|
|
def load_all_sleep_records(xml_path, source=None):
    """Load sleep records and health metrics, (re)building the cache if needed.

    The SQLite cache next to *xml_path* is rebuilt when it is missing, older
    than the XML file, or lacks any of the expected tables.

    Args:
        xml_path: path of the Health export XML.
        source: if given, only sleep records whose source name equals this
            string are returned (metrics are not filtered).

    Returns:
        ``(by_date, metrics)`` where ``by_date`` maps a night ("YYYY-MM-DD")
        to a sorted list of ``(start, end, stage)`` tuples and ``metrics`` is
        a dict with keys ``hrv``, ``rhr``, ``resp`` (night/day -> list of
        values), ``vo2`` (sorted list of ``(day, value)``), ``steps``,
        ``energy``, ``exercise`` (day -> sum) and ``stand`` (day -> number of
        stood hours).
    """
    path = db_path(xml_path)
    metric_tables = (*QUANTITY_TABLES.values(), "stand")
    required_tables = {"sleep", *metric_tables}

    needs_rebuild = (
        not os.path.exists(path)
        or os.path.getmtime(path) < os.path.getmtime(xml_path)
    )
    if not needs_rebuild:
        con = sqlite3.connect(path)
        try:
            tables = {
                row[0]
                for row in con.execute(
                    "SELECT name FROM sqlite_master WHERE type='table'"
                )
            }
        finally:
            con.close()
        needs_rebuild = not required_tables.issubset(tables)
    if needs_rebuild:
        if os.path.exists(path):
            os.remove(path)
        build_db(xml_path, path)

    con = sqlite3.connect(path)
    try:
        query = "SELECT startDate, endDate, stage FROM sleep"
        params = []
        if source:
            query += " WHERE source = ?"
            params.append(source)
        sleep_rows = con.execute(query, params).fetchall()
        # Table names come from module constants, so the f-string is safe.
        rows = {
            table: con.execute(f"SELECT startDate, value FROM {table}").fetchall()
            for table in metric_tables
        }
    finally:
        con.close()

    def night_key(dt):
        # Times before noon belong to that morning's night; noon or later
        # counts towards the night that ends the following morning.
        if dt.hour >= 12:
            dt = dt + timedelta(days=1)
        return dt.strftime("%Y-%m-%d")

    def day_key(start_str):
        return parse_dt(start_str).strftime("%Y-%m-%d")

    def values_per_night(metric_rows):
        grouped = defaultdict(list)
        for start_str, value in metric_rows:
            grouped[night_key(parse_dt(start_str))].append(value)
        return grouped

    def sum_per_day(metric_rows):
        totals = defaultdict(float)
        for start_str, value in metric_rows:
            totals[day_key(start_str)] += value
        return totals

    # Sleep records are assigned to a night by their END time.
    by_date = defaultdict(list)
    for start_str, end_str, stage in sleep_rows:
        start, end = parse_dt(start_str), parse_dt(end_str)
        by_date[night_key(end)].append((start, end, stage))
    for records in by_date.values():
        records.sort()

    # Resting heart rate is keyed by calendar day, not by night.
    rhr_by_date = defaultdict(list)
    for start_str, value in rows["rhr"]:
        rhr_by_date[day_key(start_str)].append(value)

    stand_by_date = defaultdict(int)
    for start_str, value in rows["stand"]:
        if value == "HKCategoryValueAppleStandHourStood":
            stand_by_date[day_key(start_str)] += 1

    metrics = {
        "hrv": values_per_night(rows["hrv"]),
        "rhr": rhr_by_date,
        "resp": values_per_night(rows["resp"]),
        "vo2": sorted((day_key(s), v) for s, v in rows["vo2"]),
        "steps": sum_per_day(rows["steps"]),
        "energy": sum_per_day(rows["energy"]),
        "exercise": sum_per_day(rows["exercise"]),
        "stand": stand_by_date,
    }
    return by_date, metrics
|
|
|
|
|
def print_cycles(date, records, metrics):
    """Print the report for one night and return its cycles.

    Writes a table with one row per cycle found by find_cycles() (start, end,
    duration, stage composition), the summed cycle duration, and then the
    HRV / RHR / respiratory-rate trends, VO2 max and activity lines.

    Returns the list produced by find_cycles(records).
    """
    cycles = find_cycles(records)

    print(f"Sleep cycles for {date}")
    print()
    print(f"{'':8} {'Start':>5} {'End':>5} {'Duration':>10} Composition")
    print("-" * 70)

    total_minutes = 0
    for number, (begin, finish, minutes, cycle_recs) in enumerate(cycles, start=1):
        name = f"Cycle {number}"
        stages = summarize_stages(cycle_recs)
        hrs, mins = divmod(minutes, 60)
        # Sub-hour cycles omit the hour part entirely.
        if hrs:
            length = f"{hrs}h {mins:02d}m"
        else:
            length = f"{mins}m"
        print(f"{name:8} {begin.strftime('%H:%M'):>5} {finish.strftime('%H:%M'):>5} {length:>10} {stages}")
        total_minutes += minutes

    total_h, total_m = divmod(total_minutes, 60)
    print(f"\n{'Total sleep':>32} {total_h}h {total_m:02d}m")

    # (label, metrics key, unit, decimals) for each trend line, in print order.
    for title, key, unit, places in (
        ("HRV", "hrv", "ms", 0),
        ("RHR", "rhr", "bpm", 0),
        ("Resp rate", "resp", "br/min", 1),
    ):
        print_trend_metric(title, date, metrics[key], unit, decimals=places)
    print_vo2(date, metrics["vo2"])
    print_activity(date, metrics)
    print()
    return cycles
|
|
|
|
|
def print_trend_metric(label, date, by_date, unit, decimals):
    """Print one metric line for *date* with its change versus recent nights.

    Args:
        label: text shown right-aligned in the 32-character label column.
        date: "YYYY-MM-DD" key to report.
        by_date: mapping of date key -> list of readings.
        unit: unit suffix printed after the value.
        decimals: number of decimal places used for every printed number.

    The current value is the (upper) median of the readings for *date*.  The
    baseline is the (upper) median of all readings from the up-to-five most
    recent earlier dates that have data.  Prints "(no data)" when *date* has
    no readings, and omits the trend when there is no earlier data.

    The delta is computed from the values as rounded for display, so the
    printed arrow and difference always agree with the two printed numbers.
    """
    values = by_date.get(date)
    if not values:
        print(f"{label:>32} (no data)")
        return

    ordered = sorted(values)
    current = ordered[len(ordered) // 2]

    prior_dates = sorted(d for d in by_date if d < date)[-5:]
    prior_vals = sorted(v for d in prior_dates for v in by_date[d])

    trend_str = ""
    if prior_vals:
        prior_median = prior_vals[len(prior_vals) // 2]
        # round() and the ".Nf" format both round-half-even on the same
        # value, so this reproduces exactly what the reader sees.
        shown_current = round(current, decimals)
        shown_prior = round(prior_median, decimals)
        delta = round(shown_current - shown_prior, decimals)
        arrow = "↑" if delta > 0 else "↓" if delta < 0 else "→"
        trend_str = (
            f" {arrow}{abs(delta):.{decimals}f}"
            f" vs 5-night median ({prior_median:.{decimals}f})"
        )
    print(f"{label:>32} {current:.{decimals}f} {unit}{trend_str}")
|
|
|
|
|
def print_vo2(date, vo2_series):
    """Print the most recent VO2 max reading dated on or before *date*.

    *vo2_series* is a sequence of ``(day, value)`` pairs; scanning stops at
    the first pair whose day is later than *date*.  Nothing is printed when
    no pair qualifies.
    """
    newest = None
    for entry in vo2_series:
        if entry[0] > date:
            break
        newest = entry

    if newest is None:
        return

    day, reading = newest
    print(f"{'VO2 max':>32} {reading:.1f} mL/(kg·min) as of {day}")
|
|
|
|
|
def print_activity(date, metrics):
    """Print the activity totals of the calendar day before *date*.

    Looks up ``energy``, ``exercise``, ``stand`` and ``steps`` in *metrics*
    for the previous day and prints whichever are present, joined by " · ".
    Prints "(no data)" when none of the four has an entry for that day.
    """
    night = datetime.strptime(date, "%Y-%m-%d")
    prev_day = (night - timedelta(days=1)).strftime("%Y-%m-%d")

    energy, exercise, stand, steps = (
        metrics[key].get(prev_day)
        for key in ("energy", "exercise", "stand", "steps")
    )

    parts = []
    if energy is not None:
        parts.append(f"Move {int(energy)} kcal")
    if exercise is not None:
        parts.append(f"Exercise {int(exercise)} min")
    if stand is not None:
        parts.append(f"Stand {stand} h")
    if steps is not None:
        parts.append(f"Steps {int(steps):,}")

    # An empty list means every lookup came back None.
    if not parts:
        print(f"{'Activity':>32} (no data)")
        return
    print(f"{'Activity':>32} {' · '.join(parts)}")
|
|
|
|
|
def find_cycles(records, min_rem_min=None, max_gap_min=5):
    """Split a night's records into cycles, using REM blocks as boundaries.

    Args:
        records: list of ``(start, end, stage)`` tuples ordered by start.
        min_rem_min: minimum total REM minutes a merged REM block needs to
            close a cycle; defaults to the module's MIN_REM_CYCLE_MIN.
        max_gap_min: a following REM/Core/Awake record is merged into the
            current REM block while it starts at most this many minutes after
            the end of the block's last REM record.

    Returns:
        A list of ``(cycle_start, cycle_end, duration_minutes, records)``
        tuples.  Each cycle ends where its REM block ends; whatever follows
        the last qualifying REM block is returned as a final "tail" cycle.
        An empty *records* list yields an empty result.
    """
    if min_rem_min is None:
        min_rem_min = MIN_REM_CYCLE_MIN

    cycles = []
    if not records:
        return cycles

    count = len(records)
    current_start = records[0][0]

    i = 0
    while i < count:
        s, e, stage = records[i]
        if stage != "REM":
            i += 1
            continue

        # Merge consecutive REM blocks (possibly separated by tiny Core/Awake).
        # Only REM records extend rem_end, so the gap is always measured from
        # the end of the most recent REM record.
        rem_end = e
        rem_total = (e - s).total_seconds() / 60
        j = i + 1
        while j < count:
            ns, ne, nstage = records[j]
            gap = (ns - rem_end).total_seconds() / 60
            if gap > max_gap_min or nstage not in ("REM", "Core", "Awake"):
                break
            if nstage == "REM":
                rem_end = ne
                rem_total += (ne - ns).total_seconds() / 60
            j += 1

        if rem_total < min_rem_min:
            # Too little REM to count as a boundary: skip the whole merged
            # run and keep the current cycle open.
            i = j
            continue

        # Collect all records up to and including this REM block.
        cycle_records = [r for r in records if current_start <= r[0] < rem_end]
        dur = int((rem_end - current_start).total_seconds() / 60)
        cycles.append((current_start, rem_end, dur, cycle_records))

        # Resume at the first record starting at or after rem_end; the merge
        # loop may have advanced j past Core/Awake records that belong to the
        # next cycle, so search again from i + 1.
        j = next((k for k in range(i + 1, count) if records[k][0] >= rem_end), count)
        current_start = records[j][0] if j < count else None
        i = j

    # Tail: remaining records after the last REM boundary.
    if current_start is not None:
        tail = [r for r in records if r[0] >= current_start]
        if tail:
            tail_end = tail[-1][1]
            dur = int((tail_end - current_start).total_seconds() / 60)
            cycles.append((current_start, tail_end, dur, tail))

    return cycles
|
|
|
|
|
def summarize_stages(cycle_records):
    """Return a one-line stage breakdown such as ``Deep 20m → Core 35m``.

    *cycle_records* is an iterable of ``(start, end, stage)`` tuples.
    Overlaps are resolved in favour of the later-starting record: the earlier
    record is clipped to end where the later one starts.  Segments shorter
    than one minute are ignored and totals are truncated to whole minutes.

    Deep, Core, REM and Awake are listed first in that order.  Any other
    stage present in the data (for example "Unspecified") follows in
    alphabetical order instead of being dropped; an empty stage name is shown
    as "Asleep".  Returns an empty string when nothing qualifies.
    """
    # Build non-overlapping segments by clipping the previous segment
    # whenever the next record starts before it ends.
    segments = []
    for s, e, stage in sorted(cycle_records):
        if segments and s < segments[-1][1]:
            ps, _, pstage = segments[-1]
            segments[-1] = (ps, s, pstage)
        segments.append((s, e, stage))

    totals = {}
    for s, e, stage in segments:
        dur = (e - s).total_seconds() / 60
        if dur >= 1:
            totals[stage] = totals.get(stage, 0) + dur

    known = ("Deep", "Core", "REM", "Awake")
    ordered = [stage for stage in known if stage in totals]
    ordered += sorted(stage for stage in totals if stage not in known)

    return " → ".join(
        f"{stage or 'Asleep'} {int(totals[stage])}m" for stage in ordered
    )
|
|
|
|
|
def parse_dt(s):
    """Parse a ``YYYY-MM-DD HH:MM:SS +ZZZZ`` timestamp string.

    Returns a timezone-aware datetime carrying the offset given in *s*;
    raises ValueError when *s* does not match that layout.
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S %z"
    return datetime.strptime(s, timestamp_format)
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()