Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save ashleysommer/2e11f232abc5509243ea408d5a33dbc0 to your computer and use it in GitHub Desktop.
Save ashleysommer/2e11f232abc5509243ea408d5a33dbc0 to your computer and use it in GitHub Desktop.
Ginlong wifi payload decode and push
import sys
import struct
import asyncio
import aiohttp
import json
import time
import os
import glob
import bz2
import tarfile
from os import path, mkdir
from asyncio import StreamReader, StreamWriter
from collections import OrderedDict
from datetime import datetime, timedelta
# Reference to this module object, so the constants registry can be attached to it as an attribute.
_module = sys.modules[__name__]
# Registry of configuration constants; every entry is also bound to a module-level name of the same spelling.
_module.CONSTS = CONSTS = dict()
# Directory that get_logfile_path() expands and uses for the daily CSV logs.
CONSTS['LOGFILE_DIRECTORY'] = LOGFILE_DIRECTORY = "./output"
# URL that push_to_pvoutput() POSTs status updates to.
CONSTS['PVOUTPUT_ENDPOINT'] = PVOUTPUT_ENDPOINT = "https://pvoutput.org/service/r2/addstatus.jsp"
# Number of bytes from the start of a payload chunk to the start of its body (see decode_payload).
CONSTS['HEADER_SIZE'] = HEADER_SIZE = 12
# Number of bytes following the body of a chunk: a checksum byte and the 0x16 end byte (see decode_payload).
CONSTS['TAIL_SIZE'] = TAIL_SIZE = 2
# Layout of the sensor readings inside a command-129 body. Each value is
# (size in bytes, struct format, multiplier, Python type). decode_payload walks
# the entries in this order with a running cursor, so the ordering is significant.
# Fields named "unknown*" are decoded but left out of the CSV log.
CONSTS['SENSOR_VARIABLES'] = SENSOR_VARIABLES = OrderedDict([
("temp", (2, b'>H', 0.1, float)),
("vpv1", (2, b'>H', 0.1, float)),
("vpv2", (2, b'>H', 0.1, float)),
("unknown1", (2, b'>H', 1, int)),
("ipv1", (2, b'>H', 0.1, float)),
("ipv2", (2, b'>H', 0.1, float)),
("unknown2", (2, b'>H', 1, int)),
("iac1", (2, b'>H', 0.1, float)),
("iac2", (2, b'>H', 0.1, float)),
("iac3", (2, b'>H', 0.1, float)),
("vac1", (2, b'>H', 0.1, float)),
("vac2", (2, b'>H', 0.1, float)),
("vac3", (2, b'>H', 0.1, float)),
("fac", (2, b'>H', 0.01, float)),
("pac1", (2, b'>H', 1, int)),
("pac2", (2, b'>H', 1, int)),
("pac3", (2, b'>H', 1, int)),
("unknown3", (2, b'>H', 1, int)),
("unknown4", (2, b'>H', 1, int)),
("e_today", (2, b'>H', 0.01, float)),
# int type with a 0.1 multiplier: decode_payload rounds the scaled value to a whole number.
("e_total", (4, b'>I', 0.1, int)),
("h_total", (4, b'>I', 1, int)),
("model", (2, b'>H', 1, int))
])
# Example payload as a tuple of byte values. Its first byte is 0x68 and its last byte is
# 0x16, matching the chunk start/end markers that decode_payload checks. It is not
# referenced by any other code visible in this file; presumably kept as a manual test
# fixture for decode_payload -- TODO confirm.
sample = (0x68, 0x7c, 0x51, 0xb0, 0xe2, 0xe2, 0x8a, 0x25, 0xe2, 0xe2, 0x8a, 0x25, 0x81, 0x03, 0x05, 0x30, 0x30, 0x30, 0x41, 0x43, 0x30, 0x30, 0x31, 0x37, 0x36, 0x32, 0x31, 0x36, 0x35, 0x33, 0x20, 0x00, 0xea, 0x0b, 0x01, 0x08, 0x89, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x09, 0x55, 0x00, 0x00, 0x00, 0x00, 0x13, 0x90, 0x00, 0xbf, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x0b, 0x0e, 0x0c, 0x62, 0x00, 0x00, 0x02, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0xac, 0x04, 0x04, 0x00, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x27, 0x00, 0x00, 0x00, 0xbf, 0x00, 0x00, 0xbf, 0x00, 0x00, 0x00, 0x00, 0x55, 0x09, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x96, 0x16, 0x68, 0x29, 0x51, 0xb1, 0xe2, 0xe2, 0x8a, 0x25, 0xe2, 0xe2, 0x8a, 0x25, 0x80, 0x01, 0x48, 0x34, 0x2e, 0x30, 0x31, 0x2e, 0x35, 0x31, 0x59, 0x34, 0x2e, 0x30, 0x2e, 0x30, 0x32, 0x57, 0x31, 0x2e, 0x30, 0x2e, 0x35, 0x31, 0x28, 0x47, 0x4c, 0x31, 0x36, 0x2d, 0x30, 0x35, 0x2d, 0x30, 0x36, 0x31, 0x2d, 0x44, 0x29, 0x60, 0x00, 0x98, 0x16)
def first(iterable):
    """Return the leading element of a bytes/str/set/frozenset/list/tuple value."""
    supported_types = (bytes, str, set, frozenset, list, tuple)
    assert isinstance(iterable, supported_types)
    iterator = iter(iterable)
    return next(iterator)
# https://stackoverflow.com/a/1160227
def touch(fname, mode=0o666, **kwargs):
    """Create ``fname`` if it is missing and update its timestamps.

    ``mode`` is the permission mode passed to ``os.open``; any extra keyword
    arguments are forwarded to ``os.utime``.
    """
    descriptor = os.open(fname, flags=os.O_CREAT | os.O_APPEND, mode=mode)
    with os.fdopen(descriptor) as handle:
        # Prefer the open descriptor when the platform lets utime work on one.
        if os.utime in os.supports_fd:
            target = handle.fileno()
        else:
            target = fname
        os.utime(target, **kwargs)
def zip_log_files(logfile_dir: str, at=7):
    """Compress the ``*.csv`` files in ``logfile_dir`` and delete the originals.

    Nothing is done unless at least ``at`` CSV files (and at least one) are
    found. A single file is written to ``<name>.bz2``; several files are
    bundled into ``<first>.to.<last>.tar.bz2``. If the archive name is already
    taken a numeric suffix (``.1`` .. ``.9``) is appended. The CSV files are
    removed only after the archive was written without an error; errors during
    archiving or deletion are printed, not raised.

    Raises:
        RuntimeError: if ten archives for the same name already exist.
    """
    pattern = "/".join([logfile_dir, "*.csv"])
    csv_paths = glob.glob(pattern)
    if len(csv_paths) < 1:
        print("found no log files. Not zipping.")
        return
    if len(csv_paths) < int(at):
        print("found less than {} log files. Not zipping.".format(at))
        return

    csv_paths = sorted(csv_paths)  # alphabetical by path
    make_tar = len(csv_paths) != 1
    if make_tar:
        first_name = path.basename(csv_paths[0])
        last_name = path.basename(csv_paths[-1])
        archive_base = path.join(logfile_dir, ".".join([first_name, "to", last_name, "tar.bz2"]))
    else:
        archive_base = path.join(logfile_dir, ".".join([path.basename(csv_paths[0]), "bz2"]))

    # Pick a name that does not collide with an existing archive.
    archive_name = archive_base
    suffix = 0
    while path.exists(archive_name):
        suffix += 1
        if suffix == 10:
            raise RuntimeError("Something weird happened. Made too many log files for the same period.")
        archive_name = ".".join([archive_base, str(suffix)])

    archived = False
    try:
        if make_tar:
            with tarfile.open(archive_name, "w:bz2") as archive:  # type: tarfile.TarFile
                for csv_path in csv_paths:
                    archive.add(csv_path, arcname=path.basename(csv_path), recursive=False)
            assert path.isfile(archive_name), "our new tar.bz2 file could not be found!"
        else:
            with open(csv_paths[0], mode="rb") as single_logfile, \
                    bz2.open(archive_name, mode="wb") as archive:  # type: bz2.BZ2File
                archive.write(single_logfile.read())
            assert path.isfile(archive_name), "our new .bz2 file could not be found!"
        archived = True
    except Exception as e:
        print(e)

    if not archived:
        return
    try:
        for csv_path in csv_paths:
            os.remove(csv_path)
    except Exception as e:
        print(e)
def write_csv_headers(logfile_path):
    """Overwrite ``logfile_path`` with the CSV header row.

    The row is ``time`` followed by every SENSOR_VARIABLES key except the
    "unknown*" fields and h_total, model, pac2, pac3, vac2 and vac3.
    """
    skipped = ("h_total", "model", "pac2", "pac3", "vac2", "vac3")
    columns = ['time'] + [
        name for name in SENSOR_VARIABLES
        if "unknown" not in name and name not in skipped
    ]
    with open(logfile_path, "w") as logfile:
        logfile.write(",".join(columns) + "\n")
def write_csv_values(received_datetime, logfile_path, values_dict):
    """Append one CSV row of readings to ``logfile_path``.

    The row starts with ``received_datetime`` formatted as HH:MM:SS, followed
    by the same columns that write_csv_headers emits. Floats are written with
    up to three decimals (trailing zeros and a dangling "." removed); any
    value that is missing or cannot be formatted is written as "NULL".
    """
    skipped = ("h_total", "model", "pac2", "pac3", "vac2", "vac3")
    columns = [
        name for name in SENSOR_VARIABLES
        if "unknown" not in name and name not in skipped
    ]
    time_str = received_datetime.strftime("%H:%M:%S")

    def render(key):
        """Format a single reading as text, falling back to "NULL"."""
        try:
            reading = values_dict[key]
            if isinstance(reading, float):
                # "10.310" -> "10.31", "10.000" -> "10"
                return "{0:.3f}".format(reading).rstrip('0').rstrip('.')
            return str(reading)
        except Exception:
            return "NULL"

    cells = [render(key) for key in columns]
    with open(logfile_path, "a") as logfile:
        logfile.write("{},{}\n".format(time_str, ",".join(cells)))
def get_logfile_path():
    """Return the path of today's CSV log file, creating it when needed.

    The log directory (LOGFILE_DIRECTORY with ``~`` and environment variables
    expanded) is created if it does not exist. When today's file is missing,
    older logs are first handed to zip_log_files, then the new file is created
    and its header row written.

    Raises:
        RuntimeError: if the log directory path exists but is a regular file.
    """
    logfile_dir = path.abspath(path.expandvars(path.expanduser(LOGFILE_DIRECTORY)))
    if path.exists(logfile_dir):
        if path.isfile(logfile_dir):
            raise RuntimeError("Output logfile directory \"{}\" is a file, not a directory!".format(logfile_dir))
    else:
        mkdir(logfile_dir)

    today = datetime.now().strftime("%Y%m%d")  # local time
    file_path = path.join(logfile_dir, "log_{}.csv".format(today))
    if path.exists(file_path):
        return file_path

    # First write of the day: archive old logs, then start the new file.
    zip_log_files(logfile_dir, at=7)
    touch(file_path, mode=0o777)
    write_csv_headers(file_path)
    return file_path
async def push_to_pvoutput(date_time: datetime, energy_gen=None, power_gen=None, energy_consumption=None,
                           power_consumption=None, temperature=None, voltage=None):
    """POST one status record to PVOUTPUT_ENDPOINT.

    The request carries ``d`` (YYYYMMDD) and ``t`` (HH:MM) taken from
    ``date_time`` plus, for each argument that is not None, a float field:
    energy_gen -> v1, power_gen -> v2, energy_consumption -> v3,
    power_consumption -> v4, temperature -> v5, voltage -> v6.

    The aiohttp session is created on first use from SETTINGS (defined
    outside this block) and cached on ``push_to_pvoutput.session``.

    Returns:
        True when the response status is 2xx; otherwise the response body is
        printed and None is returned.

    Raises:
        RuntimeError: if none of the v1-v4 arguments is supplied.
    """
    # Validate before touching SETTINGS or building a session, so a bad call
    # fails fast without side effects.
    if energy_gen is None and power_gen is None and energy_consumption is None and power_consumption is None:
        raise RuntimeError("Must have at least one v1-v4 variable to send to PVOutput")

    session = push_to_pvoutput.session
    if session is None:
        headers = {"X-Pvoutput-Apikey": SETTINGS['pvoutput_apikey'],
                   "X-Pvoutput-SystemId": SETTINGS['pvoutput_system_id']}
        session = aiohttp.ClientSession(headers=headers)
        push_to_pvoutput.session = session

    data = {"d": date_time.strftime("%Y%m%d"), "t": date_time.strftime("%H:%M")}
    optional_fields = (
        ("v1", energy_gen),
        ("v2", power_gen),
        ("v3", energy_consumption),
        ("v4", power_consumption),
        ("v5", temperature),
        ("v6", voltage),
    )
    for key, value in optional_fields:
        if value is not None:
            data[key] = float(value)

    async with session.post(url=PVOUTPUT_ENDPOINT, data=data) as result:
        status = result.status
        body = await result.read()
    if 200 <= status < 300:
        return True
    print(body)
    return None


# Lazily created aiohttp session shared by all calls.
push_to_pvoutput.session = None
def _decode_sensor_readings(command, body):
    """Decode a sensor-report body (command 129) into a dict of readings."""
    sensor = first(struct.unpack(b'<H', bytes(body[1:3])))
    print("found command #{} from sensor type #{}".format(command, sensor))
    cursor = 3
    sn = bytes(body[cursor:cursor + 16]).decode("ISO-8859-1")
    print("got sensor serial number: {}".format(sn))
    cursor += 16
    readings = dict()
    # SENSOR_VARIABLES is ordered; fields are laid out back to back in the body.
    for name, (bytes_size, bytes_format, multiplier, data_type) in SENSOR_VARIABLES.items():
        raw = first(struct.unpack(bytes_format, bytes(body[cursor:cursor + bytes_size])))
        if bytes_size == 2 and raw == 65535:
            # An all-ones 16-bit field is reported as zero.
            raw = 0
        if data_type == float:
            value = float(raw) if multiplier is None else float(raw) * multiplier
        elif data_type == int:
            value = int(raw) if multiplier is None else round(float(raw) * multiplier)
        else:
            raise RuntimeError("Don't know what to do with data type: {}".format(data_type))
        print("var {}:\t{}".format(name, value))
        readings[name] = value
        cursor += bytes_size
    # Combined figure across the three pac fields.
    readings['pac'] = int(readings['pac1']) + int(readings['pac2']) + int(readings['pac3'])
    return readings


def decode_payload(received_datetime, payload_bytes):
    """Split a received payload into chunks and decode the sensor reports.

    Each chunk is: a HEADER_SIZE-byte header (start byte 0x68, body length
    byte, two further bytes, then two 4-byte ids), the body, and a
    TAIL_SIZE-byte tail (checksum byte, end byte 0x16). The checksum is the
    low 8 bits of the sum of every byte from the length byte through the end
    of the body. Only the length is needed from the header here.

    Bodies whose first byte is 1 or 128 are printed as messages; bodies whose
    first byte is 129 are decoded according to SENSOR_VARIABLES.

    Args:
        received_datetime: time the payload arrived (not used by the decoder).
        payload_bytes: indexable sequence of byte values (e.g. ``bytes``).

    Returns:
        A list with one dict of readings per command-129 chunk, each with an
        extra ``'pac'`` key; empty if the payload is empty or holds no sensor
        report.

    Raises:
        ValueError: if any chunk is truncated.
        AssertionError: if a start byte, end byte or checksum does not match.
    """
    total = len(payload_bytes)
    offset = 0
    payload_datas = list()
    while offset < total:
        assert payload_bytes[offset] == 0x68, "payload chunk must start with 0x68"
        # Bounds are checked relative to this chunk's offset, so a truncated
        # second (or later) chunk is reported as incomplete as well.
        if total - offset < HEADER_SIZE + TAIL_SIZE:
            raise ValueError("Did not receive the whole payload.")
        len_int = int(payload_bytes[offset + 1])
        body_start = offset + HEADER_SIZE
        tail_start = body_start + len_int
        tail_end = tail_start + TAIL_SIZE
        if tail_end > total:
            raise ValueError("Did not receive the whole payload.")

        tail = payload_bytes[tail_start:tail_end]
        assert tail[1] == 0x16, "payload chunk should end with 0x16"
        checksum = sum(value & 0xFF for value in payload_bytes[offset + 1:tail_start]) & 0xFF
        assert int(tail[0]) == checksum, "payload checksum does not match"

        body = bytes(payload_bytes[body_start:tail_start])
        command = first(body)
        if command in (1, 128):
            arg = body[1]
            print("found command #{} with arg #{}".format(command, arg))
            message = bytes(body[2:])
            print("message: {}".format(message))
        elif command == 129:
            payload_datas.append(_decode_sensor_readings(command, body))
        offset = tail_end
    return payload_datas
async def handle_receive(reader: StreamReader, writer: StreamWriter):
    """Handle one incoming connection: decode, log and forward its payload.

    Reads everything the peer sends until EOF, decodes it with decode_payload
    and uses the first decoded set of readings. Unless the reading is judged
    idle, it is appended to today's CSV log and, if at least
    SETTINGS['pvoutput_interval'] seconds have passed since the last
    successful upload (or none has happened yet), pushed to PVOutput.

    A reading counts as idle once the day's energy figure has been unchanged
    with zero power for two or more consecutive readings.

    State between connections is kept on function attributes
    (last_received, last_sent, last_energy_gen, consecutive_zeros).
    The writer is always closed, including on early return or error.
    """
    try:
        data = await reader.read()
        last_received = handle_receive.last_received
        last_sent = handle_receive.last_sent
        consecutive_zeros = handle_receive.consecutive_zeros
        last_energy_gen = handle_receive.last_energy_gen
        received_time = time.time()
        if last_received is None:
            print("first connection received.")
        else:
            delta_received = received_time - last_received
            print("last received {} seconds ago.".format(delta_received))
        if last_energy_gen is None:
            # Sentinel that can never equal a real (non-negative) energy figure.
            last_energy_gen = -1.0
        if consecutive_zeros is None:
            consecutive_zeros = 0

        hex_data = ''.join('{:02x}'.format(x) for x in data)
        addr = writer.get_extra_info('peername')
        print("Received {} from {}".format(hex_data, addr))
        handle_receive.last_received = received_time
        received_datetime = datetime.fromtimestamp(received_time)

        payload_datas = decode_payload(received_datetime, data)
        if not payload_datas:
            print("Didn't get any payload variables?!")
            return
        payload_data = first(payload_datas)
        energy_day_accum = payload_data['e_today'] * 1000.0
        pac = payload_data['pac']

        idle = False
        if round(energy_day_accum, 1) == round(last_energy_gen, 1) and round(pac, 1) == 0.0:
            consecutive_zeros = consecutive_zeros + 1
            if consecutive_zeros >= 2:
                idle = True
        else:
            consecutive_zeros = 0
        handle_receive.consecutive_zeros = consecutive_zeros
        handle_receive.last_energy_gen = energy_day_accum

        if idle:
            return

        logfile_path = get_logfile_path()
        write_csv_values(received_datetime, logfile_path, payload_data)

        interval = SETTINGS['pvoutput_interval']
        if last_sent is None:
            print("sending first packet to pvoutput.")
            # Treat the very first packet as due for upload.
            delta_seconds = interval
        else:
            delta_seconds = received_time - last_sent
        if delta_seconds >= interval:
            result = await push_to_pvoutput(received_datetime, energy_gen=energy_day_accum, power_gen=pac,
                                            temperature=payload_data['temp'], voltage=payload_data['vac1'])
            if result is True:
                handle_receive.last_sent = time.time()
        else:
            print("skipping send. Sent last one {} seconds ago.".format(delta_seconds))
    finally:
        # Always release the client socket, even on early return or error.
        writer.close()


# Per-process state carried between connections; None means "not seen yet".
handle_receive.last_received = None
handle_receive.last_sent = None
handle_receive.last_energy_gen = None
handle_receive.consecutive_zeros = None
def run_server(host='0.0.0.0', port=8081):
    """Run the TCP server that feeds connections to handle_receive.

    Blocks until interrupted with Ctrl+C, then closes the server and the
    event loop.

    Args:
        host: interface to bind to (default: all interfaces).
        port: TCP port to listen on (default: 8081).
    """
    # Create and install the loop explicitly: relying on get_event_loop() to
    # create one implicitly is deprecated, and start_server() no longer
    # accepts a ``loop`` argument on Python 3.10+.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        server = loop.run_until_complete(asyncio.start_server(handle_receive, host, port))
        # Serve requests until Ctrl+C is pressed
        print('Serving on {}'.format(server.sockets[0].getsockname()))
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            # Close the server
            server.close()
            loop.run_until_complete(server.wait_closed())
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment