Created
February 20, 2020 14:52
-
-
Save a1ien/fa3a85de4e0435ad6a301f7311747180 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class HCI_Cmd_LE_Set_Random_Address(aiobs.HCI_Command):
    """HCI command carrying a random device address for the controller.

    The command is built with the byte pair ``0x08`` / ``0x05`` and a payload
    consisting of a single MAC-address field named ``"address"``.

    :param peer_addr: address to send, as a colon-separated hex string.
    :type peer_addr: str
    """

    def __init__(self, peer_addr="24:2B:EC:17:76:4D"):
        # Zero-argument super(): ``super(self.__class__, self)`` resolves to
        # the *runtime* class and recurses forever once this class is
        # subclassed.
        super().__init__(b"\x08", b"\x05")
        self.payload.append(aiobs.MACAddr("address", mac=peer_addr))
class BLEScanRequester(asyncio.Protocol):
    """Protocol that writes HCI commands and forwards received packets.

    Incoming data is handed to ``self.process``, which starts out as a no-op
    and is meant to be replaced by the owner of the connection.
    """

    def __init__(self):
        self.transport = None
        self.smac = None
        self.sip = None
        # Packet callback; swapped out by whoever consumes the packets.
        self.process = self.default_process

    def connection_made(self, transport):
        """Keep the transport and send the set-random-address command."""
        self.transport = transport
        transport.write(HCI_Cmd_LE_Set_Random_Address().encode())

    def connection_lost(self, exc):
        """Delegate to the base protocol implementation."""
        super().connection_lost(exc)

    def send_scan_request(self):
        """Send the extended-scan *enable* command (duplicates not filtered)."""
        self.transport.write(HCI_Cmd_LE_Ext_Scan_Enable(True, False).encode())

    def stop_scan_request(self):
        """Send the extended-scan *disable* command."""
        self.transport.write(HCI_Cmd_LE_Ext_Scan_Enable(False, False).encode())

    def send_command(self, command):
        """Encode an arbitrary command object and write it to the transport."""
        encoded = command.encode()
        self.transport.write(encoded)

    def data_received(self, packet):
        """Pass a received packet to the current ``process`` callback."""
        handler = self.process
        handler(packet)

    def default_process(self, data):
        """Ignore the packet; placeholder until a real callback is installed."""
        return None
class HCI_Cmd_LE_Set_Ext_Scan_Params(aiobs.HCI_Command):
    """HCI command that configures extended LE scanning.

    Built with the byte pair ``0x08`` / ``0x41``. The payload is, in order:
    own address type, filter policy, primary PHY (always ``1``), scan type,
    scan interval and scan window.

    :param scan_type: ``0`` for passive, ``1`` for active scanning.
    :param interval: scan interval; clamped to ``2.5..10240`` and encoded in
        steps of ``0.625`` (i.e. milliseconds converted to 0.625 ms slots).
    :param window: scan window; never larger than ``interval``, then clamped
        and encoded the same way as ``interval``.
    :param oaddr_type: own address type, ``0``..``3`` (see the label table).
    :param filter: scanning filter policy, ``0``..``3`` (see the label table).
    """

    def __init__(self, scan_type=0x0, interval=10, window=750, oaddr_type=1, filter=0):
        # Zero-argument super(): ``super(self.__class__, self)`` recurses
        # forever once this class is subclassed.
        super().__init__(b"\x08", b"\x41")
        self.payload.append(
            aiobs.EnumByte(
                "own addresss type",
                oaddr_type,
                {
                    0: "Public",
                    1: "Random",
                    2: "Private IRK or Public",
                    3: "Private IRK or Random",
                },
            )
        )
        self.payload.append(
            aiobs.EnumByte(
                "filter policy",
                filter,
                {
                    0: "None",
                    1: "Sender In White List",
                    2: "Almost None",
                    3: "SIWL and some",
                },
            )
        )
        # NOTE(review): the spec describes this field as a PHY bitmask in
        # which LE Coded is bit 2 (value 4), not 3 - confirm the label table.
        # Only the value 1 is ever sent here, so the table is informational.
        self.payload.append(
            aiobs.EnumByte("primary phy", 1, {1: "LE 1M", 3: "LE Coded"})
        )
        self.payload.append(
            aiobs.EnumByte("scan type", scan_type, {0: "Passive", 1: "Active"})
        )
        self.payload.append(
            aiobs.UShortInt("Interval", self._to_slots(interval), endian="little")
        )
        # The window may not exceed the interval.
        self.payload.append(
            aiobs.UShortInt(
                "Window", self._to_slots(min(interval, window)), endian="little"
            )
        )

    @staticmethod
    def _to_slots(value):
        """Clamp ``value`` to 2.5..10240 and convert it to 0.625-sized slots."""
        return int(round(min(10240, max(2.5, value)) / 0.625))
class HCI_Cmd_LE_Ext_Scan_Enable(aiobs.HCI_Command):
    """HCI command to enable/disable extended BLE scanning.

    Built with the byte pair ``0x08`` / ``0x42``. The payload is, in order:
    enable flag, filter-duplicates flag, duration and period.

    :param enable: enable/disable scanning.
    :type enable: bool
    :param filter_dups: filter duplicates.
    :type filter_dups: bool
    :param duration: scan duration in milliseconds, sent in 10 ms units;
        ``0`` (the default) is sent as ``0x0000``, i.e. scan until disabled.
    :type duration: int or float
    :param period: scan period in milliseconds, sent in 1.28 s units;
        ``0`` (the default) is sent as ``0x0000``, i.e. no periodic scanning.
    :type period: int or float
    :returns: HCI_Cmd_LE_Ext_Scan_Enable instance.
    :rtype: HCI_Cmd_LE_Ext_Scan_Enable
    """

    def __init__(self, enable=True, filter_dups=True, duration=0, period=0):
        # Zero-argument super(): ``super(self.__class__, self)`` recurses
        # forever once this class is subclassed.
        super().__init__(b"\x08", b"\x42")
        self.payload.append(aiobs.Bool("enable", enable))
        self.payload.append(aiobs.Bool("filter", filter_dups))
        # Duration and Period do not share the scan-interval encoding: the
        # previous 2.5 ms clamp with 0.625 ms units turned the default 0 into
        # 4, which asks for a 40 ms scan every 5.12 s instead of a
        # continuous scan.
        self.payload.append(
            aiobs.UShortInt(
                "Duration", self._to_units(duration, 10.0), endian="little"
            )
        )
        self.payload.append(
            aiobs.UShortInt(
                "Period", self._to_units(period, 1280.0), endian="little"
            )
        )

    @staticmethod
    def _to_units(milliseconds, unit_ms):
        """Convert milliseconds to a 16-bit count of ``unit_ms`` steps.

        Zero or negative input yields ``0`` (the "continuous" sentinel); any
        positive input yields a value in ``1..0xFFFF``.
        """
        if milliseconds <= 0:
            return 0
        return min(0xFFFF, max(1, int(round(milliseconds / unit_ms))))
# HACK: HCIdump.run() below is patched to use the extended-scan HCI commands.
class HCIdump(Thread):
    """Mimic deprecated hcidump tool.

    Thread that opens a Bluetooth socket, starts extended LE scanning through
    a ``BLEScanRequester`` and appends every received packet to ``dumplist``.
    """

    def __init__(self, dumplist, interface=0, active=0):
        """Initiate HCIdump thread.

        :param dumplist: object with ``append``; receives each raw packet.
        :param interface: value passed to ``aiobs.create_bt_socket``.
        :param active: scan type forwarded to the scan-parameters command
            (``0`` passive, ``1`` active).
        """
        super().__init__()
        _LOGGER.debug("HCIdump thread: Init")
        self._interface = interface
        self._active = active
        self.dumplist = dumplist
        # Created inside run() so that the loop belongs to this thread.
        self._event_loop = None
        _LOGGER.debug("HCIdump thread: Init finished")

    def process_hci_events(self, data):
        """Collect HCI events."""
        self.dumplist.append(data)

    def run(self):
        """Run HCIdump thread.

        Logs and returns quietly if the Bluetooth socket cannot be opened.
        Otherwise runs a private event loop until it is stopped, then stops
        scanning, closes the connection and closes the loop.
        """
        _LOGGER.debug("HCIdump thread: Run")
        try:
            mysocket = aiobs.create_bt_socket(self._interface)
        except OSError as error:
            _LOGGER.error("HCIdump thread: OS error: %s", error)
        else:
            self._event_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._event_loop)
            try:
                fac = self._event_loop._create_connection_transport(
                    mysocket, BLEScanRequester, None, None  # use our BLEScanRequester
                )
                _LOGGER.debug("HCIdump thread: Connection")
                conn, btctrl = self._event_loop.run_until_complete(fac)
                _LOGGER.debug("HCIdump thread: Connected")
                btctrl.process = self.process_hci_events
                btctrl.send_command(HCI_Cmd_LE_Set_Random_Address())
                # Honour the ``active`` constructor argument; it used to be
                # stored but never forwarded, so active scanning could not
                # be requested.
                btctrl.send_command(
                    HCI_Cmd_LE_Set_Ext_Scan_Params(scan_type=self._active)
                )
                btctrl.send_scan_request()
                _LOGGER.debug("HCIdump thread: start main event_loop")
                try:
                    self._event_loop.run_forever()
                finally:
                    _LOGGER.debug(
                        "HCIdump thread: main event_loop stopped, finishing"
                    )
                    btctrl.stop_scan_request()
                    conn.close()
                    # Give the loop one iteration to process the close.
                    self._event_loop.run_until_complete(asyncio.sleep(0))
            finally:
                # Close the loop even when the connection step itself fails.
                self._event_loop.close()
                _LOGGER.debug("HCIdump thread: Run finished")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment