Created
November 12, 2022 06:24
-
-
Save BuongiornoTexas/3ea2d1df1a569e1a0a4bc79878a8a753 to your computer and use it in GitHub Desktop.
Rough Python script for extracting arbitrary date ranges of Powerwall history data from the Tesla cloud.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# TO DO: | |
# - Manage multiple sites in TCInfo (low priority) | |
# cspell: ignore CACHEFILE NODEMAP teslapy powerwall WHTOKWH QUARTERHOUR mcbirse | |
import json | |
import sys | |
from enum import Enum | |
from calendar import monthrange, weekday, SUNDAY | |
from collections import OrderedDict | |
from datetime import date, datetime, time, timedelta | |
from tracemalloc import start | |
from zoneinfo import ZoneInfo | |
from time import sleep | |
from typing import Any, OrderedDict, Tuple, Union | |
from teslapy import Tesla, Battery | |
# Teslapy auth cache file: JSON keyed by account email address (TCInfo reads
# the first key back as the login email).
CACHEFILE = "cache.json"
# Argument keys for the CALENDAR_HISTORY_DATA api call.
PERIOD_KEY = "period"
INTERVAL_KEY = "interval"
KIND_ENERGY = "energy"
END_DATE_KEY = "end_date"
# Key of the raw timestamp field in returned time-series records.
TIMESTAMP = "timestamp"
# Key of the normalised datetime this script adds to each record.
DATETIME = "datetime"
# API energy fields are watt-hours; output is reported in kWh.
WHTOKWH = 0.001
def dst_safe_timedelta(date: datetime, dt: timedelta) -> datetime:
    """Add *dt* to *date* as elapsed time, safe across DST transitions.

    The offset is applied in UTC (where the clock never jumps) and the
    result is converted back to the original timezone.
    """
    original_tz = date.tzinfo
    shifted = date.astimezone(ZoneInfo("UTC")) + dt
    return shifted.astimezone(original_tz)
class EnergyNode(Enum):
    """Energy flow categories tracked per record.

    Values double as the CSV column headings written by dump_results.
    """

    # These are in the order provided in the tesla download CSV files.
    HOME = "home load"
    SOLAR = "solar gen"
    FROM_PW = "from powerwall"
    FROM_GRID = "from grid"
    TO_GRID = "to grid"
    # These are not included in the Tesla CSV files?
    TO_PW = "to powerwall"
    FROM_GEN = "from generator"
    # Right now, this is the grid services. No idea what these mean.
    # It's a problem if they are non zero!
    OTHER = "other loads & supplies"
class Period(Enum):
    """Reporting resolutions; values are the strings the history API accepts."""

    DAY = "day"
    WEEK = "week"
    MONTH = "month"
    YEAR = "year"
    LIFETIME = "lifetime"
    # Special case: sent as the `interval` argument with period="day",
    # not as a `period` value.
    QUARTERHOUR = "15m"
# mapping tesla cloud data to app download columns | |
NODEMAP = OrderedDict( | |
{ | |
EnergyNode.HOME: [ | |
"consumer_energy_imported_from_grid", | |
"consumer_energy_imported_from_solar", | |
"consumer_energy_imported_from_battery", | |
"consumer_energy_imported_from_generator", | |
], | |
EnergyNode.SOLAR: ["solar_energy_exported"], | |
EnergyNode.FROM_PW: ["battery_energy_exported"], | |
EnergyNode.TO_PW: [ | |
"battery_energy_imported_from_grid", | |
"battery_energy_imported_from_solar", | |
"battery_energy_imported_from_generator", | |
], | |
EnergyNode.FROM_GRID: ["grid_energy_imported"], | |
EnergyNode.TO_GRID: [ | |
"grid_energy_exported_from_solar", | |
"grid_energy_exported_from_generator", | |
"grid_energy_exported_from_battery", | |
], | |
EnergyNode.FROM_GEN: ["generator_energy_exported"], | |
EnergyNode.OTHER: [ | |
"grid_services_energy_imported", | |
"grid_services_energy_exported", | |
], | |
} | |
) | |
class TCInfo:
    """Tesla cloud connection plus cached site metadata.

    Logs in via teslapy (prompting for the account email when no cache file
    exists) and caches the timezone, current site time and installation date
    for the first Powerwall on the account.
    """

    # tesla py instance
    tesla: Tesla
    # cache values
    # NOTE(review): `id` is declared but never assigned in this class — confirm
    # whether it is dead or set by external code.
    id: str
    # Current site time, localised to the installation timezone.
    date_now: datetime
    # Installation date, localised to the installation timezone.
    inst_date: datetime
    # Site installation timezone.
    timezone: ZoneInfo
    # Battery handle used for subsequent API calls.
    powerwall: Battery

    def __init__(self) -> None:
        try:
            with open(CACHEFILE) as fp:
                # we're going to work with the first email address only
                # (the teslapy cache is a JSON object keyed by email).
                address = list(json.load(fp).keys())[0]
        except FileNotFoundError:
            # No cache yet - ask for the account and let teslapy handle auth.
            address = input(
                "Enter Tesla Cloud account email address and follow prompts> "
            )
        self.tesla = Tesla(email=address, cache_file=CACHEFILE)
        # for now, just work on site for first powerwall on list
        # Can adopt @mcbirse approach if this ever becomes more than a scratch script.
        # cache various bits of data.
        self.powerwall = self.tesla.battery_list()[0]
        data = self.powerwall.api("SITE_CONFIG")["response"]
        # grab various data we will use multiple times.
        self.timezone = ZoneInfo(data["installation_time_zone"])
        self.date_now = datetime.fromisoformat(
            self.powerwall.api("SITE_DATA")["response"][TIMESTAMP]
        ).astimezone(self.timezone)
        self.inst_date = datetime.fromisoformat(data["installation_date"]).astimezone(
            self.timezone
        )
class EnergyDataSet: | |
"""Tesla period energy data set. Data read by calling get_data on date range, and | |
# output by calling dump results. | |
Note: get_data can be called once only, as the routine is not safe for over-lapping | |
time ranges.""" | |
tesla: TCInfo | |
period: Period | |
values: OrderedDict[ | |
datetime, OrderedDict[Union[str, EnergyNode], Union[str, float]] | |
] | |
total: OrderedDict[EnergyNode, float] | |
write_locked: bool | |
def __init__(self, tesla: TCInfo, period: Period) -> None: | |
# super class part of this if I start looking at multiple data sources? | |
self.tesla = tesla | |
self.period = period | |
self.values = OrderedDict() | |
self.total = OrderedDict() | |
for node in EnergyNode: | |
self.total[node] = 0.0 | |
self.write_locked = False | |
def end_of_previous_day(self, date: datetime) -> datetime: | |
yesterday = date.date() - timedelta(days=1) | |
end_of_day = datetime.combine( | |
yesterday, time(23, 59, 59), tzinfo=self.tesla.timezone | |
) | |
return end_of_day | |
def end_of_period(self, dt_value: datetime) -> datetime: | |
# Return the end of period datetime for the period containing dt_value | |
# Get the end of day for dt_value | |
end_period = dt_value.replace(hour=23, minute=59, second=59) | |
# Ugly multipart check | |
match self.period: | |
case Period.DAY | Period.QUARTERHOUR: | |
# end of the day for dt_value, already done the hard work | |
pass | |
case Period.WEEK: | |
day_delta = SUNDAY - weekday( | |
end_period.year, | |
end_period.month, | |
end_period.day, | |
) | |
if day_delta > 0: | |
# Do timezone safe just to be sure. | |
end_period = dst_safe_timedelta( | |
end_period, timedelta(days=day_delta) | |
) | |
case Period.MONTH: | |
_, last_day = monthrange(end_period.year, end_period.month) | |
end_period = end_period.replace(day=last_day) | |
case Period.YEAR: | |
end_period = end_period.replace(month=12, day=31) | |
case Period.LIFETIME: | |
# Date_now by definition | |
end_period = self.tesla.date_now | |
# don't ask for time later than now | |
if end_period > self.tesla.date_now: | |
end_period = self.tesla.date_now | |
return end_period | |
def _working_dates( | |
self, start_dt: Union[datetime, None], end_dt: Union[datetime, None] | |
) -> Tuple[datetime, datetime]: | |
# convert start-end call values to working range for get_data | |
if (start_dt is None) and (end_dt is None): | |
raise ValueError( | |
"One or both of 'start_dt' and 'end_dt' must be set (!= None)." | |
) | |
# Can't start earlier than installation date (may start later) | |
working_start_dt = self.tesla.inst_date | |
if (start_dt is not None) and (start_dt > working_start_dt): | |
working_start_dt = start_dt | |
if end_dt is not None: | |
working_end_dt = end_dt | |
else: | |
# end_dt not defined, so we want the end of period datetime for the | |
# period containing working_start_dt | |
working_end_dt = self.end_of_period(working_start_dt) | |
if working_start_dt > working_end_dt: | |
raise ValueError( | |
"end_dt is either earlier than the installation date or " | |
"earlier than start_dt." | |
) | |
return working_start_dt, working_end_dt | |
def _build_args(self) -> dict[str, Any]: | |
# build args dict | |
args_dict: dict[str, Any] = dict( | |
kind=KIND_ENERGY, | |
timezone=self.tesla.timezone.key, | |
installation_timezone=self.tesla.timezone.key, | |
) | |
if self.period == Period.QUARTERHOUR: | |
# 15 minute version requires special handling | |
args_dict[PERIOD_KEY] = Period.DAY.value | |
args_dict[INTERVAL_KEY] = self.period.value | |
else: | |
args_dict[PERIOD_KEY] = self.period.value | |
return args_dict | |
def get_data( | |
self, | |
start_dt: Union[datetime, None] = None, | |
end_dt: Union[datetime, None] = None, | |
) -> None: | |
# gather historical data | |
# identify any missing data values | |
if self.write_locked: | |
raise RuntimeError( | |
"EnergyDataSet.get_data can only be called once per" | |
" instance, as it cannot manage overlapping time ranges." | |
" If you want to work with multiple ranges, create multiple instances.", | |
) | |
self.write_locked = True | |
working_start_dt, working_end_dt = self._working_dates( | |
start_dt=start_dt, end_dt=end_dt | |
) | |
# set up the core args for api call | |
args_dict = self._build_args() | |
# reset earliest record date for this call | |
earliest_record_dt = dst_safe_timedelta(self.tesla.date_now, timedelta(days=1)) | |
# Collect final period set up to end_dt | |
# I believe this is more or less the Tesla call at this point | |
args_dict[END_DATE_KEY] = working_end_dt | |
earliest_record_dt = self._process_records( | |
args_dict=args_dict, | |
earliest_dt=earliest_record_dt, | |
start_dt=working_start_dt, | |
subtract_residual=False, | |
) | |
if start_dt is None: | |
# emulating Tesla call up to end_dt, nothing more to do here. | |
return | |
while earliest_record_dt > working_start_dt: | |
# More data needed. | |
# Update end date for the next data call to the end of the day before the | |
# earliest current record (ugly as, needs validation). | |
args_dict[END_DATE_KEY] = self.end_of_previous_day(earliest_record_dt) | |
earliest_record_dt = self._process_records( | |
args_dict=args_dict, | |
earliest_dt=earliest_record_dt, | |
start_dt=working_start_dt, | |
subtract_residual=False, | |
) | |
if working_start_dt == self.tesla.inst_date: | |
# nothing more to do | |
return | |
# last bit - find and remove from start of earliest record up to start_dt. | |
# First up, end date of data to remove is working_start_dt | |
args_dict[END_DATE_KEY] = working_start_dt | |
self._process_records( | |
args_dict=args_dict, | |
earliest_dt=earliest_record_dt, | |
start_dt=working_start_dt, | |
subtract_residual=True, | |
) | |
def _timestamp_to_datetime(self, json_data: list[dict[str, Any]]) -> None: | |
# Tesla does an annoying thing where the timestamp is not locked to midnight | |
# in the installation timezone. Even worse, the "start time" in a given day | |
# wanders, and the 15 minute intervals start from whatever this time is. | |
# So this routine adds a more consistent data time to the records in json_data. | |
if self.period is Period.QUARTERHOUR: | |
# special case - up to one day of data in 15 minute intervals starting from | |
# the timestamp in the first record. | |
ref_timestamp = datetime.fromisoformat(json_data[0][TIMESTAMP]).astimezone( | |
self.tesla.timezone | |
) | |
if ref_timestamp == self.tesla.inst_date: | |
day_start = ref_timestamp | |
else: | |
day_start = datetime.combine( | |
ref_timestamp.date(), time(0, 0, 0), tzinfo=self.tesla.timezone | |
) | |
for record in json_data: | |
delta: timedelta = ( | |
datetime.fromisoformat(record[TIMESTAMP]).astimezone( | |
self.tesla.timezone | |
) | |
) - ref_timestamp | |
record[DATETIME] = day_start + delta | |
else: | |
first_record = True | |
# everything else is at day resolution or higher. | |
for record in json_data: | |
day_start = datetime.fromisoformat(record[TIMESTAMP]).astimezone( | |
self.tesla.timezone | |
) | |
if first_record and day_start == self.tesla.inst_date: | |
first_record = False | |
else: | |
day_start = datetime.combine( | |
day_start.date(), time(0, 0, 0), tzinfo=self.tesla.timezone | |
) | |
record[DATETIME] = day_start | |
def _process_records( | |
self, | |
args_dict: dict[str, Any], | |
earliest_dt: datetime, | |
start_dt: datetime, | |
subtract_residual: bool = False, | |
) -> datetime: | |
# Function will work with end_date as datetime or isoformat. | |
dt: datetime | |
record: OrderedDict[Union[str, EnergyNode], Union[str, float]] | |
# belt and braces | |
dt = args_dict[END_DATE_KEY] | |
if type(dt) == datetime: | |
args_dict[END_DATE_KEY] = dt.isoformat() | |
# Force sleep to prevent hammering tesla servers | |
# (Probably not needed?). | |
sleep(0.5) | |
json_data: list[dict[str, Any]] = self.tesla.powerwall.api( | |
"CALENDAR_HISTORY_DATA", **args_dict | |
)["response"]["time_series"] | |
# add field for consistent datetime | |
self._timestamp_to_datetime(json_data=json_data) | |
if subtract_residual: | |
multiplier = -1.0 | |
# Last record date *should* match earliest date. If not, we have a problem. | |
json_record = json_data[-1] | |
date_val = json_record[DATETIME] | |
if date_val != earliest_dt: | |
raise ValueError( | |
f"Expected residual data for {earliest_dt}.\n" | |
f"Found data for {date_val} instead!" | |
) | |
# have the record we need, can discard the rest | |
jason_data = [json_record] | |
else: | |
multiplier = 1.0 | |
# json_data is in chronological order. But we are normally more interested in | |
# the most recent entries back to the entry that is just before start_dt. Given | |
# this, the logic is cleaner when applied to the data in reverse chronological | |
# order. | |
# The only exception is for dealing with the residual, which we deal with by | |
# reducing json_data to the only record we will use, so reversing doesn't | |
# matter. | |
for json_record in reversed(json_data): | |
date_val = json_record[DATETIME] | |
if subtract_residual: | |
record = self.values[date_val] | |
else: | |
record = OrderedDict() | |
record[TIMESTAMP] = json_record[TIMESTAMP] | |
for key_value in EnergyNode: | |
# not efficient, but much more readable than catching below | |
record[key_value] = 0.0 | |
self.values[date_val] = record | |
# Repair chronological order by moving this record to the start of dict | |
# This is the cost of cleaner record processing logic | |
self.values.move_to_end(date_val, last=False) | |
if date_val < earliest_dt: | |
earliest_dt = date_val | |
for key_value in EnergyNode: | |
for subnode in NODEMAP[key_value]: | |
record[key_value] += json_record[subnode] * WHTOKWH * multiplier | |
self.total[key_value] += json_record[subnode] * WHTOKWH * multiplier | |
if date_val <= start_dt: | |
# This record crosses start date, no more needed | |
break | |
# revert datetime type | |
args_dict[END_DATE_KEY] = dt | |
return earliest_dt | |
def dump_results( | |
self, title: str, write_all: bool = False, file_name: Union[str, None] = None | |
) -> None: | |
if file_name is None: | |
fp = sys.stdout | |
else: | |
fp = open(file_name, "a") | |
fp.write(title + "\nDate,Timestamp,") | |
fp.write(",".join([node.value for node in EnergyNode]) + "\n") | |
if write_all: | |
for date_val, record in self.values.items(): | |
fp.write(date_val.isoformat() + ",") | |
fp.write(",".join(str(a) for a in record.values()) + "\n") | |
fp.write("Total,,") | |
fp.write(",".join(str(a) for a in self.total.values()) + "\n") | |
fp.write("\n") | |
if fp != sys.stdout: | |
fp.close() | |
def main() -> None:
    """Rough sample: pull all lifetime data at every resolution except 15 minute."""
    # pretty well everything needs a tesla cloud instance
    cloud = TCInfo()
    daily = EnergyDataSet(cloud, Period.DAY)
    daily.get_data(start_dt=cloud.inst_date, end_dt=cloud.date_now)
    daily.dump_results(
        f"Lifetime for {Period.DAY.value}.", write_all=True, file_name="dump.csv"
    )
    # Daily data is the comparison baseline; quarter hour is excluded from it.
    skipped = (Period.DAY, Period.QUARTERHOUR)
    for period in Period:
        if period in skipped:
            continue
        data_set = EnergyDataSet(cloud, period)
        data_set.get_data(start_dt=cloud.inst_date, end_dt=cloud.date_now)
        data_set.dump_results(
            f"Part day for {period.value}.", write_all=True, file_name="dump.csv"
        )
# Script entry point: run the sample extraction.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment