Skip to content

Instantly share code, notes, and snippets.

@BuongiornoTexas
Created November 12, 2022 06:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BuongiornoTexas/3ea2d1df1a569e1a0a4bc79878a8a753 to your computer and use it in GitHub Desktop.
Save BuongiornoTexas/3ea2d1df1a569e1a0a4bc79878a8a753 to your computer and use it in GitHub Desktop.
Rough Python script for extracting arbitrary date ranges of Powerwall history data from the Tesla Cloud
#!/usr/bin/env python
# TO DO:
# - Manage multiple sites in TCInfo (low priority)
# cspell: ignore CACHEFILE NODEMAP teslapy powerwall WHTOKWH QUARTERHOUR mcbirse
import json
import sys
from enum import Enum
from calendar import monthrange, weekday, SUNDAY
from collections import OrderedDict
from datetime import date, datetime, time, timedelta
from tracemalloc import start
from zoneinfo import ZoneInfo
from time import sleep
from typing import Any, OrderedDict, Tuple, Union
from teslapy import Tesla, Battery
# teslapy token cache file (also read below for the account email address)
CACHEFILE = "cache.json"
# argument keys for the Tesla calendar history API call
PERIOD_KEY = "period"
INTERVAL_KEY = "interval"
KIND_ENERGY = "energy"
END_DATE_KEY = "end_date"
# record keys: raw Tesla timestamp string, and locally-added normalised datetime
TIMESTAMP = "timestamp"
DATETIME = "datetime"
# Wh -> kWh conversion factor applied to all energy values
WHTOKWH = 0.001
def dst_safe_timedelta(date: datetime, dt: timedelta) -> datetime:
    """Add *dt* to *date* as elapsed (absolute) time, safely across DST.

    The addition is carried out in UTC and the result converted back to the
    original timezone, so crossing a daylight-saving boundary shifts the
    result by exactly *dt* of real time rather than naive wall-clock time.
    """
    original_zone = date.tzinfo
    return (date.astimezone(ZoneInfo("UTC")) + dt).astimezone(original_zone)
class EnergyNode(Enum):
    """Energy flow categories for a Powerwall site.

    Values are the human-readable column labels written out by dump_results;
    NODEMAP maps each member to the cloud API fields summed to produce it.
    """

    # These are in the order provided in the tesla download CSV files.
    HOME = "home load"
    SOLAR = "solar gen"
    FROM_PW = "from powerwall"
    FROM_GRID = "from grid"
    TO_GRID = "to grid"
    # These are not included in the Tesla CSV files?
    TO_PW = "to powerwall"
    FROM_GEN = "from generator"
    # Right now, this is the grid services. No idea what these mean.
    # It's a problem if they are non zero!
    OTHER = "other loads & supplies"
class Period(Enum):
    """Reporting period granularities for the calendar history data call."""

    DAY = "day"
    WEEK = "week"
    MONTH = "month"
    YEAR = "year"
    LIFETIME = "lifetime"
    # 15 minute resolution - sent to the API as an "interval" within a "day"
    # period rather than as a period value (see EnergyDataSet._build_args).
    QUARTERHOUR = "15m"
# mapping tesla cloud data to app download columns
# Each EnergyNode maps to the list of time-series fields in a cloud API
# record; the fields are summed (and converted Wh -> kWh) to produce the
# node's value in EnergyDataSet._process_records.
NODEMAP = OrderedDict(
    {
        EnergyNode.HOME: [
            "consumer_energy_imported_from_grid",
            "consumer_energy_imported_from_solar",
            "consumer_energy_imported_from_battery",
            "consumer_energy_imported_from_generator",
        ],
        EnergyNode.SOLAR: ["solar_energy_exported"],
        EnergyNode.FROM_PW: ["battery_energy_exported"],
        EnergyNode.TO_PW: [
            "battery_energy_imported_from_grid",
            "battery_energy_imported_from_solar",
            "battery_energy_imported_from_generator",
        ],
        EnergyNode.FROM_GRID: ["grid_energy_imported"],
        EnergyNode.TO_GRID: [
            "grid_energy_exported_from_solar",
            "grid_energy_exported_from_generator",
            "grid_energy_exported_from_battery",
        ],
        EnergyNode.FROM_GEN: ["generator_energy_exported"],
        EnergyNode.OTHER: [
            "grid_services_energy_imported",
            "grid_services_energy_exported",
        ],
    }
)
class TCInfo:
    """Cached Tesla Cloud connection and site details.

    Connects via teslapy (re-using CACHEFILE credentials when present,
    prompting for an email address otherwise) and caches details for the
    first Powerwall on the account's battery list.

    Attributes:
        tesla: authenticated teslapy.Tesla instance.
        powerwall: first Battery on the account's battery list.
        timezone: installation timezone reported by SITE_CONFIG.
        date_now: site's current timestamp (SITE_DATA), in site timezone.
        inst_date: site installation datetime, in site timezone.
    """

    # tesla py instance
    tesla: Tesla
    # cache values
    id: str
    date_now: datetime
    inst_date: datetime
    timezone: ZoneInfo
    powerwall: Battery

    def __init__(self) -> None:
        try:
            with open(CACHEFILE) as fp:
                # we're going to work with the first email address only
                address = list(json.load(fp).keys())[0]
        except (FileNotFoundError, json.JSONDecodeError, IndexError):
            # Missing, corrupt or empty cache files all mean the same thing:
            # we don't have an account yet, so ask for one. (Previously only
            # FileNotFoundError was handled, so a damaged cache crashed.)
            address = input(
                "Enter Tesla Cloud account email address and follow prompts> "
            )
        self.tesla = Tesla(email=address, cache_file=CACHEFILE)
        # for now, just work on site for first powerwall on list
        # Can adopt @mcbirse approach if this ever becomes more than a scratch script.
        # cache various bits of data.
        self.powerwall = self.tesla.battery_list()[0]
        data = self.powerwall.api("SITE_CONFIG")["response"]
        # grab various data we will use multiple times.
        self.timezone = ZoneInfo(data["installation_time_zone"])
        self.date_now = datetime.fromisoformat(
            self.powerwall.api("SITE_DATA")["response"][TIMESTAMP]
        ).astimezone(self.timezone)
        self.inst_date = datetime.fromisoformat(data["installation_date"]).astimezone(
            self.timezone
        )
class EnergyDataSet:
"""Tesla period energy data set. Data read by calling get_data on date range, and
# output by calling dump results.
Note: get_data can be called once only, as the routine is not safe for over-lapping
time ranges."""
tesla: TCInfo
period: Period
values: OrderedDict[
datetime, OrderedDict[Union[str, EnergyNode], Union[str, float]]
]
total: OrderedDict[EnergyNode, float]
write_locked: bool
def __init__(self, tesla: TCInfo, period: Period) -> None:
# super class part of this if I start looking at multiple data sources?
self.tesla = tesla
self.period = period
self.values = OrderedDict()
self.total = OrderedDict()
for node in EnergyNode:
self.total[node] = 0.0
self.write_locked = False
def end_of_previous_day(self, date: datetime) -> datetime:
yesterday = date.date() - timedelta(days=1)
end_of_day = datetime.combine(
yesterday, time(23, 59, 59), tzinfo=self.tesla.timezone
)
return end_of_day
def end_of_period(self, dt_value: datetime) -> datetime:
# Return the end of period datetime for the period containing dt_value
# Get the end of day for dt_value
end_period = dt_value.replace(hour=23, minute=59, second=59)
# Ugly multipart check
match self.period:
case Period.DAY | Period.QUARTERHOUR:
# end of the day for dt_value, already done the hard work
pass
case Period.WEEK:
day_delta = SUNDAY - weekday(
end_period.year,
end_period.month,
end_period.day,
)
if day_delta > 0:
# Do timezone safe just to be sure.
end_period = dst_safe_timedelta(
end_period, timedelta(days=day_delta)
)
case Period.MONTH:
_, last_day = monthrange(end_period.year, end_period.month)
end_period = end_period.replace(day=last_day)
case Period.YEAR:
end_period = end_period.replace(month=12, day=31)
case Period.LIFETIME:
# Date_now by definition
end_period = self.tesla.date_now
# don't ask for time later than now
if end_period > self.tesla.date_now:
end_period = self.tesla.date_now
return end_period
def _working_dates(
self, start_dt: Union[datetime, None], end_dt: Union[datetime, None]
) -> Tuple[datetime, datetime]:
# convert start-end call values to working range for get_data
if (start_dt is None) and (end_dt is None):
raise ValueError(
"One or both of 'start_dt' and 'end_dt' must be set (!= None)."
)
# Can't start earlier than installation date (may start later)
working_start_dt = self.tesla.inst_date
if (start_dt is not None) and (start_dt > working_start_dt):
working_start_dt = start_dt
if end_dt is not None:
working_end_dt = end_dt
else:
# end_dt not defined, so we want the end of period datetime for the
# period containing working_start_dt
working_end_dt = self.end_of_period(working_start_dt)
if working_start_dt > working_end_dt:
raise ValueError(
"end_dt is either earlier than the installation date or "
"earlier than start_dt."
)
return working_start_dt, working_end_dt
def _build_args(self) -> dict[str, Any]:
# build args dict
args_dict: dict[str, Any] = dict(
kind=KIND_ENERGY,
timezone=self.tesla.timezone.key,
installation_timezone=self.tesla.timezone.key,
)
if self.period == Period.QUARTERHOUR:
# 15 minute version requires special handling
args_dict[PERIOD_KEY] = Period.DAY.value
args_dict[INTERVAL_KEY] = self.period.value
else:
args_dict[PERIOD_KEY] = self.period.value
return args_dict
def get_data(
self,
start_dt: Union[datetime, None] = None,
end_dt: Union[datetime, None] = None,
) -> None:
# gather historical data
# identify any missing data values
if self.write_locked:
raise RuntimeError(
"EnergyDataSet.get_data can only be called once per"
" instance, as it cannot manage overlapping time ranges."
" If you want to work with multiple ranges, create multiple instances.",
)
self.write_locked = True
working_start_dt, working_end_dt = self._working_dates(
start_dt=start_dt, end_dt=end_dt
)
# set up the core args for api call
args_dict = self._build_args()
# reset earliest record date for this call
earliest_record_dt = dst_safe_timedelta(self.tesla.date_now, timedelta(days=1))
# Collect final period set up to end_dt
# I believe this is more or less the Tesla call at this point
args_dict[END_DATE_KEY] = working_end_dt
earliest_record_dt = self._process_records(
args_dict=args_dict,
earliest_dt=earliest_record_dt,
start_dt=working_start_dt,
subtract_residual=False,
)
if start_dt is None:
# emulating Tesla call up to end_dt, nothing more to do here.
return
while earliest_record_dt > working_start_dt:
# More data needed.
# Update end date for the next data call to the end of the day before the
# earliest current record (ugly as, needs validation).
args_dict[END_DATE_KEY] = self.end_of_previous_day(earliest_record_dt)
earliest_record_dt = self._process_records(
args_dict=args_dict,
earliest_dt=earliest_record_dt,
start_dt=working_start_dt,
subtract_residual=False,
)
if working_start_dt == self.tesla.inst_date:
# nothing more to do
return
# last bit - find and remove from start of earliest record up to start_dt.
# First up, end date of data to remove is working_start_dt
args_dict[END_DATE_KEY] = working_start_dt
self._process_records(
args_dict=args_dict,
earliest_dt=earliest_record_dt,
start_dt=working_start_dt,
subtract_residual=True,
)
def _timestamp_to_datetime(self, json_data: list[dict[str, Any]]) -> None:
# Tesla does an annoying thing where the timestamp is not locked to midnight
# in the installation timezone. Even worse, the "start time" in a given day
# wanders, and the 15 minute intervals start from whatever this time is.
# So this routine adds a more consistent data time to the records in json_data.
if self.period is Period.QUARTERHOUR:
# special case - up to one day of data in 15 minute intervals starting from
# the timestamp in the first record.
ref_timestamp = datetime.fromisoformat(json_data[0][TIMESTAMP]).astimezone(
self.tesla.timezone
)
if ref_timestamp == self.tesla.inst_date:
day_start = ref_timestamp
else:
day_start = datetime.combine(
ref_timestamp.date(), time(0, 0, 0), tzinfo=self.tesla.timezone
)
for record in json_data:
delta: timedelta = (
datetime.fromisoformat(record[TIMESTAMP]).astimezone(
self.tesla.timezone
)
) - ref_timestamp
record[DATETIME] = day_start + delta
else:
first_record = True
# everything else is at day resolution or higher.
for record in json_data:
day_start = datetime.fromisoformat(record[TIMESTAMP]).astimezone(
self.tesla.timezone
)
if first_record and day_start == self.tesla.inst_date:
first_record = False
else:
day_start = datetime.combine(
day_start.date(), time(0, 0, 0), tzinfo=self.tesla.timezone
)
record[DATETIME] = day_start
def _process_records(
self,
args_dict: dict[str, Any],
earliest_dt: datetime,
start_dt: datetime,
subtract_residual: bool = False,
) -> datetime:
# Function will work with end_date as datetime or isoformat.
dt: datetime
record: OrderedDict[Union[str, EnergyNode], Union[str, float]]
# belt and braces
dt = args_dict[END_DATE_KEY]
if type(dt) == datetime:
args_dict[END_DATE_KEY] = dt.isoformat()
# Force sleep to prevent hammering tesla servers
# (Probably not needed?).
sleep(0.5)
json_data: list[dict[str, Any]] = self.tesla.powerwall.api(
"CALENDAR_HISTORY_DATA", **args_dict
)["response"]["time_series"]
# add field for consistent datetime
self._timestamp_to_datetime(json_data=json_data)
if subtract_residual:
multiplier = -1.0
# Last record date *should* match earliest date. If not, we have a problem.
json_record = json_data[-1]
date_val = json_record[DATETIME]
if date_val != earliest_dt:
raise ValueError(
f"Expected residual data for {earliest_dt}.\n"
f"Found data for {date_val} instead!"
)
# have the record we need, can discard the rest
jason_data = [json_record]
else:
multiplier = 1.0
# json_data is in chronological order. But we are normally more interested in
# the most recent entries back to the entry that is just before start_dt. Given
# this, the logic is cleaner when applied to the data in reverse chronological
# order.
# The only exception is for dealing with the residual, which we deal with by
# reducing json_data to the only record we will use, so reversing doesn't
# matter.
for json_record in reversed(json_data):
date_val = json_record[DATETIME]
if subtract_residual:
record = self.values[date_val]
else:
record = OrderedDict()
record[TIMESTAMP] = json_record[TIMESTAMP]
for key_value in EnergyNode:
# not efficient, but much more readable than catching below
record[key_value] = 0.0
self.values[date_val] = record
# Repair chronological order by moving this record to the start of dict
# This is the cost of cleaner record processing logic
self.values.move_to_end(date_val, last=False)
if date_val < earliest_dt:
earliest_dt = date_val
for key_value in EnergyNode:
for subnode in NODEMAP[key_value]:
record[key_value] += json_record[subnode] * WHTOKWH * multiplier
self.total[key_value] += json_record[subnode] * WHTOKWH * multiplier
if date_val <= start_dt:
# This record crosses start date, no more needed
break
# revert datetime type
args_dict[END_DATE_KEY] = dt
return earliest_dt
def dump_results(
self, title: str, write_all: bool = False, file_name: Union[str, None] = None
) -> None:
if file_name is None:
fp = sys.stdout
else:
fp = open(file_name, "a")
fp.write(title + "\nDate,Timestamp,")
fp.write(",".join([node.value for node in EnergyNode]) + "\n")
if write_all:
for date_val, record in self.values.items():
fp.write(date_val.isoformat() + ",")
fp.write(",".join(str(a) for a in record.values()) + "\n")
fp.write("Total,,")
fp.write(",".join(str(a) for a in self.total.values()) + "\n")
fp.write("\n")
if fp != sys.stdout:
fp.close()
def main() -> None:
    """Dump lifetime history at each period resolution except quarter hour."""
    # pretty well everything needs a tesla cloud instance
    cloud = TCInfo()

    # rough sample pulling all lifetime data except quarter hour
    daily = EnergyDataSet(cloud, Period.DAY)
    daily.get_data(start_dt=cloud.inst_date, end_dt=cloud.date_now)
    daily.dump_results(
        f"Lifetime for {Period.DAY.value}.", write_all=True, file_name="dump.csv"
    )

    for period in Period:
        # comparing to day, and not doing this comparison for quarter hour.
        if period is Period.DAY or period is Period.QUARTERHOUR:
            continue
        period_set = EnergyDataSet(cloud, period)
        period_set.get_data(start_dt=cloud.inst_date, end_dt=cloud.date_now)
        period_set.dump_results(
            f"Part day for {period.value}.", write_all=True, file_name="dump.csv"
        )


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment