Skip to content

Instantly share code, notes, and snippets.

@audetto
Last active August 29, 2015 14:20
Show Gist options
  • Save audetto/fa165d241002c88ba91e to your computer and use it in GitHub Desktop.
Save audetto/fa165d241002c88ba91e to your computer and use it in GitHub Desktop.
Proof-of-concept (POC) serial communication with a PoolMate Live watch
import time
import serial
import struct
# Pause, in seconds, that writeAndReply passes to time.sleep after each
# serial write and read.
delay = 0.1
def crc32a(data):
    """Return the 32-bit checksum of a message.

    The first 6 bytes of ``data`` are skipped. The remaining bytes are fed,
    most significant bit first, through a bitwise CRC with polynomial
    0x04C11DB7, initial value 0xFFFFFFFF and a final bitwise inversion.

    data: sliceable sequence of integers in 0..255 (e.g. a ``bytearray``).

    Returns the checksum as an unsigned integer in 0..0xFFFFFFFF. A message
    of 6 bytes or fewer yields 0.
    """
    crc = 0xFFFFFFFF
    for byte in data[6:]:
        for _ in range(8):
            # Compare the top bit of the register with bit 7 of the byte;
            # when they differ the polynomial is folded in after the shift.
            if ((crc >> 24) ^ byte) & 0x80:
                crc = (crc << 1) ^ 0x04C11DB7
            else:
                crc = crc << 1
            crc &= 0xFFFFFFFF  # keep the register at 32 bits
            byte <<= 1  # move the next bit of the byte into position 7
    return ~crc & 0xFFFFFFFF
def convert(data):
    """Format an iterable of integers as a bracketed list of hex pairs.

    Each value is rendered as lowercase hexadecimal, zero-padded to two
    digits, and the values are separated by ", ". For example
    ``bytearray([0, 255])`` becomes ``"[00, ff]"``.
    """
    hex_pairs = ['{:02x}'.format(value) for value in data]
    return '[' + ', '.join(hex_pairs) + ']'
def makeDonwloadRequest(template, i):
    """Build a download request for index ``i`` from ``template``.

    A copy of ``template`` is returned with byte 9 set to ``i``. The template
    itself is left untouched, so the same template can be reused for further
    requests without one request leaking into the next.

    template: bytes-like message of at least 10 bytes.
    i: value stored in byte 9; ``bytearray`` raises ValueError if it does not
        fit in 0..255.

    Returns a new ``bytearray``.

    The misspelt name ("Donwload") is kept because callers use it.
    """
    request = bytearray(template)
    request[9] = i
    return request
def write(data, direction):
    """Print one line of the traffic log.

    The line holds ``direction`` (left-aligned, width 4), the number of
    bytes in ``data`` (right-aligned, width 5) and the hex dump produced by
    ``convert``.

    data: bytes-like object to log.
    direction: short label for the line, e.g. "OUT", "IN" or "DATA".
    """
    # Going through bytearray makes every element an integer for convert.
    hex_dump = convert(bytearray(data))
    print("{0:<4} {1:>5} {2}".format(direction, len(data), hex_dump))
def writeAndReply(port, data):
    """Send ``data`` followed by its checksum and log whatever comes back.

    The outgoing message, the echoed message and any extra payload are
    printed through ``write``; nothing is returned.

    port: open serial port offering ``write``, ``read`` and ``inWaiting``.
    data: message without checksum, as a ``bytearray``.
    """
    # The checksum is appended as 4 bytes, least significant byte first.
    # The original packed with native order ("I"), which only gives this
    # layout on little-endian hosts; "<I" pins it regardless of the CPU.
    # NOTE(review): assumes the device expects little-endian - confirm.
    checksum = bytearray(struct.pack("<I", crc32a(data)))
    message = data + checksum
    expected = len(message)

    port.write(message)
    time.sleep(delay)
    write(message, "OUT")

    available = port.inWaiting()
    if available < expected:
        print("Missing answer. Expected {0}, received {1}".format(expected, available))
        return

    # normally it just sends back the exact same message
    echo = port.read(expected)
    time.sleep(delay)
    write(echo, "IN")

    # sometimes there is a real payload in the reply
    payload = bytearray()
    pending = port.inWaiting()
    while pending > 0:
        payload += port.read(pending)
        # pause before polling again so that further bytes can arrive
        time.sleep(delay)
        pending = port.inWaiting()

    write(payload, "DATA")
    print("")
# Message bodies. None of them carries a checksum: writeAndReply appends it
# before sending.
data1 = bytearray([0x00, 0x00, 0x55, 0x55, 0x55, 0x55, 0x00, 0x63, 0x63, 0x00, 0x00, 0x00])
data2 = bytearray([0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00])
# sets the time
# The original comment labelled three of these bytes "year day month", but
# its column alignment has been lost.
# NOTE(review): confirm which bytes carry which field.
data3 = bytearray([0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x01, 0x07, 0x19, 0x05, 0x0F, 0x00, 0x30, 0x36, 0x15, 0x00])
data4 = bytearray([0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x01, 0x00, 0x00, 0x08, 0x08])
data5 = bytearray([0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x03, 0x04, 0x00, 0x00, 0x00])
# for upload
data6 = bytearray([0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x06, 0x80, 0x00, 0x00, 0x00])
downloadTemplate = bytearray([0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00])

port = serial.Serial("/dev/ttyUSB0", baudrate=250000)
try:
    # pause kept from the original; presumably lets the device settle after
    # the port has been opened
    time.sleep(1)

    # this is the traffic if the PC does NOT try to upload anything
    # it is what the WinXP app does if it is run when the watch is already in the plug
    # (just it repeats the same calls many more times than here)
    writeAndReply(port, data1)
    writeAndReply(port, data2)
    writeAndReply(port, data3)
    writeAndReply(port, data4)
    writeAndReply(port, data5)

    # upload
    writeAndReply(port, data6)

    # one download request per index 0x00..0x12; makeDonwloadRequest stores
    # the index in byte 9 of the template
    for x in range(0, 0x13):
        data = makeDonwloadRequest(downloadTemplate, x)
        writeAndReply(port, data)
finally:
    # Close the port explicitly, also when an exchange raises, instead of
    # relying on garbage collection after dropping the reference.
    port.close()
    port = None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment