Skip to content

Instantly share code, notes, and snippets.

@vitouXY
Created April 18, 2020 02:51
Show Gist options
  • Save vitouXY/d13dc3d761fc06432c056434c4b9bc12 to your computer and use it in GitHub Desktop.
Save vitouXY/d13dc3d761fc06432c056434c4b9bc12 to your computer and use it in GitHub Desktop.
Connect the RPi to an Android AP (over Bluetooth)
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
bt-tether.py - Connect the Pi to an Android AP (over Bluetooth)
Source:
Code adapted by dadav for Pwnagotchi plugins,
https://github.com/evilsocket/pwnagotchi/blob/master/pwnagotchi/plugins/default/bt-tether.py
If you improve it, let me know!
...Sincerely, a ScriptKid!die :v
"""
from __future__ import absolute_import, print_function
''' FIRST !!!
PI:MA:CA:DD:RE:SS
AN:DR:OI:D0:0M:AC
# ~$ echo -en 'list' | sudo bluetoothctl
# ~$ echo -en 'select PI:MA:CA:DD:RE:SS' | sudo bluetoothctl
# ~$ echo -en 'power on\nagent on\n default-agent' | sudo bluetoothctl
# ~$ echo -en 'scan on' | sudo bluetoothctl
# ~$ echo -en 'scan off' | sudo bluetoothctl
# ~$ echo -e 'trust AN:DR:OI:D0:0M:AC' | sudo bluetoothctl
# ~$ echo -e 'pair AN:DR:OI:D0:0M:AC' | sudo bluetoothctl
# ~$ #echo -e 'connect AN:DR:OI:D0:0M:AC' | sudo bluetoothctl
# ~$ echo -e 'info AN:DR:OI:D0:0M:AC' | sudo bluetoothctl | grep 'Connected:' | grep 'yes' && (echo 'Connected!!') || (exit 2)
'''
import signal, os, sys
# Fail fast when the runtime environment cannot support this script.
if sys.version_info < (3, 0):
    sys.exit("{}:\n Currently, this program runs only under python3".format(sys.version_info))
# The platform is checked before the effective UID because os.geteuid()
# does not exist on non-POSIX systems; calling it first would raise
# AttributeError instead of printing the "Platform not supported" message.
if os.name != 'posix':
    sys.exit('{}: Platform not supported'.format(os.name))
# The script must run with an effective UID of 0 (root).
if os.geteuid() != 0:
    sys.exit('{}: Operation not permitted'.format(os.geteuid()))
def keyboardInterruptHandler(signal, frame):
    """Exit the process with a message when the registered signal arrives.

    Args:
        signal: Number of the delivered signal. Inside this function the
            parameter shadows the ``signal`` module.
        frame: Stack frame passed by the signal machinery (unused).

    Raises:
        SystemExit: Always; the exit message contains the signal number.
    """
    sys.exit(" KeyboardInterrupt (ID: {}) has been caught. Cleaning up...".format(signal))


# Route Ctrl+C (SIGINT) through the handler above.
signal.signal(signal.SIGINT, keyboardInterruptHandler)
#
###################################################
# (part.of) [1f2dd73 4 days ago]
# fs/__init__.py
# from pwnagotchi.fs import ensure_write
#
###### For utils.py
import os
import re
import tempfile
import contextlib
import shutil
import _thread
import logging
from time import sleep
from distutils.dir_util import copy_tree
mounts = []


@contextlib.contextmanager
def ensure_write(filename, mode='w'):
    """Write ``filename`` through a temporary file that replaces it on success.

    A temporary file is created in the same directory as ``filename`` and
    yielded to the caller. When the ``with`` body finishes, the data is
    flushed and fsync'ed, and the temporary file is moved over ``filename``
    with ``os.replace``. If the body (or the final replace) raises, the
    temporary file is removed and ``filename`` is left untouched.

    Args:
        filename: Destination path.
        mode: Mode used to open the temporary file (default ``'w'``).

    Yields:
        The open temporary file object to write to.
    """
    # An empty dirname means "current directory".
    path = os.path.dirname(filename) or '.'
    fd, tmp = tempfile.mkstemp(dir=path)
    try:
        with os.fdopen(fd, mode) as f:
            yield f
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, filename)
    except BaseException:
        # Do not leave a stray temporary file behind on failure.
        with contextlib.suppress(OSError):
            os.unlink(tmp)
        raise
###################################################
# (part.of) [1f2dd73 4 days ago]
# utils.py
# from pwnagotchi.utils import StatusFile
#
###### For bt-tether.py
import logging
import glob
import os
import time
import subprocess
import json
import shutil
import toml # python3 -m pip install toml
import sys
import re
from toml.encoder import TomlEncoder, _dump_str
from zipfile import ZipFile
from datetime import datetime
from enum import Enum
class StatusFile(object):
    """Small on-disk marker whose modification time records a last update.

    On construction, an existing file is read into ``data`` (parsed as JSON
    when ``data_format == 'json'``, otherwise as raw text) and its mtime is
    remembered. ``update()`` rewrites the file and resets the timestamp.
    """

    def __init__(self, path, data_format='raw'):
        """
        Args:
            path: Location of the status file.
            data_format: ``'json'`` to load/dump JSON; anything else means
                the content is handled as plain text.
        """
        self._path = path
        self._updated = None
        self._format = data_format
        self.data = None

        if os.path.exists(path):
            self._updated = datetime.fromtimestamp(os.path.getmtime(path))
            with open(path) as fp:
                if data_format == 'json':
                    self.data = json.load(fp)
                else:
                    self.data = fp.read()

    def data_field_or(self, name, default=""):
        """Return ``data[name]`` when data is a dict containing it, else ``default``."""
        # Raw (string) or list data cannot be indexed by field name; return
        # the default instead of raising TypeError.
        if isinstance(self.data, dict) and name in self.data:
            return self.data[name]
        return default

    def _age_seconds(self):
        """Return the seconds elapsed since the last update, or None if unknown."""
        if self._updated is None:
            return None
        # total_seconds() includes whole days; timedelta.seconds does not and
        # would make a file older than 24h look fresh again.
        return (datetime.now() - self._updated).total_seconds()

    def newer_then_minutes(self, minutes):
        """Return True if the last update is less than ``minutes`` minutes old."""
        age = self._age_seconds()
        return age is not None and (age / 60) < minutes

    def newer_then_hours(self, hours):
        """Return True if the last update is less than ``hours`` hours old."""
        age = self._age_seconds()
        return age is not None and (age / (60 * 60)) < hours

    def newer_then_days(self, days):
        """Return True if the last update is less than ``days`` whole days old."""
        return self._updated is not None and (datetime.now() - self._updated).days < days

    def update(self, data=None):
        """Rewrite the status file and set the update time to now.

        Args:
            data: Content to store. ``None`` writes the current timestamp as
                text; otherwise the data is dumped as JSON (json format) or
                written as-is.
        """
        self._updated = datetime.now()
        self.data = data
        with ensure_write(self._path, 'w') as fp:
            if data is None:
                fp.write(str(self._updated))
            elif self._format == 'json':
                json.dump(self.data, fp)
            else:
                fp.write(data)
###################################################
# [51e13aa on 14 Jan]
# plugins/default/bt-tether.py
#
######
import os
import subprocess
import time
from threading import Lock
import dbus
#import pwnagotchi.plugins as plugins
#import pwnagotchi.ui.fonts as fonts
#from pwnagotchi.ui.components import LabeledValue
#from pwnagotchi.ui.view import BLACK
#from pwnagotchi.utils import StatusFile
class BTError(Exception):
    """
    Custom bluetooth exception, raised when an adapter or device lookup fails
    """
class BTNap:
    """
    This class creates a bluetooth connection to the specified bt-mac
    see https://github.com/bablokb/pi-btnap/blob/master/files/usr/local/sbin/btnap.service.py

    All D-Bus access goes through the BlueZ interfaces named by the
    ``IFACE_*`` constants below.
    """

    IFACE_BASE = 'org.bluez'
    IFACE_DEV = 'org.bluez.Device1'
    IFACE_ADAPTER = 'org.bluez.Adapter1'
    IFACE_PROPS = 'org.freedesktop.DBus.Properties'

    def __init__(self, mac):
        """
        Args:
            mac: Bluetooth address of the remote device to look for.
        """
        self._mac = mac

    @staticmethod
    def get_bus():
        """
        Get systembus obj (created once and cached on the function object)
        """
        bus = getattr(BTNap.get_bus, 'cached_obj', None)
        if not bus:
            bus = BTNap.get_bus.cached_obj = dbus.SystemBus()
        return bus

    @staticmethod
    def get_manager():
        """
        Get manager obj (ObjectManager interface on the BlueZ root, cached)
        """
        manager = getattr(BTNap.get_manager, 'cached_obj', None)
        if not manager:
            manager = BTNap.get_manager.cached_obj = dbus.Interface(
                BTNap.get_bus().get_object(BTNap.IFACE_BASE, '/'),
                'org.freedesktop.DBus.ObjectManager')
        return manager

    @staticmethod
    def prop_get(obj, k, iface=None):
        """
        Get a property of the obj

        ``iface`` defaults to the interface the proxy object is bound to.
        """
        if iface is None:
            iface = obj.dbus_interface
        return obj.Get(iface, k, dbus_interface=BTNap.IFACE_PROPS)

    @staticmethod
    def prop_set(obj, k, v, iface=None):
        """
        Set a property of the obj

        ``iface`` defaults to the interface the proxy object is bound to.
        """
        if iface is None:
            iface = obj.dbus_interface
        return obj.Set(iface, k, v, dbus_interface=BTNap.IFACE_PROPS)

    @staticmethod
    def find_adapter(pattern=None):
        """
        Find the bt adapter

        Returns the generator produced by ``find_adapter_in_objects`` over
        the currently managed objects.
        """
        return BTNap.find_adapter_in_objects(BTNap.get_manager().GetManagedObjects(), pattern)

    @staticmethod
    def find_adapter_in_objects(objects, pattern=None):
        """
        Finds the obj with a pattern

        Generator: yields an Adapter1 interface for every adapter whose
        address equals ``pattern`` or whose object path ends with it (all
        adapters when ``pattern`` is empty). Raises BTError once the scan is
        exhausted if nothing matched.
        """
        bus, obj = BTNap.get_bus(), None
        for path, ifaces in objects.items():
            adapter = ifaces.get(BTNap.IFACE_ADAPTER)
            if adapter is None:
                continue
            if not pattern or pattern == adapter['Address'] or path.endswith(pattern):
                obj = bus.get_object(BTNap.IFACE_BASE, path)
                yield dbus.Interface(obj, BTNap.IFACE_ADAPTER)
        if obj is None:
            raise BTError('Bluetooth adapter not found')

    @staticmethod
    def find_device(device_address, adapter_pattern=None):
        """
        Finds the device
        """
        return BTNap.find_device_in_objects(BTNap.get_manager().GetManagedObjects(),
                                            device_address, adapter_pattern)

    @staticmethod
    def find_device_in_objects(objects, device_address, adapter_pattern=None):
        """
        Finds the device in objects

        ``adapter_pattern`` may be an adapter proxy object or a string
        pattern; when given, only devices below that adapter's object path
        are considered. Raises BTError if no device (or no matching adapter)
        is found.
        """
        bus = BTNap.get_bus()
        path_prefix = ''
        if adapter_pattern:
            if not isinstance(adapter_pattern, str):
                adapter = adapter_pattern
            else:
                # find_adapter_in_objects is a generator; take the first
                # match (it raises BTError itself when there is none).
                adapter = next(BTNap.find_adapter_in_objects(objects, adapter_pattern))
            path_prefix = adapter.object_path
        for path, ifaces in objects.items():
            device = ifaces.get(BTNap.IFACE_DEV)
            if device is None:
                continue
            if str(device['Address']).lower() == device_address.lower() and path.startswith(path_prefix):
                obj = bus.get_object(BTNap.IFACE_BASE, path)
                return dbus.Interface(obj, BTNap.IFACE_DEV)
        raise BTError('Bluetooth device not found')

    def power(self, on=True):
        """
        Set power of devices to on/off

        Returns the first adapter found, or None when no adapter exists.
        """
        print("debug:BT-TETHER: Changing bluetooth device to %s" % (str(on)))
        try:
            # Materialize inside the try: the generator raises BTError lazily.
            adapters = list(BTNap.find_adapter())
            devs = {BTNap.prop_get(dev, 'Address'): dev for dev in adapters}
        except BTError as bt_err:
            print(bt_err)
            return None
        for dev_addr, dev in devs.items():
            BTNap.prop_set(dev, 'Powered', on)
            print('debug:Set power of %s (addr %s) to %s' % (dev.object_path, dev_addr, str(on)))
        if devs:
            return list(devs.values())[0]
        return None

    def is_paired(self):
        """
        Check if the remote device is already paired

        Powers the adapter on first; returns False when no adapter or no
        matching device is found.
        """
        print("debug:BT-TETHER: Checking if device is paired")
        bt_dev = self.power(True)
        if not bt_dev:
            print("debug:BT-TETHER: No bluetooth device found.")
            return False
        try:
            dev_remote = BTNap.find_device(self._mac, bt_dev)
            return bool(BTNap.prop_get(dev_remote, 'Paired'))
        except BTError:
            print("debug:BT-TETHER: Device is not paired.")
        return False

    def wait_for_device(self, timeout=15):
        """
        Wait for device
        returns device if found None if not

        Runs discovery on the adapter and polls once per second, making
        ``timeout + 1`` lookups at most. Errors from starting or stopping
        discovery are printed and re-raised.
        """
        print("debug:BT-TETHER: Waiting for device")
        bt_dev = self.power(True)
        if not bt_dev:
            print("debug:BT-TETHER: No bluetooth device found.")
            return None
        try:
            print("debug:BT-TETHER: Starting discovery ...")
            bt_dev.StartDiscovery()
        except Exception as bt_ex:
            print(bt_ex)
            raise
        dev_remote = None
        # could be set to 0, so check if > -1
        while timeout > -1:
            try:
                dev_remote = BTNap.find_device(self._mac, bt_dev)
                print("debug:BT-TETHER: Using remote device (addr: %s): %s"
                      % (BTNap.prop_get(dev_remote, 'Address'), dev_remote.object_path))
                break
            except BTError:
                print("debug:BT-TETHER: Not found yet ...")
            time.sleep(1)
            timeout -= 1
        try:
            print("debug:BT-TETHER: Stopping Discovery ...")
            bt_dev.StopDiscovery()
        except Exception as bt_ex:
            print(bt_ex)
            raise
        return dev_remote

    @staticmethod
    def pair(device):
        """
        Pair with the device

        Returns True when pairing succeeds or the device reports that it is
        already paired, False on any other error (which is printed).
        """
        print('debug:BT-TETHER: Trying to pair ...')
        try:
            device.Pair()
        except dbus.exceptions.DBusException as err:
            if err.get_dbus_name() == 'org.bluez.Error.AlreadyExists':
                print('debug:BT-TETHER: Already paired ...')
                return True
            print('debug:BT-TETHER: Pairing error: %s' % (err))
            return False
        except Exception as err:
            # Best effort: report the reason rather than hiding it.
            print('debug:BT-TETHER: Pairing error: %s' % (err))
            return False
        print('debug:BT-TETHER: Successful paired with device ;)')
        return True

    @staticmethod
    def nap(device):
        """
        Connect to the device's 'nap' network

        Returns ``(net, True)`` when connected (``net`` being the Network1
        interface of the device) and ``(None, False)`` otherwise.
        """
        print('debug:BT-TETHER: Trying to nap ...')
        try:
            print('debug:BT-TETHER: Connecting to profile ...')
            device.ConnectProfile('nap')
        except Exception:  # raises exception, but still works
            pass
        net = dbus.Interface(device, 'org.bluez.Network1')
        try:
            print('debug:BT-TETHER: Connecting to nap network ...')
            net.Connect('nap')
            return net, True
        except dbus.exceptions.DBusException as err:
            if err.get_dbus_name() == 'org.bluez.Error.AlreadyConnected':
                return net, True
        # Connect failed for another reason: trust the 'Connected' property.
        connected = BTNap.prop_get(net, 'Connected')
        if not connected:
            return None, False
        return net, True
class SystemdUnitWrapper:
    """
    systemd wrapper

    Every method runs ``systemctl`` and reports success as a bool: True when
    the command exits with status 0, False otherwise (including when
    ``systemctl`` cannot be executed at all).
    """

    def __init__(self, unit):
        """
        Args:
            unit: Name of the systemd unit, e.g. ``'bluetooth.service'``.
        """
        self.unit = unit

    @staticmethod
    def _run_systemctl(*args):
        """Run ``systemctl`` with ``args``; return True on exit status 0."""
        try:
            # An argument list (no shell) avoids quoting/injection problems;
            # DEVNULL replaces a /dev/null handle that was never closed.
            result = subprocess.run(['systemctl', *args], stdin=None,
                                    stdout=subprocess.DEVNULL, stderr=None,
                                    check=False)
        except OSError:
            # systemctl missing or not executable: treat as failure.
            return False
        return result.returncode == 0

    @staticmethod
    def _action_on_unit(action, unit):
        """Run ``systemctl <action> <unit>``; ``unit`` is passed as one argument."""
        return SystemdUnitWrapper._run_systemctl(action, unit)

    @staticmethod
    def daemon_reload():
        """
        Calls systemctl daemon-reload
        """
        return SystemdUnitWrapper._run_systemctl('daemon-reload')

    def is_active(self):
        """
        Checks if unit is active
        """
        return SystemdUnitWrapper._action_on_unit('is-active', self.unit)

    def is_enabled(self):
        """
        Checks if unit is enabled
        """
        return SystemdUnitWrapper._action_on_unit('is-enabled', self.unit)

    def is_failed(self):
        """
        Checks if unit is failed
        """
        return SystemdUnitWrapper._action_on_unit('is-failed', self.unit)

    def enable(self):
        """
        Enables the unit
        """
        return SystemdUnitWrapper._action_on_unit('enable', self.unit)

    def disable(self):
        """
        Disables the unit
        """
        return SystemdUnitWrapper._action_on_unit('disable', self.unit)

    def start(self):
        """
        Starts the unit
        """
        return SystemdUnitWrapper._action_on_unit('start', self.unit)

    def stop(self):
        """
        Stops the unit
        """
        return SystemdUnitWrapper._action_on_unit('stop', self.unit)

    def restart(self):
        """
        Restarts the unit
        """
        return SystemdUnitWrapper._action_on_unit('restart', self.unit)
class IfaceWrapper:
    """
    Small wrapper to check and manage ifaces
    see: https://github.com/rlisagor/pynetlinux/blob/master/pynetlinux/ifconfig.py
    """

    def __init__(self, iface):
        """
        Args:
            iface: Network interface name; its sysfs directory is
                ``/sys/class/net/<iface>``.
        """
        self.iface = iface
        self.path = f"/sys/class/net/{iface}"

    @staticmethod
    def _run_ip(*args):
        """Run ``ip`` with ``args`` and return its exit status.

        Returns 127 (the shell's "command not found" status) when ``ip``
        cannot be executed.
        """
        try:
            # Argument list instead of a shell string; DEVNULL instead of an
            # open("/dev/null") handle that was never closed.
            result = subprocess.run(['ip', *(str(arg) for arg in args)], stdin=None,
                                    stdout=subprocess.DEVNULL, stderr=None,
                                    check=False)
        except OSError:
            return 127
        return result.returncode

    def exists(self):
        """
        Checks if iface exists
        """
        return os.path.exists(self.path)

    def is_up(self):
        """
        Checks if iface is up

        Reads ``<path>/operstate``; returns False when the file cannot be
        read (e.g. the interface does not exist).
        """
        try:
            with open(f"{self.path}/operstate", 'r') as state:
                # strip() drops the trailing newline; the previous rsplit()
                # produced a list, which never compares equal to 'up'.
                return state.read().strip() == 'up'
        except OSError:
            return False

    def set_addr(self, addr):
        """
        Add the address (``ip/prefix``) to the iface

        Returns True on success or when ``ip`` exits with status 2, which is
        treated as "address already set".
        """
        returncode = IfaceWrapper._run_ip('addr', 'add', addr, 'dev', self.iface)
        return returncode in (0, 2)  # 2 = already set

    @staticmethod
    def set_route(gateway, device):
        """
        Replace the default route with one via ``gateway`` on ``device``

        Returns True when ``ip route replace`` exits with status 0.
        """
        return IfaceWrapper._run_ip('route', 'replace', 'default',
                                    'via', gateway, 'dev', device) == 0
################################################
################################################
# Plugin-style configuration: a global switch plus one settings mapping per
# device, keyed by device name.
OPTIONS = {
    'enabled': True,
    'devices': {
        'android-phone': {
            'enabled': True,
            'search_order': 1,
            'mac': 'AN:DR:OI:D0:0M:AC',
            'ip': '192.168.44.45',
            'netmask': 24,
            'interval': 1,
            'scantime': 10,
            'max_tries': 10,
            'share_internet': True,
            'priority': 1,
        },
        'ios-phone': {
            'enabled': False,
            'search_order': 2,
            'mac': None,
            'ip': '172.20.10.6',
            'netmask': 24,
            'interval': 5,
            'scantime': 20,
            'max_tries': 0,
            'share_internet': False,
            'priority': 999,
        },
    },
}
# Flat, module-level copy of the 'android-phone' settings. In the visible
# file only device_name is read again (to build the status-file paths); the
# remaining names are not referenced elsewhere here.
device_name = 'android-phone'
device_enabled = True
device_search_order = 1
device_mac = 'AN:DR:OI:D0:0M:AC'
device_ip = '192.168.44.45'
device_netmask = 24
device_interval = 1
device_scantime = 10
device_max_tries = 10
device_share_internet = True
device_priority = 1
# Import-time side effect: update() with no data rewrites the status file
# under /root with the current timestamp.
device_status = StatusFile(f'/root/.bt-tether-{device_name}')
device_status.update()
# Runtime-state placeholders (no attempts yet, no network or gateway).
device_tries = 0
device_network = None
device_gateway = None
# Device table keyed by device name. The single entry holds the
# 'android-phone' settings plus runtime fields: a StatusFile, an attempt
# counter, and empty network/gateway slots.
DEVICES = {
    'android-phone': {
        'enabled': True,
        'search_order': 1,
        'mac': 'AN:DR:OI:D0:0M:AC',
        'ip': '192.168.44.45',
        'netmask': 24,
        'interval': 1,
        'scantime': 10,
        'max_tries': 10,
        'share_internet': True,
        'priority': 1,
        'status': StatusFile(f'/root/.bt-tether-{device_name}'),
        'tries': 0,
        'network': None,
        'gateway': None,
    },
}
################################################
class Device:
    """Settings and runtime state of one tethering peer."""

    def __init__(self, name, share_internet, mac, ip, netmask, interval, gateway=None,
                 priority=10, scantime=15, search_order=0, max_tries=0, **kwargs):
        """
        Creating a Device also creates/refreshes its status file
        ``/root/.bt-tether-<name>``.

        Args:
            name: Device name; also used in the status-file name.
            share_internet: Stored as ``self.share_internet``.
            mac: Bluetooth address of the device.
            ip: IP address stored for this device.
            netmask: Prefix length that goes with ``ip``.
            interval: Stored as ``self.interval``.
            gateway: Optional explicit gateway address.
            priority: Stored as ``self.priority``.
            scantime: Stored as ``self.scantime``.
            search_order: Stored as ``self.search_order``.
            max_tries: Stored as ``self.max_tries``.
            **kwargs: Extra configuration keys (e.g. ``enabled``); ignored.
        """
        self.name = name
        self.status = StatusFile(f'/root/.bt-tether-{name}')
        self.status.update()
        self.tries = 0
        self.network = None

        self.max_tries = max_tries
        self.search_order = search_order
        self.share_internet = share_internet
        self.ip = ip
        self.netmask = netmask
        self.gateway = gateway
        self.interval = interval
        self.mac = mac
        self.scantime = scantime
        self.priority = priority

    def connected(self):
        """
        Checks if device is connected

        Returns a plain bool: False when no network object is set, otherwise
        the truth value of the network's 'Connected' property.
        """
        if not self.network:
            return False
        return bool(BTNap.prop_get(self.network, 'Connected'))

    def interface(self):
        """
        Returns the interface name or None
        """
        if not self.connected():
            return None
        return BTNap.prop_get(self.network, 'Interface')
#import plugins as plugins
#class BTTether(plugins.Plugin):
class BTTether():
    """Stand-alone version of the pwnagotchi bt-tether plugin.

    ``on_loaded()`` builds the device table from the module-level OPTIONS and
    makes sure bluetooth.service runs; ``on_ui_update()`` performs one
    connection pass over the configured devices. UI updates of the upstream
    plugin are replaced by ``#UI bluetooth <status>`` lines on stdout
    (the upstream ``on_unload`` / ``on_ui_setup`` hooks have no equivalent
    here because there is no display).
    """

    __author__ = '33197631+dadav@users.noreply.github.com'
    __version__ = '1.0.0'
    __license__ = 'GPL3'
    __description__ = 'This makes the display reachable over bluetooth'

    # Options every entry under OPTIONS['devices'] must define (not None).
    _REQUIRED_DEVICE_OPTS = ('enabled', 'priority', 'scantime', 'search_order',
                             'max_tries', 'share_internet', 'mac', 'ip',
                             'netmask', 'interval')
    # Options required by the legacy flat configuration.
    _REQUIRED_LEGACY_OPTS = ('share_internet', 'mac', 'ip', 'netmask', 'interval')

    def __init__(self):
        self.ready = False
        self.options = dict()
        self.devices = dict()
        self.lock = Lock()

    def on_loaded(self):
        """Load the configuration and mark the plugin ready.

        Sets ``self.ready`` to True only when at least one valid device is
        configured and bluetooth.service is (or could be made) active.
        """
        self.options = OPTIONS
        # Build the table from OPTIONS only, so every entry is a Device
        # object (on_ui_update calls methods on them).
        self.devices = {}

        # new config
        for name, options in self.options.get('devices', {}).items():
            if not options.get('enabled'):
                continue
            missing = next((opt for opt in self._REQUIRED_DEVICE_OPTS
                            if options.get(opt) is None), None)
            if missing is not None:
                print("BT-TETHER: Please specify the %s for device %s."
                      % (missing, name))
                continue
            self.devices[name] = Device(name=name, **options)

        # legacy
        if 'mac' in self.options:
            for opt in self._REQUIRED_LEGACY_OPTS:
                if self.options.get(opt) is None:
                    print("BT-TETHER: Please specify the %s in your config.yml." % (opt))
                    return
            self.devices['legacy'] = Device(name='legacy', **self.options)

        if not self.devices:
            print("BT-TETHER: No valid devices found")
            return

        # ensure bluetooth is running
        bt_unit = SystemdUnitWrapper('bluetooth.service')
        if not bt_unit.is_active():
            if not bt_unit.start():
                print("BT-TETHER: Can't start bluetooth.service")
                return

        print("info:BT-TETHER: Successfully loaded ...")
        self.ready = True

    def on_ui_update(self):
        """Run one connection pass over all configured devices.

        Does nothing until ``on_loaded()`` has set ``self.ready``. Devices
        that are already connected are skipped; the others are tried in
        ``search_order`` as long as their ``max_tries`` budget (0 = no
        limit) is not used up.
        """
        if not self.ready:
            return

        with self.lock:
            connected_priorities = []
            any_device_connected = False  # if this is true, last status on screen should be C
            devices_to_try = []

            for device in self.devices.values():
                if device.connected():
                    connected_priorities.append(device.priority)
                    any_device_connected = True
                    continue
                if device.max_tries and device.tries >= device.max_tries:
                    continue
                # Each device is queued exactly once per pass. The status-file
                # interval is deliberately not checked: the file is refreshed
                # when the Device is created, so a one-shot run would
                # otherwise never attempt a connection.
                devices_to_try.append(device)
                device.status.update()
                device.tries += 1

            # Device objects are not orderable; sort on the configured order.
            for device in sorted(devices_to_try, key=lambda dev: dev.search_order):
                interface = self._connect(device)
                if interface is None:
                    continue
                any_device_connected = True
                device.tries = 0  # reset tries
                self._configure_network(device, interface, connected_priorities)

            if any_device_connected:
                print('#UI bluetooth C')

    @staticmethod
    def _connect(device):
        """Discover, pair and NAP-connect ``device``; return its interface or None."""
        bt = BTNap(device.mac)
        try:
            print('debug:BT-TETHER: Search %d secs for %s ...' % (device.scantime, device.name))
            dev_remote = bt.wait_for_device(timeout=device.scantime)
        except Exception as bt_ex:
            print(bt_ex)
            print('#UI bluetooth NF')
            return None
        if dev_remote is None:
            print('debug:BT-TETHER: Could not find %s, try again in %d minutes.'
                  % (device.name, device.interval))
            print('#UI bluetooth NF')
            return None

        if bt.is_paired():
            print('debug:BT-TETHER: Already paired.')
        elif BTNap.pair(dev_remote):
            print('debug:BT-TETHER: Paired with %s.' % (device.name))
        else:
            print('debug:BT-TETHER: Pairing with %s failed ...' % (device.name))
            print('#UI bluetooth PE')
            return None

        print('debug:BT-TETHER: Try to create nap connection with %s ...' % (device.name))
        device.network, success = BTNap.nap(dev_remote)
        if not success:
            print('debug:BT-TETHER: Could not establish nap connection with %s' % (device.name))
            print('#UI bluetooth NF')
            return None

        try:
            interface = device.interface()
        except Exception:
            print('debug:BT-TETHER: Could not establish nap connection with %s' % (device.name))
            return None
        if interface is None:
            print('#UI bluetooth BE')
            print('debug:BT-TETHER: Could not establish nap connection with %s' % (device.name))
            return None

        print('debug:BT-TETHER: Created interface (%s)' % (interface))
        print('#UI bluetooth C')
        return interface

    @staticmethod
    def _configure_network(device, interface, connected_priorities):
        """Assign the device's IP to ``interface`` and, if wanted, route through it."""
        addr = f"{device.ip}/{device.netmask}"
        # Without an explicit gateway, use host .1 of the device's network.
        gateway = device.gateway or ".".join(device.ip.split('.')[:-1] + ['1'])

        print('debug:BT-TETHER: Add ip to %s' % (interface))
        if not IfaceWrapper(interface).set_addr(addr):
            print('#UI bluetooth AE')
            print("debug:BT-TETHER: Could not add ip to %s" % (interface))
            return

        if not device.share_internet:
            return
        # Only take over the default route when this device outranks every
        # device that is already connected.
        if connected_priorities and device.priority <= max(connected_priorities):
            return

        print('debug:BT-TETHER: Set default route to %s via %s' % (gateway, interface))
        IfaceWrapper.set_route(gateway, interface)
        connected_priorities.append(device.priority)

        print('debug:BT-TETHER: Change resolv.conf if necessary ...')
        BTTether._ensure_nameserver()

    @staticmethod
    def _ensure_nameserver(resolv_path='/etc/resolv.conf', nameserver='9.9.9.9'):
        """Append a ``nameserver`` entry to ``resolv_path`` unless it is present."""
        entry = 'nameserver %s' % (nameserver)
        with open(resolv_path, 'r+') as resolv:
            content = resolv.read()
            if entry in content:
                return
            print('debug:BT-TETHER: Added nameserver')
            # After read() the position is at end of file, so this appends.
            # Start on a fresh line if the file lacks a trailing newline.
            separator = '' if not content or content.endswith('\n') else '\n'
            resolv.write(separator + entry + '\n')


class CONFIGG():
    """Legacy stand-in that used to be passed to BTTether methods as ``self``.

    Kept for backward compatibility; the entry point below uses a real
    BTTether instance instead.
    """

    @staticmethod
    def Noptions():  # on_loaded(
        return OPTIONS

    @staticmethod
    def Ndevices():  # on_ui_update(
        return DEVICES

    @staticmethod
    def lock():
        return True


if __name__ == '__main__':
    plugin = BTTether()
    plugin.on_loaded()
    plugin.on_ui_update()
    sys.exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment