Skip to content

Instantly share code, notes, and snippets.

@d3m3vilurr
Last active September 4, 2021 23:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save d3m3vilurr/69b84ce4d57ed8c93b19c93600570394 to your computer and use it in GitHub Desktop.
Save d3m3vilurr/69b84ce4d57ed8c93b19c93600570394 to your computer and use it in GitHub Desktop.
Handshake KingSmith R2
#!/usr/bin/env python3
# Usage:
# pip3 install bluepy
# python3 get_beacon_key.py <MAC> <PRODUCT_ID>
#
# List of PRODUCT_ID:
# 53: For 'KS-R1AC'
#
# Example:
# python3 get_beacon_key.py AB:CD:EF:12:34:56 53
import re
import sys
import base64
import time
from bluepy.btle import Peripheral
# Accepts only an upper-case, colon-separated MAC address (six groups of two
# hex digits); main() upper-cases the argument before matching.
MAC_PATTERN = r"^[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}$"

# Service UUID passed to Peripheral.getServiceByUUID().
UUID_SERVICE = "1234"

# Descriptor handles looked up after connecting.
HANDLE_COMMAND = 13  # outgoing commands are written here (characteristic FED7)
HANDLE_NOTICE = 19   # SUBSCRIBE_TRUE is written here (characteristic FED8)

# Handle reported to Handler.handleNotification() for incoming message data.
NOTIFY_MESSAGE = 18

# Value written to the notice descriptor before any command is sent.
# NOTE(review): presumably the "enable notifications" flag -- confirm.
SUBSCRIBE_TRUE = b"\x01\x00"
# Raw payloads captured from a real session, kept for reference.
# Each entry is preceded by two comment lines:
#   <plain request> => <plain reply>
#   <encoded request> => <encoded reply>
PACKETS = [
    # shake => 'shake 00'
    # DAEEOAU= => DAEEOAUghwS=
    bytes.fromhex("0c 01 04 38 05"),
    # net => net cloud
    # bm/1 => bm/1IGNsb2/k
    bytes.fromhex("6e 6f f5"),
    # get_dn => get_dn 100801210820000007
    # 6A/1cARu => 6A/1cARuIw4MhwgMhTIx
    bytes.fromhex("e8 0f f5 70 04 6e"),
    # get_pk => get_pk a1hiSOeUajy
    # 6A/1c2ar => 6A/1c2arIG4xOGlTTA//YWp5
    bytes.fromhex("e8 0f f5 73 66 ab"),
    # time_posix 1629831426 => time_posix 0
    # dGlB6/KMb2NpeCSxNjI5+whxNwIA => dGlB6/KMb2NpeCSM
    bytes.fromhex(
        "74 69 41 eb f2 8c 6f 63"
        "69 78 24 b1 36 32 39 37"
        "06 0c 87 32 00"
    ),
    # version => version 0014
    # dm/XDAlvbg== => dm/XDAlvbiSMhw41
    bytes.fromhex("76 6f d7 0c 09 6f 6e"),
    # servers getProp 1 3 7 8 9 16 17 18 19 21 22 23 24 31 => servers 0
    # => props ControlMode 2 ChildLockSwitch 0 runState 0 handrail 0 MaxW 6.0 Max 12.0 StartSpeed 2.0 VelocitySensitivity 2 PanelDisplay 7
    # => props ConSpMode 0 initial 4 mcu_version "0001" unit 0
    # DA/Xdm/XDXan6cR9DmKMIw4ghXS2Iwgg+VSxNiSxNXSx+CSx+VSXhVSXhiSXhXSXNCSzh9== => DA/Xdm/XDXSM
    # => DHJvDHhg9AKudHJvb4yv6GUghiawOGls64xvYABTdAl1YAgghCaXdW5TdGF16VSMIGEEbmRXYWlsIwSgTWFt/XSALjSgTWFtIw4XLjSgU2REDnRTDG/l6CSXLjSg/m/sbANpdHlT6W5zOcRpdml1eVSXIFaEbm/sRGlzDGxEeVS2
    # => DHJvDHhg9AKuU2aNbARlIwSgOW5pdGlEbCS1IGyjd/KA6cJzOWKuICIMhwSxIiaybml1IwS=
    bytes.fromhex(
        "0c 0f d7 76 6f d7 0d 76"
        "a7 e9 c4 7d 0e 62 8c 23"
        "0e 20 85 74 b6 23 08 20"
        "f9 54 b1 36 24 b1 35 74"
        "b1 f8 24 b1 f9 54 97 85"
        "54 97 86 24 97 85 74 97"
        "34 24 b3 87"
    ),
    # set manual mode (no raw bytes were recorded for this one)
    # props ControlMode 1 => props ControlMode 1
    # DHJvDHhg9AKudHJvb4yv6GUgh9== => DHJvDHhg9AKudHJvb4yv6GUgh9==
]
# Substitution tables applied to the base64 text of every message:
# the byte at PLAIN[i] is sent over the air as ENCRY[i].
#
# First attempt, reconstructed from captured packets only (`_` marks a
# position that was still unknown).  Each device might use a different table.
#PLAIN = b'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789=/+'
# 01234567890123456789012345678901234567890123456789012345678901234
#ENCRY = b'SaCw4FGHIJ_LhN+_9RVTU/WcY6ObDde_gEijklmn_p_rsBuvMxXz1yA2t5___K=__'
#
# The table can be found in `libapp.so`, but it has no entry for `=`;
# `=` -> `=` was appended here so padding survives encrypt/decrypt.
PLAIN = (
    b'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    b'abcdefghijklmnopqrstuvwxyz'
    b'0123456789+/='
)
# Table as found in the library:
#ORIG = b'ZaCw4FGHIJqLhN+P9RVTU/WcY6ObDdefgEijklmnopQrsBuvMxXz1yA2t5078KS3='
# TODO: compare against the table recovered from packets; some entries sit
# at a different position than in ORIG.  On the author's device `S` has to
# be first, so `Z` and `S` are swapped below for testing.
ENCRY = (
    b'SaCw4FGHIJqLhN+P9RVTU/WcY6'
    b'ObDdefgEijklmnopQrsBuvMxXz'
    b'1yA2t5078KZ3='
)
def encrypt(buf):
    """Substitute every byte of *buf* using the PLAIN -> ENCRY table.

    *buf* is expected to be base64 text (bytes); each byte is looked up in
    PLAIN and replaced by the byte at the same index in ENCRY.

    Returns the substituted bytes (same length as *buf*).

    Raises:
        ValueError: if a byte of *buf* does not occur in PLAIN.  Previously
            the failed lookup (index -1) silently produced the last table
            entry, ``=``.
    """
    out = bytearray()
    for byte in buf:
        index = PLAIN.find(byte)
        if index < 0:
            raise ValueError('byte %r is not in the PLAIN table' % bytes([byte]))
        mapped = ENCRY[index]
        # `_` is the placeholder used for not-yet-known entries in partially
        # reconstructed tables; warn but keep going, as before.
        if mapped == 0x5f:  # underscore
            print('unknown encrypt map:', bytes([byte]))
        out.append(mapped)
    return bytes(out)
def decrypt(buf):
    """Reverse :func:`encrypt`: map every byte of *buf* from ENCRY to PLAIN.

    Each byte is looked up in ENCRY and replaced by the byte at the same
    index in PLAIN, giving back base64 text.

    Returns the substituted bytes (same length as *buf*).

    Raises:
        ValueError: if a byte of *buf* does not occur in ENCRY (for example
            a stray ``\\r``).  Previously the failed lookup (index -1)
            silently decoded to the last table entry, ``=``.
    """
    out = bytearray()
    for byte in buf:
        index = ENCRY.find(byte)
        if index < 0:
            raise ValueError('byte %r is not in the ENCRY table' % bytes([byte]))
        out.append(PLAIN[index])
    return bytes(out)
def chunk(lst, n):
    """Yield consecutive slices of *lst*, each holding at most *n* items.

    The last slice is shorter when ``len(lst)`` is not a multiple of *n*;
    an empty *lst* yields nothing.
    """
    offsets = range(0, len(lst), n)
    for start in offsets:
        stop = start + n
        yield lst[start:stop]
class Handler:
    """Notification delegate that queues message payloads until fetched."""

    def __init__(self):
        # Notifications are dropped until the caller sets this to True.
        self.reading = False
        # Payloads received on NOTIFY_MESSAGE, in arrival order.
        self._buf = []

    def handleNotification(self, handle, data):
        """Queue *data* if it arrived on NOTIFY_MESSAGE, otherwise print it.

        Does nothing while ``reading`` is False.
        """
        if not self.reading:
            return
        if handle != NOTIFY_MESSAGE:
            print('receive handle %s: %s' % (handle, data))
            return
        self._buf.append(data)

    def fetch(self):
        """Return the queued payloads and start a fresh, empty queue."""
        drained, self._buf = self._buf, []
        return drained

    def empty(self):
        """Return True when no payload is queued."""
        return not self._buf
def _parse_props(message):
    """Turn a ``props <name> <value> ...`` reply into a ``{name: value}`` dict.

    Anything that is not a text reply starting with ``props`` yields an empty
    dict.  Values are kept as the raw string tokens sent by the device.
    """
    if not isinstance(message, str) or not message.startswith('props'):
        return {}
    # Cut off the literal prefix.  ``str.strip('props ')`` strips a *set of
    # characters* from both ends instead, which turned ``runState`` into
    # ``unState`` and could also eat trailing characters of the last value.
    tokens = message[len('props'):].split()
    return dict(zip(tokens[::2], tokens[1::2]))


def _control_demo(send, recv):
    """Experimental mode/run-state sequence; NOT called by get_beacon_key().

    This is the code that used to sit, unreachable, after the ``return`` in
    get_beacon_key().  *send* and *recv* are that function's helpers.
    """
    print("Change control...")
    # standby
    send('props ControlMode 2')
    recv()
    time.sleep(5)
    # manual mode
    send('props ControlMode 1')
    recv()
    # auto mode would be: send('props ControlMode 0'); recv()
    # start treadmill
    send('props runState 1')
    # will return; props runState 1 CurrentSpeed 0.5 RunningSteps 31 spm 0
    recv()
    # will return;
    # `props CurrentSpeed X.X ; float`
    # or `props RunningSteps XX ; int`
    # or ...
    # stop
    send('props runState 0')
    # set current speed (step is 0.5; max 12.0) would be:
    # send('props CurrentSpeed 2.5')


def get_beacon_key(mac, product_id):
    """Connect to the device at *mac*, handshake and print its properties.

    Steps: wait for the user to confirm pairing, connect, dump the GATT
    layout, subscribe to notifications, run the ``shake`` / ``net`` /
    ``get_dn`` / ``get_pk`` / ``time_posix`` / ``version`` exchange, then
    request a set of properties and print them as a dict.

    Args:
        mac: device MAC address, passed to ``Peripheral``.
        product_id: accepted for interface compatibility; not used here.

    Returns:
        None.

    Raises:
        RuntimeError: if the command/notice descriptors are not found, the
            handshake reply is not ``shake 00``, or a queried reply has no
            value field.
    """
    message_handler = Handler()

    # Pairing
    input(f"Activate pairing on your '{mac}' device, then press Enter: ")

    # Connect
    print("Connection in progress...")
    peripheral = Peripheral(deviceAddr=mac)
    try:
        peripheral.setDelegate(message_handler)
        print("Successful connection!")

        # Result unused; the lookup itself is kept from the original flow.
        # NOTE(review): presumably fails early if the service is absent.
        peripheral.getServiceByUUID(UUID_SERVICE)

        # Dump the GATT layout for debugging.
        for service in peripheral.getServices():
            print(service, service.uuid)
            for char in service.getCharacteristics():
                print(char, char.uuid)
                print(char.getHandle())
                for desc in char.getDescriptors():
                    print(desc, desc.handle, desc.uuid)

        notice_descriptor = None
        cmd_descriptor = None
        for descriptor in peripheral.getDescriptors():
            # characteristic FED8
            if descriptor.handle == HANDLE_NOTICE:
                print('notice', descriptor)
                print('notice', descriptor.uuid)
                notice_descriptor = descriptor
            # characteristic FED7
            if descriptor.handle == HANDLE_COMMAND:
                print('command', descriptor)
                print('command', descriptor.uuid)
                cmd_descriptor = descriptor
        if notice_descriptor is None or cmd_descriptor is None:
            raise RuntimeError(
                'descriptor handle %d and/or %d not found on the device'
                % (HANDLE_NOTICE, HANDLE_COMMAND))

        notice_descriptor.write(SUBSCRIBE_TRUE, True)

        def send(msg=b''):
            """Encode *msg* (str or bytes-like) and write it to the device.

            A non-empty message is base64-encoded and passed through
            encrypt(); a ``\\r`` terminator is always appended.
            """
            if isinstance(msg, str):
                msg = msg.encode()
            else:
                msg = bytes(msg)
            if msg:
                msg = encrypt(base64.b64encode(msg))
            msg += b'\x0d'
            # packets have to be split into writes of at most 16 bytes
            for part in chunk(msg, 16):
                cmd_descriptor.write(part)

        def recv(wait=999):
            """Collect notifications until a ``\\r``-terminated message.

            Returns the decoded message as ``str`` (or ``bytes`` if it is
            not valid UTF-8).  After *wait* polls without a terminator the
            raw, still-encoded buffer is returned instead.

            NOTE(review): only a *trailing* ``\\r`` is checked, so this
            assumes replies never arrive batched in one buffer -- confirm.
            """
            buf = b''
            polls = 0
            while True:
                buf += b''.join(message_handler.fetch())
                if buf.endswith(b'\x0d'):
                    # trim \r, undo the substitution, then base64-decode
                    decoded = base64.b64decode(decrypt(buf[:-1]))
                    try:
                        return decoded.decode('utf-8')
                    except UnicodeDecodeError:
                        return decoded
                polls += 1
                if polls >= wait:
                    return buf
                peripheral.waitForNotifications(1)

        def query_value(command):
            """Send *command* and return the second field of its reply."""
            send(command)
            reply = recv()
            fields = reply.split(' ') if isinstance(reply, str) else []
            if len(fields) < 2:
                raise RuntimeError(
                    'unexpected reply to %r: %r' % (command, reply))
            return fields[1]

        # now starting communication with device
        send()
        message_handler.reading = True
        print("Starting reading...")
        # the first packet is `format error`
        recv()

        print("Handshaking...")
        send('shake')
        if recv() != 'shake 00':
            raise RuntimeError('invalid receive')
        send('net')
        # maybe `net cloud`
        recv()

        print("Receive System Informations...")
        # get_dn XXXXX
        dn = query_value('get_dn')
        print('dn:', dn)
        # get_pk XXXXX
        pk = query_value('get_pk')
        print('pk:', pk)
        send('time_posix %d' % int(time.time()))
        # time_posix 0
        recv()
        # version XXXXX
        version = query_value('version')
        print('ver:', version)

        # Property ids accepted by `servers getProp`:
        # 1 - ControlMode
        # 2 - RunningDistance BurnCalories
        #     RunningDistance; 10 is 0.01km or 0.01mile
        #     BurnCalories; 1234 is 1.234KCal
        # 3 - PanelDisplay
        # 4 - ButtonId
        # 5 - goal ; "0,0"
        # 6 - tutorial
        # 7 - MaxW ; walking max speed
        # 8 - initial
        # 9 - runState
        # 10 - RunningSteps
        # 12 - runState, CurrentSpeed
        # 13 - RunningTotalTime
        # 14 - m
        # 16 - ChildLockSwitch
        # 17 - StartSpeed
        # 18 - VelocitySensitivity, ControlMode ; VelocitySensitivity -
        #      auto step detection sensitivity 1: high 2: mid 3: low
        # 19 - ConSpMode
        # 21 - handrail
        # 22 - mcu_version ; "0001"
        # 23 - unit ; meter 0 mile 1
        # 24 - Max ; running max speed
        send('servers getProp 1 3 7 8 9 16 17 18 19 21 22 23 24 31')
        props = {}
        # Up to three replies are read (the capture shows `servers 0`
        # followed by two `props ...` lines); non-props replies are skipped.
        for _ in range(3):
            props.update(_parse_props(recv(wait=10)))
        print(props)
    finally:
        # Always release the BLE connection, even when the exchange fails.
        peripheral.disconnect()
def main(argv):
    """Validate ``<MAC> <PRODUCT_ID>`` from *argv* and run get_beacon_key().

    *argv* is the full argument vector (program name first).  On a usage or
    validation problem a message is printed and the function returns None
    without connecting.
    """
    # Both positional arguments are required.
    if len(argv) < 3:
        print("usage: get_beacon_key.py <MAC> <PRODUCT_ID>\n")
        print("PRODUCT_ID:")
        print(" 53: For 'KS-R1AC'")
        return

    # MAC_PATTERN only accepts upper-case hex, so normalise first.
    mac = argv[1].upper()
    if re.match(MAC_PATTERN, mac) is None:
        print(f"[ERROR] The MAC address '{mac}' seems to be in the wrong format")
        return

    # The product id must parse as an integer.
    raw_product_id = argv[2]
    try:
        product_id = int(raw_product_id)
    except Exception:
        print(f"[ERROR] The Product Id '{raw_product_id}' seems to be in the wrong format")
        return

    get_beacon_key(mac, product_id)
# Script entry point: hand the raw command line to main().
if __name__ == "__main__":
    main(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment