Skip to content

Instantly share code, notes, and snippets.

@krconv
Created November 24, 2021 04:50
Show Gist options
  • Save krconv/79914fee9dabe0c46c9b380d44716956 to your computer and use it in GitHub Desktop.
Save krconv/79914fee9dabe0c46c9b380d44716956 to your computer and use it in GitHub Desktop.
import subprocess
import ctypes
import binascii
import struct
# Child process whose output this script parses: runs `mosquitto_sub -t <topic>`
# at module load. Its stdout is exposed as a pipe (read by readline()), and its
# stderr is merged into that same pipe, so error text from the subscriber shows
# up interleaved with the payload lines.
PROC = subprocess.Popen(
    [
        "mosquitto_sub",
        "-t",
        "prod/minions/emporia/ct/v1/A2108A04B440F5206A2668/debug/v2",
    ],
    stderr=subprocess.STDOUT,
    stdout=subprocess.PIPE,
)
def main():
    """Print selected fields of every debug frame emitted by the subscriber.

    Lines are pulled one at a time through ``readline()``.  Everything is
    skipped until a line containing the device id marker appears; the 18
    lines that follow are then consumed as one frame.  For each frame a
    separator is printed, followed by three signed 32-bit values (the last
    two words of frame line 3 and the first word of frame line 4, labelled
    0x0028, 0x002C and 0x0030), followed by an empty line.

    The loop ends when the subscriber process has exited and no more output
    is available; the original version kept polling an exhausted pipe forever.
    A blank line seen after the subscriber has exited is treated as end of
    stream, because ``readline()`` returns ``""`` in both situations.

    Raises:
        Whatever ``int32`` raises when a frame line is shorter than expected
        or is not valid hex (e.g. a frame truncated by end of stream).
    """
    marker = "id: A2108A04B440F5206A2668"

    # Frame layout notes carried over from the original reverse-engineering
    # comments (unverified guesses by the original author).  Offsets are bytes
    # from the start of the frame; each frame line appears to hold 16 bytes as
    # 32 hex digits, so byte offset B lives on frame line B // 16 + 1 at
    # character index (B % 16) * 2.
    #   0x0000  int8       version
    #   0x0001  1 byte     maybe checksum?
    #   0x0002  int8       unknown
    #   0x0003  uint16     counter
    #   0x0004  int32 x 3  per channel, I01..I19 in order
    #                      (I01's first two words were noted as P[V1], P[V2])
    #   0x00E8  uint16     V1
    #   0x00EA  uint16     V2
    #   0x00EC  uint16     V3
    #   0x00EE  uint16     Hz
    #   0x00F0  uint16     V2 degrees
    #   0x00F2  uint16     V3 degrees
    #   0x00F4  uint16     one per channel, I01..I19 in order
    #   0x011A  -          end (remainder of frame line 18)
    # Only the three words of channel 4 are printed at the moment.

    while True:
        line = readline()
        if marker not in line:
            # An empty read is either a blank line or end of stream.  Once the
            # subscriber has exited nothing new can arrive, so stop instead of
            # busy-looping on the closed pipe.
            if not line and PROC.poll() is not None:
                return
            continue

        print("=======================")

        # Frame lines 1-2 (0x0000-0x001F): consumed but not displayed.
        readline()
        readline()

        # Frame line 3 (0x0020-0x002F): last two words belong to channel 4.
        l3 = readline()
        print(f"0x0028: {int32(l3[16:])} (IO4)")
        print(f"0x002C: {int32(l3[24:])} (IO4)")

        # Frame line 4 (0x0030-0x003F): first word completes channel 4.
        l4 = readline()
        print(f"0x0030: {int32(l4[0:])} (IO4)")

        # Frame lines 5-18 (0x0040 onwards): consumed so the next scan starts
        # after this frame, but not displayed.
        for _ in range(14):
            readline()

        print()
def readline():
    """Return the next line of subscriber output as stripped text.

    Reads one line of bytes from ``PROC.stdout``, decodes it as UTF-8 and
    removes leading/trailing whitespace (including the newline).  An empty
    string is returned both for a blank line and when the pipe has no more
    data.

    Raises:
        UnicodeDecodeError: if the line is not valid UTF-8.
    """
    raw = PROC.stdout.readline()
    text = raw.decode("utf-8")
    return text.strip()
def int8(str):
    """Decode the first two hex digits of *str* as a signed 8-bit integer.

    Anything after the first two characters is ignored, so callers can pass
    the tail of a longer hex string.

    Raises:
        ValueError: if the leading characters are not a valid base-16 number.
    """
    value = int(str[:2], 16)
    # Two's complement: values with the top bit set represent negatives.
    if value >= 0x80:
        return value - 0x100
    return value
def int16(str):
    """Decode the first four hex digits of *str* as a signed 16-bit integer.

    The four digits are read as a single base-16 number (most significant
    digit first); anything after them is ignored.

    NOTE(review): int32() in this file decodes its bytes little-endian, while
    this helper reads the digits in written order -- confirm which byte order
    the device actually emits for 16-bit fields.

    Raises:
        ValueError: if the leading characters are not a valid base-16 number.
    """
    value = int(str[:4], 16)
    # Two's complement: values with the top bit set represent negatives.
    if value >= 0x8000:
        return value - 0x10000
    return value
def uint16(str):
    """Decode the first four hex digits of *str* as an unsigned 16-bit integer.

    The four digits are read as a single base-16 number (most significant
    digit first); anything after them is ignored.  The result is always in
    the range 0..65535.

    NOTE(review): int32() in this file decodes its bytes little-endian, while
    this helper reads the digits in written order -- confirm which byte order
    the device actually emits for 16-bit fields.

    Raises:
        ValueError: if the leading characters are not a valid base-16 number.
    """
    value = int(str[:4], 16)
    # Reduce modulo 2**16 so the result always fits an unsigned 16-bit range.
    return value % 0x10000
def int32(str):
    """Decode the first eight hex digits of *str* as a signed 32-bit integer.

    The digits are handed to ``parse_hex``, which interprets the four bytes
    in little-endian order; anything after the first eight characters is
    ignored.  The result is passed through a C ``int32`` so it is always
    within the signed 32-bit range.

    Raises:
        Whatever ``parse_hex`` raises for input that is not exactly four
        bytes of valid hex (e.g. a string shorter than eight characters).
    """
    word = str[:8]
    return ctypes.c_int32(parse_hex(word)).value
def parse_hex(str):
    """Convert eight hex digits into a signed 32-bit little-endian integer.

    *str* must describe exactly four bytes.

    Raises:
        binascii.Error: if *str* has an odd length or non-hex characters.
        struct.error: if *str* does not decode to exactly four bytes.
    """
    raw = binascii.unhexlify(str)
    # "<i" = little-endian signed 32-bit; unpack always returns a tuple.
    (value,) = struct.unpack("<i", raw)
    return value
# Start the dump loop as soon as this file is executed. There is no
# `if __name__ == "__main__"` guard, so importing the file runs it as well.
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment