Skip to content

Instantly share code, notes, and snippets.

@a1ien
Created February 20, 2020 14:52
Show Gist options
  • Save a1ien/fa3a85de4e0435ad6a301f7311747180 to your computer and use it in GitHub Desktop.
Save a1ien/fa3a85de4e0435ad6a301f7311747180 to your computer and use it in GitHub Desktop.
class HCI_Cmd_LE_Set_Random_Address(aiobs.HCI_Command):
def __init__(self,peer_addr="24:2B:EC:17:76:4D"):
super(self.__class__, self).__init__(b"\x08",b"\x05")
self.payload.append(aiobs.MACAddr("address",mac=peer_addr))
class BLEScanRequester(asyncio.Protocol):
'''Protocol handling the requests'''
def __init__(self):
self.transport = None
self.smac = None
self.sip = None
self.process = self.default_process
def connection_made(self, transport):
self.transport = transport
command=HCI_Cmd_LE_Set_Random_Address()
self.transport.write(command.encode())
def connection_lost(self, exc):
super().connection_lost(exc)
def send_scan_request(self):
'''Sending LE scan request'''
command=HCI_Cmd_LE_Ext_Scan_Enable(True,False)
self.transport.write(command.encode())
def stop_scan_request(self):
'''Sending LE scan request'''
command=HCI_Cmd_LE_Ext_Scan_Enable(False,False)
self.transport.write(command.encode())
def send_command(self,command):
'''Sending an arbitrary command'''
self.transport.write(command.encode())
def data_received(self, packet):
self.process(packet)
def default_process(self,data):
pass
class HCI_Cmd_LE_Set_Ext_Scan_Params(aiobs.HCI_Command):
def __init__(self, scan_type=0x0,interval=10, window=750, oaddr_type=1,filter=0):
super(self.__class__, self).__init__(b"\x08",b"\x41")
self.payload.append(aiobs.EnumByte("own addresss type",oaddr_type,
{0: "Public",
1: "Random",
2: "Private IRK or Public",
3: "Private IRK or Random"}))
self.payload.append(aiobs.EnumByte("filter policy",filter,
{0: "None",
1: "Sender In White List",
2: "Almost None",
3: "SIWL and some"}))
self.payload.append(aiobs.EnumByte("primary phy",1,{1:"LE 1M", 3:"LE Coded"}))
self.payload.append(aiobs.EnumByte("scan type",scan_type,
{0: "Passive",
1: "Active"}))
self.payload.append(aiobs.UShortInt("Interval",int(round(min(10240,max(2.5,interval))/0.625)),endian="little"))
self.payload.append(aiobs.UShortInt("Window",int(round(min(10240,max(2.5,min(interval,window)))/0.625)),endian="little"))
class HCI_Cmd_LE_Ext_Scan_Enable(aiobs.HCI_Command):
"""Class representing a command HCI command to enable/disable BLE scanning.
:param enable: enable/disable scanning.
:type enable: bool
:param filter_dups: filter duplicates.
:type filter_dups: bool
:returns: HCI_Cmd_LE_Scan_Enable instance.
:rtype: HCI_Cmd_LE_Scan_Enable
"""
def __init__(self,enable=True,filter_dups=True,duration=0,period=0):
super(self.__class__, self).__init__(b"\x08",b"\x42")
self.payload.append(aiobs.Bool("enable",enable))
self.payload.append(aiobs.Bool("filter",filter_dups))
self.payload.append(aiobs.UShortInt("Duration",int(round(min(10240,max(2.5,duration))/0.625)),endian="little"))
self.payload.append(aiobs.UShortInt("Period",int(round(min(10240,max(2.5,period))/0.625)),endian="little"))
######## hack run function
class HCIdump(Thread):
"""Mimic deprecated hcidump tool."""
def __init__(self, dumplist, interface=0, active=0):
"""Initiate HCIdump thread."""
Thread.__init__(self)
_LOGGER.debug("HCIdump thread: Init")
self._interface = interface
self._active = active
self.dumplist = dumplist
self._event_loop = None
_LOGGER.debug("HCIdump thread: Init finished")
def process_hci_events(self, data):
"""Collect HCI events."""
self.dumplist.append(data)
def run(self):
"""Run HCIdump thread."""
_LOGGER.debug("HCIdump thread: Run")
try:
mysocket = aiobs.create_bt_socket(self._interface)
except OSError as error:
_LOGGER.error("HCIdump thread: OS error: %s", error)
else:
self._event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._event_loop)
fac = self._event_loop._create_connection_transport(
mysocket, BLEScanRequester, None, None # use our BLEScanRequester
)
_LOGGER.debug("HCIdump thread: Connection")
conn, btctrl = self._event_loop.run_until_complete(fac)
_LOGGER.debug("HCIdump thread: Connected")
btctrl.process = self.process_hci_events
#btctrl.send_command(
# aiobs.HCI_Cmd_LE_Set_Scan_Params(scan_type=self._active)
#)
btctrl.send_command(HCI_Cmd_LE_Set_Random_Address())
btctrl.send_command(HCI_Cmd_LE_Set_Ext_Scan_Params())
btctrl.send_scan_request()
_LOGGER.debug("HCIdump thread: start main event_loop")
try:
self._event_loop.run_forever()
finally:
_LOGGER.debug("HCIdump thread: main event_loop stopped, finishing")
btctrl.stop_scan_request()
conn.close()
self._event_loop.run_until_complete(asyncio.sleep(0))
self._event_loop.close()
_LOGGER.debug("HCIdump thread: Run finished")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment