|
#!/usr/bin/env python3 |
|
|
|
# Usage: |
|
# pip3 install bluepy |
|
# python3 get_beacon_key.py <MAC> <PRODUCT_ID> |
|
# |
|
# List of PRODUCT_ID: |
|
# 53: For 'KS-R1AC' |
|
# |
|
# Example: |
|
# python3 get_beacon_key.py AB:CD:EF:12:34:56 53 |
|
|
|
import re |
|
import sys |
|
import base64 |
|
import time |
|
|
|
from bluepy.btle import Peripheral |
|
|
|
MAC_PATTERN = r"^[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}$" |
|
|
|
UUID_SERVICE = "1234" |
|
|
|
HANDLE_COMMAND = 13 |
|
HANDLE_NOTICE = 19 |
|
NOTIFY_MESSAGE = 18 |
|
|
|
SUBSCRIBE_TRUE = bytes([0x01, 0x00]) |
|
|
|
PACKETS = [ |
|
# shake => 'shake 00' |
|
# DAEEOAU= => DAEEOAUghwS= |
|
bytes([0x0c, 0x01, 0x04, 0x38, 0x05]), |
|
# net => net cloud |
|
# bm/1 => bm/1IGNsb2/k |
|
bytes([0x6e, 0x6f, 0xf5]), |
|
# get_dn => get_dn 100801210820000007 |
|
# 6A/1cARu => 6A/1cARuIw4MhwgMhTIx |
|
bytes([0xe8, 0x0f, 0xf5, 0x70, 0x04, 0x6e]), |
|
# get_pk => get_pk a1hiSOeUajy |
|
# 6A/1c2ar => 6A/1c2arIG4xOGlTTA//YWp5 |
|
bytes([0xe8, 0x0f, 0xf5, 0x73, 0x66, 0xab]), |
|
# time_posix 1629831426 => time_posix 0 |
|
# dGlB6/KMb2NpeCSxNjI5+whxNwIA => dGlB6/KMb2NpeCSM |
|
bytes([0x74, 0x69, 0x41, 0xeb, 0xf2, 0x8c, 0x6f, 0x63, |
|
0x69, 0x78, 0x24, 0xb1, 0x36, 0x32, 0x39, 0x37, |
|
0x06, 0x0c, 0x87, 0x32, 0x00]), |
|
# version => version 0014 |
|
# dm/XDAlvbg== => dm/XDAlvbiSMhw41 |
|
bytes([0x76, 0x6f, 0xd7, 0x0c, 0x09, 0x6f, 0x6e]), |
|
# servers getProp 1 3 7 8 9 16 17 18 19 21 22 23 24 31 => servers 0 |
|
# => props ControlMode 2 ChildLockSwitch 0 runState 0 handrail 0 MaxW 6.0 Max 12.0 StartSpeed 2.0 VelocitySensitivity 2 PanelDisplay 7 |
|
# => props ConSpMode 0 initial 4 mcu_version "0001" unit 0 |
|
# DA/Xdm/XDXan6cR9DmKMIw4ghXS2Iwgg+VSxNiSxNXSx+CSx+VSXhVSXhiSXhXSXNCSzh9== => DA/Xdm/XDXSM |
|
# => DHJvDHhg9AKudHJvb4yv6GUghiawOGls64xvYABTdAl1YAgghCaXdW5TdGF16VSMIGEEbmRXYWlsIwSgTWFt/XSALjSgTWFtIw4XLjSgU2REDnRTDG/l6CSXLjSg/m/sbANpdHlT6W5zOcRpdml1eVSXIFaEbm/sRGlzDGxEeVS2 |
|
# => DHJvDHhg9AKuU2aNbARlIwSgOW5pdGlEbCS1IGyjd/KA6cJzOWKuICIMhwSxIiaybml1IwS= |
|
bytes([0x0c, 0x0f, 0xd7, 0x76, 0x6f, 0xd7, 0x0d, 0x76, |
|
0xa7, 0xe9, 0xc4, 0x7d, 0x0e, 0x62, 0x8c, 0x23, |
|
0x0e, 0x20, 0x85, 0x74, 0xb6, 0x23, 0x08, 0x20, |
|
0xf9, 0x54, 0xb1, 0x36, 0x24, 0xb1, 0x35, 0x74, |
|
0xb1, 0xf8, 0x24, 0xb1, 0xf9, 0x54, 0x97, 0x85, |
|
0x54, 0x97, 0x86, 0x24, 0x97, 0x85, 0x74, 0x97, |
|
0x34, 0x24, 0xb3, 0x87]), |
|
|
|
# # set manual mode |
|
# props ControlMode 1 => props ControlMode 1 |
|
# DHJvDHhg9AKudHJvb4yv6GUgh9== => DHJvDHhg9AKudHJvb4yv6GUgh9== |
|
#bytes([]), |
|
] |
|
|
|
# not perfect caesar cipher from real packet data, and each devices might have |
|
# different table |
|
#PLAIN = b'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789=/+' |
|
# 01234567890123456789012345678901234567890123456789012345678901234 |
|
#ENCRY = b'SaCw4FGHIJ_LhN+_9RVTU/WcY6ObDde_gEijklmn_p_rsBuvMxXz1yA2t5___K=__' |
|
|
|
# these table can find in `libapp.so`, but it doesn't have `=` char. |
|
# just added a value to easy encrypt/decrypt |
|
PLAIN = b'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=' |
|
#ORIG = b'ZaCw4FGHIJqLhN+P9RVTU/WcY6ObDdefgEijklmnopQrsBuvMxXz1yA2t5078KS3=' |
|
# Should compare reversing table. |
|
# it should move differernt position with missing part. |
|
# for example, in my case `S` position have to first. |
|
# just swap `Z` <-> `S` for the testing |
|
ENCRY = b'SaCw4FGHIJqLhN+P9RVTU/WcY6ObDdefgEijklmnopQrsBuvMxXz1yA2t5078KZ3=' |
|
|
|
|
|
def encrypt(buf): |
|
ret = b'' |
|
for x in buf: |
|
x = x.to_bytes(1, 'big') |
|
v = ENCRY[PLAIN.find(x)] |
|
if v == 0x5f: # underscore |
|
print('unknown encrypt map:', x) |
|
ret += v.to_bytes(1, 'big') |
|
return ret |
|
|
|
def decrypt(buf): |
|
ret = b'' |
|
for x in buf: |
|
x = x.to_bytes(1, 'big') |
|
v = PLAIN[ENCRY.find(x)] |
|
ret += v.to_bytes(1, 'big') |
|
return ret |
|
|
|
def chunk(lst, n): |
|
for i in range(0, len(lst), n): |
|
yield lst[i:i+n] |
|
|
|
|
|
class Handler: |
|
def __init__(self): |
|
self.reading = False |
|
self._buf = [] |
|
|
|
def handleNotification(self, handle, data): |
|
if not self.reading: |
|
return |
|
if handle == NOTIFY_MESSAGE: |
|
self._buf.append(data) |
|
else: |
|
print('receive handle %s: %s' % (handle, data)) |
|
|
|
def fetch(self): |
|
buf = self._buf |
|
self._buf = [] |
|
return buf |
|
|
|
def empty(self): |
|
return len(self._buf) == 0 |
|
|
|
def get_beacon_key(mac, product_id): |
|
message_handler = Handler() |
|
|
|
# Pairing |
|
input(f"Activate pairing on your '{mac}' device, then press Enter: ") |
|
|
|
# Connect |
|
print("Connection in progress...") |
|
peripheral = Peripheral(deviceAddr=mac) |
|
peripheral.setDelegate(message_handler) |
|
print("Successful connection!") |
|
|
|
service = peripheral.getServiceByUUID(UUID_SERVICE) |
|
#chars = service.getCharacteristics() |
|
#notice_char = None |
|
#for char in chars: |
|
# print(dir(char)) |
|
# if char.getHandle() == HANDLE_NOTICE: |
|
# print('notice_char', char.uuid) |
|
# notice_char = char |
|
for service in peripheral.getServices(): |
|
print(service, service.uuid) |
|
for char in service.getCharacteristics(): |
|
print(char, char.uuid) |
|
print(char.getHandle()) |
|
for desc in char.getDescriptors(): |
|
print(desc, desc.handle, desc.uuid) |
|
descriptors = peripheral.getDescriptors() |
|
notice_descriptor = None |
|
cmd_descriptor = None |
|
for descriptor in descriptors: |
|
# characteristic FED8 |
|
if descriptor.handle == HANDLE_NOTICE: |
|
print('notice', descriptor) |
|
print('notice', descriptor.uuid) |
|
notice_descriptor = descriptor |
|
# characteristic FED7 |
|
if descriptor.handle == HANDLE_COMMAND: |
|
print('command', descriptor) |
|
print('command', descriptor.uuid) |
|
cmd_descriptor = descriptor |
|
notice_descriptor.write(SUBSCRIBE_TRUE, "true") |
|
|
|
def send(msg=b''): |
|
if type(msg) == str: |
|
msg = msg.encode() |
|
else: |
|
msg = bytes(msg) |
|
if len(msg): |
|
msg = base64.b64encode(msg) |
|
msg = encrypt(msg) |
|
msg += b'\x0d' |
|
# packet should split 16bytes |
|
while len(msg) > 16: |
|
cmd_descriptor.write(msg[:16]) |
|
msg = msg[16:] |
|
cmd_descriptor.write(msg[:16]) |
|
|
|
def recv(wait=999): |
|
buf = b'' |
|
w = 0 |
|
while True: |
|
buf += b''.join(message_handler.fetch()) |
|
if len(buf) and buf[-1:] == b'\x0d': |
|
# trim \r |
|
buf = buf[:-1] |
|
buf = decrypt(buf) |
|
#print(buf) |
|
decoded = base64.b64decode(buf) |
|
#print(decoded) |
|
try: |
|
return decoded.decode('utf-8') |
|
except: |
|
return decoded |
|
w += 1 |
|
if w >= wait: |
|
return buf |
|
peripheral.waitForNotifications(1) |
|
|
|
# now starting communication with device |
|
send() |
|
message_handler.reading = True |
|
print("Starting reading...") |
|
# the first packet is `format error` |
|
data = recv() |
|
|
|
print("Handshaking...") |
|
send('shake') |
|
if recv() != 'shake 00': |
|
raise Exception('invalid receive') |
|
send('net') |
|
# maybe `net cloud` |
|
recv() |
|
print("Receive System Informations...") |
|
send('get_dn') |
|
# get_dn XXXXX |
|
dn = recv().split(' ')[1] |
|
print('dn:', dn) |
|
send('get_pk') |
|
# get_pk XXXXX |
|
pk = recv().split(' ')[1] |
|
print('pk:', pk) |
|
send('time_posix %d' % int(time.time())) |
|
# time_posix 0 |
|
recv() |
|
send('version') |
|
# version XXXXX |
|
version = recv().split(' ')[1] |
|
print('ver:', version) |
|
|
|
# 1 - ControlMode |
|
# 2 - RunningDistance BurnCalories |
|
# RunningDistance; 10 is 0.01km or 0.01mile |
|
# BurnCalories; 1234 is 1.234KCal |
|
# 3 - PanelDisplay |
|
# 4 - ButtonId |
|
# 5 - goal ; "0,0" |
|
# 6 - tutorial |
|
# 7 - MaxW ; walking max speed |
|
# 8 - initial |
|
# 9 - unState |
|
# 10 - RunningSteps |
|
# 12 - unState, CurrentSpeed |
|
# 13 - RunningTotalTime |
|
# 14 - m |
|
# 16 - ChildLockSwitch |
|
# 17 - StartSpeed |
|
# 18 - VelocitySensitivity, ControlMode ; VelocitySensitivity - detect auto step sensitive 1: high 2: mid 3: low |
|
# 19 - ConSpMode |
|
# 21 - handrail |
|
# 22 - mcu_version ; "0001" |
|
# 23 - unit ; meter 0 mile 1 |
|
# 24 - Max ; running max speed |
|
send('servers getProp 1 3 7 8 9 16 17 18 19 21 22 23 24 31') |
|
props = dict() |
|
for x in range(3): |
|
data = recv(wait=10) |
|
if data[:5] != 'props': |
|
continue |
|
data = data.strip('props ') |
|
for k, v in chunk(data.split(' '), 2): |
|
props[k] = v |
|
print(props) |
|
|
|
return |
|
|
|
print("Change control...") |
|
# standby |
|
send('props ControlMode 2') |
|
recv() |
|
time.sleep(5) |
|
|
|
# manual mode |
|
send('props ControlMode 1') |
|
recv() |
|
|
|
# auto mode |
|
#send('props ControlMode 0') |
|
#recv() |
|
|
|
# start treadmill |
|
send('props runState 1') |
|
# will return; props runState 1 CurrentSpeed 0.5 RunningSteps 31 spm 0 |
|
recv() |
|
|
|
# will return; |
|
# `props CurrentSpeed X.X ; float` |
|
# or `props RunningSteps XX ; int` |
|
# or ... |
|
|
|
# stop |
|
send('props runState 0') |
|
|
|
# set current speed |
|
# step is 0.5; max 12.0 |
|
#send('props CurrentSpeed 2.5') |
|
|
|
|
|
def main(argv): |
|
# ARGS |
|
if len(argv) <= 2: |
|
print("usage: get_beacon_key.py <MAC> <PRODUCT_ID>\n") |
|
print("PRODUCT_ID:") |
|
print(" 53: For 'KS-R1AC'") |
|
return |
|
|
|
# MAC |
|
mac = argv[1].upper() |
|
if not re.compile(MAC_PATTERN).match(mac): |
|
print(f"[ERROR] The MAC address '{mac}' seems to be in the wrong format") |
|
return |
|
|
|
# PRODUCT_ID |
|
product_id = argv[2] |
|
try: |
|
product_id = int(product_id) |
|
except Exception: |
|
print(f"[ERROR] The Product Id '{product_id}' seems to be in the wrong format") |
|
return |
|
|
|
# BEACON_KEY |
|
get_beacon_key(mac, product_id) |
|
|
|
|
|
if __name__ == '__main__': |
|
main(sys.argv) |