Skip to content

Instantly share code, notes, and snippets.

@NelsonMinar
Created January 20, 2022 18:59
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save NelsonMinar/37208cc6e33e76a6ee63cf0a9d0b4b71 to your computer and use it in GitHub Desktop.
Save NelsonMinar/37208cc6e33e76a6ee63cf0a9d0b4b71 to your computer and use it in GitHub Desktop.
Solar monitoring setup: PVS6, PGE, forecast.solar to Influx
Data imports for a custom solar monitoring system I build using Sunpower's PVS6, Telegraf, Influx, and Grafana.
See http://nelsonslog.wordpress.com/ for details
This code is provided without license or support. If you do something interested and related let me know!
--nelson@monkey.org
#!/usr/bin/env python3
"""Get a solar forecast from forecast.solar and emit it in Influx format
Nelson Minar <nelson@monkey.org>
"""
import json, sys, urllib.request
import pendulum
forecast_url = 'https://api.forecast.solar/estimate/watts/39.2/-121.1/35/70/11.5'
def parse(fp):
data = json.load(fp)
tz_name = data['message']['info']['timezone']
for date, watts in data.get('result', {}).items():
dt = pendulum.parse(date, tz=tz_name)
ts = dt.int_timestamp * 1_000_000_000
kw = watts / 1000.0
print(f'forecast_dot_solar kw={kw:.4} {ts}')
if __name__ == '__main__':
if len(sys.argv) == 1:
parse(urllib.request.urlopen(forecast_url, timeout=120))
for fn in sys.argv[1:]:
parse(open(fn))
#!/usr/bin/env python3
"""Import CSV Green Button usage files from PG&E and emit them in Influx format
Nelson Minar <nelson@monkey.org>
"""
import csv, sys, gzip
import pendulum
def parse(fn: str):
fp = gzip.open(fn, mode='rt', encoding='utf8') if fn.endswith('.gz') else open(fn)
for _ in range(5):
fp.readline()
rows = csv.DictReader(fp)
for row in rows:
assert row['TYPE'] == 'Electric usage'
assert row['UNITS'] == 'kWh'
dt = pendulum.parse(f'{row["DATE"]} {row["END TIME"]}', tz='America/Los_Angeles')
ts = dt.int_timestamp * 1_000_000_000
kwh = row["USAGE"]
cost = float(row["COST"].replace('$', ''))
print(f'pge kwh={kwh},cost={cost:.4} {ts}')
if __name__ == '__main__':
for fn in sys.argv[1:]:
parse(fn)
#!/usr/bin/python3
"""Parse status data from the Sunpower PVS6 and turn it into Influx format, for use with Telegraf.
PVS6 data comes from its endpoint /cgi-bin/dl_cgi?Command=DeviceList'
The PVS6 itself can be the server for this, or a Raspberry Pi or something running as a proxy.
Output contains lots of data from the Sunpower mointors.
Computer status, production and consumption wattages etc (as measured by CT sensors),
also power data from each individual inverter.
Also produces some summary statistics rolling up info about all the inverters.
By Nelson Minar <nelson@monkey.org>.
I have no intent to support this or turn it into a resuable product.
"""
import json, sys, os.path, re, statistics, urllib.request, time
_PVS6_URL = 'http://192.168.3.140/cgi-bin/dl_cgi?Command=DeviceList'
output = []
def extract_fields_tags(record, field_keys, tag_keys, ignore_keys):
"""Given a JSON blob, return three dicts.
One containing all keys in field_keys, one for tag_keys, and one for things not in fields, tags, or ignore."""
field_keys = field_keys.split(',')
tag_keys = tag_keys.split(',')
ignore_keys = ignore_keys.split(',')
fields = {k: v for k, v in record.items() if k in field_keys}
tags = {k: v for k, v in record.items() if k in tag_keys}
others = {k: v for k, v in record.items() if not (k in (field_keys + tag_keys + ignore_keys))}
return fields, tags, others
_strip_whitespace = re.compile(r'\s+')
def _s(s):
"Remove all whitespace from string"
return _strip_whitespace.sub("_", s)
def _v(v):
"Turn the input into a string for influx format. Quotes strings, converts numbers"
try:
return float(v)
except ValueError:
pass
return f'"{v}"'
# Some rollup data populated by the parsing system
summary = {}
inverter_kws = []
inverter_temps = []
def init_summary():
global inverter_kws, inverter_temps, summary
inverter_kws = []
inverter_temps = []
summary = {
'count_not_working': 0,
'count_not_working_inverters': 0,
'avg_inverter_kw': 0,
'sd_inverter_kw': 0,
'count_inverter_low_kw': 0,
'avg_inverter_temp': 0,
'sd_inverter_temp': 0,
}
def create_summary():
summary['avg_inverter_kw'] = statistics.fmean(inverter_kws)
summary['sd_inverter_kw'] = statistics.pstdev(inverter_kws)
summary['avg_inverter_temp'] = statistics.fmean(inverter_temps)
summary['sd_inverter_temp'] = statistics.pstdev(inverter_temps)
# Count "low" inverters
mean_kw = summary['avg_inverter_kw']
if mean_kw < 0.02: # if no power is being generated then skip this check
low = 0
else:
# low means less than 80% of average
low_factor = 0.8
low = sum([1 for x in inverter_kws if x < low_factor * summary['avg_inverter_kw']])
summary['count_inverter_low_kw'] = low
return "pvs6_summary", summary, {}, {}
def parse(data, timestamp):
output = []
skipped = 0
init_summary()
# Parse every record in the JSON input
for record in data['devices']:
device_type = record.get("DEVICE_TYPE")
if device_type == 'Power Meter' and record.get('TYPE') == 'PVS5-METER-P':
output.append(parse_production(record))
elif device_type == 'Power Meter' and record.get('TYPE') == 'PVS5-METER-C':
output.append(parse_consumption(record))
elif device_type == 'Inverter':
output.append(parse_inverter(record))
elif device_type == 'PVS':
output.append(parse_pvs(record))
else:
skipped += 1
if skipped > 0:
sys.stderr.write(f'Skipped {skipped} records of unknown type.\n')
# Make a rollup from the collected data
output.append(create_summary())
# Emit the records in Influx format
for measurement, fields, tags, others in output:
if len(others) != 0:
sys.stderr.write(f'Ignoring {len(others)} unknown data items in {measurement}: {others}\n')
# Strip whitespace from tag names and values and format as k=v,k=v lists for Influx
tag_set = ','.join(f"{_s(k)}={_s(v)}" for k, v in tags.items())
field_set = ','.join(f"{_s(k)}={_v(v)}" for k, v in fields.items())
# Format timestamp into nanoseconds
ts = int(1_000_000_000 * timestamp)
# Only print this delimeter if there are tags
d = ',' if len(tag_set) > 0 else ''
# Emit the Influx record
print(f'{measurement}{d}{tag_set} {field_set} {ts}')
def parse_pvs(record):
fields, tags, others = extract_fields_tags(
record,
"STATE,SWVER,dl_comm_err,dl_cpu_load,dl_err_count,dl_flash_avail,dl_mem_used,dl_scan_time,dl_skipped_scans,dl_untransmitted,dl_uptime",
"SERIAL", "CURTIME,DATATIME,DETAIL,DEVICE_TYPE,HWVER,MODEL,STATEDESCR,panid")
if fields['STATE'] != 'working':
summary['count_not_working'] += 1
return "pvs6_computer", fields, tags, others
def parse_production(record):
fields, tags, others = extract_fields_tags(
record, "STATE,freq_hz,net_ltea_3phsum_kwh,p_3phsum_kw,q_3phsum_kvar,s_3phsum_kva,tot_pf_rto", "SERIAL",
"CAL0,CURTIME,DATATIME,DESCR,DEVICE_TYPE,ISDETAIL,MODEL,OPERATION,PORT,STATEDESCR,SWVER,TYPE,ct_scl_fctr,interface,origin,subtype"
)
if fields['STATE'] != 'working':
summary['count_not_working'] += 1
return "pvs6_production", fields, tags, others
def parse_consumption(record):
fields, tags, others = extract_fields_tags(
record,
"STATE,freq_hz,i1_a,i2_a,neg_ltea_3phsum_kwh,net_ltea_3phsum_kwh,p1_kw,p2_kw,p_3phsum_kw,pos_ltea_3phsum_kwh,q_3phsum_kvar,s_3phsum_kva,tot_pf_rto,v12_v,v1n_v,v2n_v",
"SERIAL",
"CAL0,CURTIME,DATATIME,DESCR,DEVICE_TYPE,ISDETAIL,MODEL,OPERATION,PORT,STATEDESCR,SWVER,TYPE,ct_scl_fctr,interface,origin,subtype"
)
if fields['STATE'] != 'working':
summary['count_not_working'] += 1
return "pvs6_consumption", fields, tags, others
def parse_inverter(record):
fields, tags, others = extract_fields_tags(
record,
"STATE,freq_hz,i_3phsum_a,i_mppt1_a,ltea_3phsum_kwh,p_3phsum_kw,p_mppt1_kw,stat_ind,t_htsnk_degc,v_mppt1_v,vln_3phavg_v",
"SERIAL",
"CURTIME,DATATIME,DESCR,DEVICE_TYPE,ISDETAIL,MODEL,MOD_SN,NMPLT_SKU,OPERATION,PANEL,PORT,STATEDESCR,SWVER,TYPE,hw_version,interface,origin"
)
if fields['STATE'] != 'working':
summary['count_not_working'] += 1
summary['count_not_working_inverters'] += 1
inverter_kws.append(float(fields['p_3phsum_kw']))
inverter_temps.append(float(fields['t_htsnk_degc']))
return "pvs6_inverter", fields, tags, others
def load_from_file(fn):
ts = os.path.getmtime(fn)
try:
data = json.load(open(fn))
except:
sys.stderr.write(f'Error parsing JSON file {fn}, skipping.\n')
return
parse(data, ts)
def load_from_url(url):
# PVS6 often takes 10-15 seconds to respond
fp = urllib.request.urlopen(url, timeout=50)
data = json.load(fp)
parse(data, time.time())
if __name__ == '__main__':
if len(sys.argv) > 1:
for fn in sys.argv[1:]:
load_from_file(fn)
else:
load_from_url(_PVS6_URL)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment