/triage_sensor_data.py Secret
Created
November 24, 2021 04:50
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import subprocess | |
import ctypes | |
import binascii | |
import struct | |
PROC = subprocess.Popen( | |
[ | |
"mosquitto_sub", | |
"-t", | |
"prod/minions/emporia/ct/v1/A2108A04B440F5206A2668/debug/v2", | |
], | |
stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT, | |
) | |
def main(): | |
while True: | |
line = readline() | |
if not "id: A2108A04B440F5206A2668" in line: | |
continue | |
print("=======================") | |
l1 = readline() | |
# print(f"0x0000: {int8(l1[0:])} (version)") | |
# print(f"0x0001: {l1[2:4]} (maybe checksum?)") | |
# print(f"0x0002: {int8(l1[4:])} (unknown)") | |
# print(f"0x0003: {uint16(l1[6:])} (counter)") | |
# print(f"0x0004: {int32(l1[8:])} (I01 - P[V1])") | |
# print(f"0x0008: {int32(l1[16:])} (I01 - P[V2])") | |
# print(f"0x000C: {int32(l1[24:])} (I01)") | |
# print() | |
l2 = readline() | |
# print(f"0x0010: {int32(l2[0:])} (I02)") | |
# print(f"0x0014: {int32(l2[8:])} (I02)") | |
# print(f"0x0018: {int32(l2[16:])} (I02)") | |
# print(f"0x001C: {int32(l2[24:])} (I03)") | |
# print() | |
l3 = readline() | |
# print(f"0x0020: {int32(l3[0:])} (I03)") | |
# print(f"0x0024: {int32(l3[8:])} (I03)") | |
print(f"0x0028: {int32(l3[16:])} (IO4)") | |
print(f"0x002C: {int32(l3[24:])} (IO4)") | |
# print() | |
l4 = readline() | |
print(f"0x0030: {int32(l4[0:])} (IO4)") | |
# print(f"0x0034: {int32(l4[8:])} (I05)") | |
# print(f"0x0038: {int32(l4[16:])} (I05)") | |
# print(f"0x003C: {int32(l4[24:])} (I05)") | |
# print() | |
l5 = readline() | |
# print(f"0x0040: {int32(l5[0:])} (I06)") | |
# print(f"0x0044: {int32(l5[8:])} (I06)") | |
# print(f"0x0048: {int32(l5[16:])} (I06)") | |
# print(f"0x004C: {int32(l5[24:])} (I07)") | |
# print() | |
l6 = readline() | |
# print(f"0x0050: {int32(l6[0:])} (I07)") | |
# print(f"0x0054: {int32(l6[8:])} (I07)") | |
# print(f"0x0058: {int32(l6[16:])} (I08)") | |
# print(f"0x005C: {int32(l6[24:])} (I08)") | |
# print() | |
l7 = readline() | |
# print(f"0x0060: {int32(l7[0:])} (I08)") | |
# print(f"0x0064: {int32(l7[8:])} (I09)") | |
# print(f"0x0068: {int32(l7[16:])} (I09)") | |
# print(f"0x006C: {int32(l7[24:])} (I09)") | |
# print() | |
l8 = readline() | |
# print(f"0x0070: {int32(l8[0:])} (I10)") | |
# print(f"0x0074: {int32(l8[8:])} (I10)") | |
# print(f"0x0078: {int32(l8[16:])} (I10)") | |
# print(f"0x007C: {int32(l8[24:])} (I11)") | |
# print() | |
l9 = readline() | |
# print(f"0x0080: {int32(l9[0:])} (I11)") | |
# print(f"0x0084: {int32(l9[8:])} (I11)") | |
# print(f"0x0088: {int32(l9[16:])} (I12)") | |
# print(f"0x008C: {int32(l9[24:])} (I12)") | |
# print() | |
l10 = readline() | |
# print(f"0x0090: {int32(l10[0:])} (I12)") | |
# print(f"0x0094: {int32(l10[8:])} (I13)") | |
# print(f"0x0098: {int32(l10[16:])} (I13)") | |
# print(f"0x009C: {int32(l10[24:])} (I13)") | |
# print() | |
l11 = readline() | |
# print(f"0x00A0: {int32(l11[0:])} (I14)") | |
# print(f"0x00A4: {int32(l11[8:])} (I14)") | |
# print(f"0x00A8: {int32(l11[16:])} (I14)") | |
# print(f"0x00AC: {int32(l11[24:])} (I15)") | |
# print() | |
l12 = readline() | |
# print(f"0x00B0: {int32(l12[0:])} (I15)") | |
# print(f"0x00B4: {int32(l12[8:])} (I15)") | |
# print(f"0x00B8: {int32(l12[16:])} (I16)") | |
# print(f"0x00BC: {int32(l12[24:])} (I16)") | |
# print() | |
l13 = readline() | |
# print(f"0x00C0: {int32(l13[0:])} (I16)") | |
# print(f"0x00C4: {int32(l13[8:])} (I17)") | |
# print(f"0x00C8: {int32(l13[16:])} (I17)") | |
# print(f"0x00CC: {int32(l13[24:])} (I17)") | |
# print() | |
l14 = readline() | |
# print(f"0x00D0: {int32(l14[0:])} (I18)") | |
# print(f"0x00D4: {int32(l14[8:])} (I18)") | |
# print(f"0x00D8: {int32(l14[16:])} (I18)") | |
# print(f"0x00DC: {int32(l14[24:])} (I19)") | |
# print() | |
l15 = readline() | |
# print(f"0x00E0: {int32(l15[0:])} (I19)") | |
# print(f"0x00E4: {int32(l15[8:])} (I19)") | |
# print(f"0x00E8: {uint16(l15[16:])} (V1)") | |
# print(f"0x00EA: {uint16(l15[20:])} (V2)") | |
# print(f"0x00EC: {uint16(l15[24:])} (V3)") | |
# print(f"0x00EE: {uint16(l15[28:])} (Hz)") | |
# print() | |
l16 = readline() | |
# print(f"0x00F0: {uint16(l16[0:])} (V2 degrees)") | |
# print(f"0x00F2: {uint16(l16[4:])} (V3 degrees)") | |
# print(f"0x00F4: {uint16(l16[8:])} (I01)") | |
# print(f"0x00F6: {uint16(l16[12:])} (I02)") | |
# print(f"0x00F8: {uint16(l16[16:])} (I03)") | |
# print(f"0x00FA: {uint16(l16[20:])} (I04)") | |
# print(f"0x00FC: {uint16(l16[24:])} (I05)") | |
# print(f"0x00FE: {uint16(l16[28:])} (I06)") | |
# print() | |
l17 = readline() | |
# print(f"0x0100: {uint16(l17[0:])} (I07)") | |
# print(f"0x0102: {uint16(l17[4:])} (I08)") | |
# print(f"0x0104: {uint16(l17[8:])} (I09)") | |
# print(f"0x0106: {uint16(l17[12:])} (I10)") | |
# print(f"0x0108: {uint16(l17[16:])} (I11)") | |
# print(f"0x010A: {uint16(l17[20:])} (I12)") | |
# print(f"0x010C: {uint16(l17[24:])} (I13)") | |
# print(f"0x010E: {uint16(l17[28:])} (I14)") | |
# print() | |
l18 = readline() | |
# print(f"0x0110: {uint16(l18[0:])} (I15)") | |
# print(f"0x0112: {uint16(l18[4:])} (I16)") | |
# print(f"0x0114: {uint16(l18[8:])} (I17)") | |
# print(f"0x0116: {uint16(l18[12:])} (I18)") | |
# print(f"0x0118: {uint16(l18[16:])} (I19)") | |
# print(f"0x011A: {l18[20:]} (end)") | |
print() | |
def readline(): | |
return PROC.stdout.readline().decode("utf-8").strip() | |
def int8(str): | |
i = int(str[0:2], 16) | |
return ctypes.c_int8(i).value | |
def int16(str): | |
i = int(str[0:4], 16) | |
return ctypes.c_int16(i).value | |
def uint16(str): | |
i = int(str[0:4], 16) | |
return ctypes.c_uint16(i).value | |
def int32(str): | |
i = parse_hex(str[0:8]) | |
return ctypes.c_int32(i).value | |
def parse_hex(str): | |
return struct.unpack("<i", binascii.unhexlify(str))[0] | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment