Skip to content

Instantly share code, notes, and snippets.

@topnotcher
Created March 8, 2016 00:43
Show Gist options
  • Save topnotcher/5ce1339ddb206d685a2b to your computer and use it in GitHub Desktop.
Save topnotcher/5ce1339ddb206d685a2b to your computer and use it in GitHub Desktop.
import socket
import asyncio
import fcntl
from ctypes import BigEndianStructure, Structure, c_uint16, c_uint8, c_char, c_uint32, sizeof, c_int
from purplehaze.bluetooth import BluetoothManager
SOL_L2CAP = 6
L2CAP_OPTIONS = 0x01
BNEP_MTU = 1691
L2CAP_MODE_BASIC = 0x00
SOL_BLUETOOTH = 274
BTPROTO_BNEP = 4
BT_SECURITY = 4
BT_SECURITY_LOW = 1
BT_SECURITY_HIGH = 3
BT_SECURITY_MEDIUM = 2
BT_SECURITY_FIPS = 4
L2CAP_LM = 0x03
L2CAP_LM_AUTH = 0x0002
L2CAP_LM_ENCRYPT = 0x0004
L2CAP_LM_SECURE = 0x0020
BNEP_PSM = 0x0f
BNEPCONNADD = 0x400442C8
BNEPCONNDEL = 0x400442C9
BNEPGETCONNLIST = 0x800442D2
BNEPGETCONNINFO = 0x800442D3
BNEPGETSUPPFEAT = 0x800442D4
BNEP_CONTROL = 0x01
BNEP_SVC_NAP = 0x1116
BNEP_SETUP_RESPONSE = 0
class bnep_connadd_req(Structure):
_fields_ = [
('sock', c_int),
('flags', c_uint32),
('role', c_uint16),
('device', c_char * 16),
]
class bnep_conndel_req(Structure):
_fields_ = [
('flags', c_uint32),
('dst', c_uint8 * 6),
]
class bnep_setup_conn_req(BigEndianStructure):
_pack_ = 1
_fields_ = [
('type', c_uint8),
('ctrl', c_uint8),
('uuid_size', c_uint8)
]
# struct bnep_setup_conn_req {
# __u8 type;
# __u8 ctrl;
# __u8 uuid_size;
# __u8 service[0];
# } __packed;
class l2cap_options(Structure):
_fields_ = [
('omtu', c_uint16),
('imtu', c_uint16),
('flush_to', c_uint16),
('mode', c_uint8),
('fcs', c_uint8),
('max_tx', c_uint8),
('txwin_size', c_uint16),
]
def __init__(self):
self.omtu = 0
self.imtu = 0
self.flush_to = 0
self.mode = 0
self.fcs = 0
self.max_tx = 0
self.txwin_size = 0
class bt_security(Structure):
_fields_ = [
('level', c_uint8),
('key_size', c_uint8)
]
def __init__(self):
self.level = 0
self.key_size = 0
def l2cap_set_lm(sock, sec_level):
lm_map = [
0,
L2CAP_LM_AUTH,
L2CAP_LM_AUTH | L2CAP_LM_ENCRYPT,
L2CAP_LM_AUTH | L2CAP_LM_ENCRYPT, L2CAP_LM_SECURE
]
opt = lm_map[sec_level]
sock.setsockopt(SOL_L2CAP, L2CAP_LM, opt)
def set_sec_level(sock, sec_level):
""" gboolean set_sec_level(int sock, BtIOType type, int level, GError **err)"""
if sec_level < BT_SECURITY_LOW or sec_level > BT_SECURITY_HIGH:
raise ValueError("invalid security level.")
sec = bt_security()
sec.level = sec_level
sock.setsockopt(SOL_BLUETOOTH, BT_SECURITY, bytes(sec))
l2cap_set_lm(sock, sec_level)
def bnep_connadd(ctl, sock, role):
req = bnep_connadd_req()
req.sock = sock.fileno()
req.role = role
req.device = b'\x00'
# seems to inciate that we want the kernel to send a bnep setup response
# net/bluetooth/bnep/core.c
req.flags = (1 << BNEP_SETUP_RESPONSE)
fcntl.ioctl(ctl, BNEPCONNADD, req)
return req.device.decode('utf8')
def bnep_server(bdaddr):
print('NAP BDADDR:', bdaddr)
svr = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP)
svr.bind((bdaddr, BNEP_PSM))
opts = l2cap_options()
opts.otmu = BNEP_MTU
opts.imtu = BNEP_MTU
opts.mode = L2CAP_MODE_BASIC
svr.setsockopt(SOL_L2CAP, L2CAP_OPTIONS, bytes(opts))
set_sec_level(svr, BT_SECURITY_LOW)
bnepctl = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, BTPROTO_BNEP)
svr.listen(1)
while True:
print('waiting for connection...')
client = svr.accept()
# peek: the kernel will handle the request
data = client.recv(BNEP_MTU, socket.MSG_PEEK)
if len(data) == 0:
raise IOError("bnep: No packet received on l2cap socket")
if len(data) < sizeof(bnep_setup_conn_req):
raise ValueError("bnep: Packet received is not bnep type")
req = bnep_setup_conn_req.from_buffer_copy(data)
if req.type != BNEP_CONTROL:
raise ValueError("bnep: Packet received is not bnep type")
# discard the UUID. I should read this and do things, but I only support NAP.
derp = client.recv(req.uuid_size)
assert req.uuid_size == len(derp)
dev = bnep_connadd(bnepctl, client, BNEP_SVC_NAP)
print('BNEP connection accepted:', dev)
async def _main(loop):
mgr = BluetoothManager(loop)
controller = (await mgr.get_controllers()).pop()
await controller.power_on()
try:
bnep_server(controller.bdaddr)
except KeyboardInterrupt:
pass
await btmgr.stop()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(_main(loop))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment