Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@melianmiko
Created January 16, 2023 19:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save melianmiko/02f7c6a550808e38d9b6760fb688e125 to your computer and use it in GitHub Desktop.
Save melianmiko/02f7c6a550808e38d9b6760fb688e125 to your computer and use it in GitHub Desktop.
Freebuds playground
import code
import os
import socket
import threading
import traceback
import bluetooth
DEVICE_ADDR = "xx:xx:xx:xx:xx:xx"
SPP_SERVICE_UUID = "00001101-0000-1000-8000-00805f9b34fb"
# Some data for more pretty output
CMD_DESCRIPTORS = {
"0107": "DEVICE_INFO",
"0108": "BATTERY_INFO",
"011f": "SET_DOUBLE_TAP",
"0120": "GET_DOUBLE_TAP",
"0127": "BATTERY_STATE_CHANGED",
"012d": "GET_TOUCH_PAD",
"2b03": "IN_EAR_CHANGED",
"2b04": "SET_ANC_MODE",
"2b10": "SET_AUTO_PAUSE",
"2b11": "GET_AUTO_PAUSE",
"2b16": "SET_LONG_TAP",
"2b17": "GET_LONG_TAP",
"2b18": "SET_ANC_PREF",
"2b19": "GET_ANC_PREF",
"2b2a": "CURRENT_ANC_MODE",
"0a0d": "LOG_SPAM",
"0c02": "LIST_LANGUAGES",
}
class State:
"""
Static app state class
"""
socket = None
class Package:
"""
This class implements a full request/response
pacakge, and functions to build/read bytes.
See "Package structure".
"""
def __init__(self, cmd: bytes, parameters):
"""
Build new package
"""
assert len(cmd) == 2
self.command_id = cmd
self.parameters = parameters
self.process_params()
def process_params(self):
"""
Process params (convert int values to bytes, if present)
"""
for i in range(len(self.parameters)):
t, v = self.parameters[i]
if isinstance(v, int):
self.parameters[i] = (t, v.to_bytes(1, signed=True, byteorder="big"))
def __str__(self):
"""
Pretty-print this pacakge contents
"""
out = build_row(12, "COMMAND_ID") + \
build_row(10, "2 bytes") + \
build_row(40, self.command_id.hex(), CMD_DESCRIPTORS) + "\n"
out += 70 * "=" + "\n"
for p_type, p_value in self.parameters:
out += build_row(12, f"PARAM {p_type}") + \
build_row(10, f"{len(p_value)} bytes") + \
build_row(40, p_value.hex())
if all(c < 128 for c in p_value):
# ASCII string
out += p_value.decode("ascii")
out += "\n"
return out
def send(self):
"""
Send this package
"""
bts = self.encode()
print("Send package", bts.hex())
print(self)
State.socket.send(bts)
def encode(self):
"""
Convert this package to bytes.
Used to send them to device
"""
# Build body (command_id + parameters)
body = self.command_id
for p_type, p_value in self.parameters:
p_type = p_type.to_bytes(1, byteorder="big")
p_length = len(p_value).to_bytes(1, byteorder="big")
body += p_type + p_length + p_value
# Build package
result = b"Z" + (len(body) + 1).to_bytes(2, byteorder="big") + b"\x00" + body
result += crc16(result)
return result
@staticmethod
def from_bytes(data: bytes):
"""
Create Package from bytes.
Used to parse incoming data.
"""
assert data[0] == 90
assert data[3] == 0
length = int.from_bytes(data[1:3], byteorder="big")
command_id = data[4:6]
position = 6
parameters = []
while position < length + 3:
p_type = data[position]
p_length = data[position + 1]
p_value = data[position + 2:position + p_length + 2]
parameters.append((p_type, p_value))
position += p_length + 2
return Package(command_id, parameters)
def recv_thread():
"""
This function will infinite read data from socket,
parse them and print to console/file.
"""
conn = State.socket
while True:
try:
byte = conn.recv(4)
if byte[0:2] == b"Z\x00":
length = byte[2]
if length < 4:
conn.recv(length) # Filter trash
continue
# Read data
byte += conn.recv(length)
package = Package.from_bytes(byte)
if package.command_id != b"\x0a\x0d":
print(TermColors.BLUE +
"Got package\n" + str(package) + TermColors.END)
else:
handle_log_pkg(package)
print("Log written to file...")
except (TimeoutError, socket.timeout):
pass
except (ConnectionResetError, ConnectionAbortedError, OSError):
traceback.print_exc()
def handle_log_pkg(log_pkg: Package):
with open("device_logs.csv", "a+") as f:
row = ""
for p_type, p_value in log_pkg.parameters:
if p_type == 4:
p_value = p_value.decode("ascii")
else:
p_value = p_value.hex().upper()
row += f"{p_type};{p_value};"
row += "\n"
f.write(row)
def main():
# Find service
services = bluetooth.find_service(address=DEVICE_ADDR, uuid=SPP_SERVICE_UUID)
assert len(services) > 0
host = services[0]["host"]
port = services[0]["port"]
print(f"Found service at port {port}")
# Connect socket
print("Trying to connect...")
# noinspection PyUnresolvedReferences
conn = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
conn.settimeout(2)
conn.connect((host, port))
# Run recv thread
State.socket = conn
threading.Thread(target=recv_thread).start()
print("OK")
code.interact("", local=globals())
# Force exit
# noinspection PyUnresolvedReferences,PyProtectedMember
os._exit(0)
# ------------------------------------------------
# Tools
# ------------------------------------------------
class TermColors:
BLUE = '\033[94m'
GREEN = '\033[92m'
END = '\033[0m'
def build_row(ln, val, description_table=None):
if description_table is not None and val in description_table:
val = f"{val} ({description_table[val]})"
return str(val).ljust(ln) + " | "
def crc16(data):
"""
I don't found good implementation of CRC16-XModem in Python
and don't want to implement it by myself, for now, so I use
this, with table =)
"""
crc16_tab = [0, 4129, 8258, 12387, 16516, 20645, 24774, 28903, -32504, -28375, -24246, -20117, -15988, -11859,
-7730,
-3601, 4657, 528, 12915, 8786, 21173, 17044, 29431, 25302, -27847, -31976, -19589, -23718, -11331,
-15460, -3073, -7202, 9314, 13379, 1056, 5121, 25830, 29895, 17572, 21637, -23190, -19125, -31448,
-27383, -6674, -2609, -14932, -10867, 13907, 9842, 5649, 1584, 30423, 26358, 22165, 18100, -18597,
-22662, -26855, -30920, -2081, -6146, -10339, -14404, 18628, 22757, 26758, 30887, 2112, 6241, 10242,
14371, -13876, -9747, -5746, -1617, -30392, -26263, -22262, -18133, 23285, 19156, 31415, 27286, 6769,
2640, 14899, 10770, -9219, -13348, -1089, -5218, -25735, -29864, -17605, -21734, 27814, 31879, 19684,
23749, 11298, 15363, 3168, 7233, -4690, -625, -12820, -8755, -21206, -17141, -29336, -25271, 32407,
28342, 24277, 20212, 15891, 11826, 7761, 3696, -97, -4162, -8227, -12292, -16613, -20678, -24743,
-28808, -28280, -32343, -20022, -24085, -12020, -16083, -3762, -7825, 4224, 161, 12482, 8419, 20484,
16421, 28742, 24679, -31815, -27752, -23557, -19494, -15555, -11492, -7297, -3234, 689, 4752, 8947,
13010, 16949, 21012, 25207, 29270, -18966, -23093, -27224, -31351, -2706, -6833, -10964, -15091, 13538,
9411, 5280, 1153, 29798, 25671, 21540, 17413, -22565, -18438, -30823, -26696, -6305, -2178, -14563,
-10436, 9939, 14066, 1681, 5808, 26199, 30326, 17941, 22068, -9908, -13971, -1778, -5841, -26168,
-30231, -18038, -22101, 22596, 18533, 30726, 26663, 6336, 2273, 14466, 10403, -13443, -9380, -5313,
-1250, -29703, -25640, -21573, -17510, 19061, 23124, 27191, 31254, 2801, 6864, 10931, 14994, -722,
-4849, -8852, -12979, -16982, -21109, -25112, -29239, 31782, 27655, 23652, 19525, 15522, 11395, 7392,
3265, -4321, -194, -12451, -8324, -20581, -16454, -28711, -24584, 28183, 32310, 20053, 24180, 11923,
16050, 3793, 7920]
s = 0
for byte in data:
s = crc16_tab[((s >> 8) ^ byte) & 255] ^ (s << 8)
s = s & 0b1111111111111111 # use only 16 bits
return s.to_bytes(2, "big")
if __name__ == "__main__":
source = bytes.fromhex("5a0007002b1901000200ff0f")
pkg = Package.from_bytes(source)
result = pkg.encode()
assert source == result
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment