Skip to content

Instantly share code, notes, and snippets.

@lukaspili
Created October 4, 2017 16:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lukaspili/47e652d556e0b4ac00b7c7ff1a55875f to your computer and use it in GitHub Desktop.
Save lukaspili/47e652d556e0b4ac00b7c7ff1a55875f to your computer and use it in GitHub Desktop.
#!/usr/bin/python
from __future__ import absolute_import, print_function, unicode_literals
import dbus
import dbus.mainloop.glib
try:
from gi.repository import GObject
except ImportError:
import gobject as GObject
import bluezutils
import math
from time import gmtime, strftime
import time
import datetime
import sys
from threading import Thread
# Well-known D-Bus bus name used for every bus.get_object() call in this file.
DBUS_BLUEZ_ORG = "org.bluez"
# NOTE: despite the name this is a D-Bus *interface* name, not an object path;
# it is passed as the interface to dbus.Interface() and Properties.Set().
DBUS_BLUEZ_DEVICE_PATH = "org.bluez.Device1"
# Minimum number of seconds between two pair/connect attempts.
RETRY_DELAY = 5
# Cached device property dicts, keyed by D-Bus object path.
devices = {}
# Time (naive datetime.datetime.now()) and object path of the last pair attempt.
lastPairAt = None
lastPairDevicePath = ""
# Time and object path of the last connect attempt; also read by ReconnectThread.
lastConnectionAt = None
lastConnectionDevicePath = ""
# The ReconnectThread instance; created in the __main__ section.
reconnectThread = None
class ReconnectThread(Thread):
    """Background worker that keeps retrying the last requested connection.

    Every two seconds it looks at the module-level
    ``lastConnectionDevicePath`` and calls ``Connect()`` on that device,
    unless the cached properties in ``devices`` report it as connected or
    another attempt was made less than ``RETRY_DELAY`` seconds ago.
    """

    def __init__(self):
        Thread.__init__(self)
        # Polled by run(); cleared by stop() to end the loop.
        self.isRunning = True

    def run(self):
        """Call connect() every two seconds until stop() is called."""
        while self.isRunning:
            self.connect()
            time.sleep(2)

    def stop(self):
        """Ask run() to exit; takes effect after the current iteration."""
        self.isRunning = False

    def _log(self, message):
        """Write one timestamped (UTC) ``[Thread]`` line and flush stdout."""
        sys.stdout.write(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + " [Thread] " + message + "\n")
        # Flush on every line so output shows up promptly even when stdout
        # is redirected to a file or pipe.
        sys.stdout.flush()

    def connect(self):
        """Try once to reconnect the device last passed to connect().

        Does nothing (apart from logging) when no device has been recorded
        yet, when the cached ``Connected`` property is 1, or when the last
        attempt is less than ``RETRY_DELAY`` seconds old. D-Bus errors from
        ``Connect()`` are logged, not raised.
        """
        global lastConnectionAt

        # Read the shared path once: another thread may reassign the global
        # while this method is running.
        path = lastConnectionDevicePath
        if path == "":
            self._log("no lastConnectionDevicePath")
            return

        if path in devices:
            self._log("found in devices " + path)
            if devices[path].get('Connected') == 1:
                self._log("already connected " + path)
                return

        # Anti spam on `connect`
        if lastConnectionAt is not None:
            elapsed = math.fabs((lastConnectionAt - datetime.datetime.now()).total_seconds())
            if elapsed < RETRY_DELAY:
                self._log("Already connecting " + path)
                return

        lastConnectionAt = datetime.datetime.now()
        self._log("Connecting: " + path)
        dev = dbus.Interface(bus.get_object(DBUS_BLUEZ_ORG, path), DBUS_BLUEZ_DEVICE_PATH)
        try:
            dev.Connect()
        except dbus.DBusException as error:
            # `except ... as` is valid on Python 2.6+ and Python 3.
            self._log("Connection error: " + error.get_dbus_message())
def trust(path):
    """Set the ``Trusted`` property of the device at `path` to True.

    `path` is the device's D-Bus object path. The property is written
    through the ``org.freedesktop.DBus.Properties`` interface.
    """
    stamp = strftime("%Y-%m-%d %H:%M:%S", gmtime())
    print(stamp, "Trusting: ", path, "\n")
    device_object = bus.get_object(DBUS_BLUEZ_ORG, path)
    properties_iface = dbus.Interface(device_object, "org.freedesktop.DBus.Properties")
    properties_iface.Set(DBUS_BLUEZ_DEVICE_PATH, "Trusted", True)
def pair(path):
    """Start pairing with the device at D-Bus object path `path`.

    A repeated request for the same path within ``RETRY_DELAY`` seconds of
    the previous one is ignored. D-Bus errors raised by ``Pair()`` are
    printed, not propagated.
    """
    global lastPairAt
    global lastPairDevicePath

    # Anti spam on `pair`
    if lastPairAt is not None and lastPairDevicePath == path:
        if math.fabs((lastPairAt - datetime.datetime.now()).total_seconds()) < RETRY_DELAY:
            print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Already pairing")
            return

    # Keep a plain string rather than the D-Bus path object handed in by the
    # signal machinery (replaces the former `(path + "x")[:-1]` copy hack).
    lastPairDevicePath = str(path)
    lastPairAt = datetime.datetime.now()

    print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Pairing: ", path, "\n")
    dev = dbus.Interface(bus.get_object(DBUS_BLUEZ_ORG, path), DBUS_BLUEZ_DEVICE_PATH)
    try:
        dev.Pair()
    except dbus.DBusException as error:
        # `except ... as` is valid on Python 2.6+ and Python 3.
        print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Pair error: ", error.get_dbus_message(), "\n")
def connect(path):
    """Connect to the device at D-Bus object path `path`.

    A repeated request for the same path within ``RETRY_DELAY`` seconds of
    the previous one is ignored. The path and time of the attempt are
    stored in ``lastConnectionDevicePath`` / ``lastConnectionAt``, which
    ReconnectThread reads. D-Bus errors raised by ``Connect()`` are
    printed, not propagated.
    """
    global lastConnectionAt
    global lastConnectionDevicePath

    # Anti spam on `connect`
    if lastConnectionAt is not None and lastConnectionDevicePath == path:
        if math.fabs((lastConnectionAt - datetime.datetime.now()).total_seconds()) < RETRY_DELAY:
            print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Already connecting")
            return

    # Keep a plain string rather than the D-Bus path object handed in by the
    # signal machinery (replaces the former `(path + "x")[:-1]` copy hack).
    lastConnectionDevicePath = str(path)
    lastConnectionAt = datetime.datetime.now()

    print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Connecting: ", path, "\n")
    dev = dbus.Interface(bus.get_object(DBUS_BLUEZ_ORG, path), DBUS_BLUEZ_DEVICE_PATH)
    try:
        dev.Connect()
    except dbus.DBusException as error:
        # `except ... as` is valid on Python 2.6+ and Python 3.
        print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Connection error: ", error.get_dbus_message(), "\n")
def print_device(address, properties, path):
    """Print a device's address followed by one line per property.

    `properties` is a mapping of property name to value. ``dbus.String``
    values are reduced to ASCII (unencodable characters become ``?``) and
    the ``Class`` property is shown as a six-digit hexadecimal number.
    `path` is accepted for symmetry with the other handlers but unused.
    """
    print("[ " + address + " ]")
    for key, value in properties.items():
        if isinstance(value, dbus.String):
            # Round-trip through ASCII instead of calling the Python-2-only
            # `unicode` builtin; the result is text on both Python 2 and 3.
            value = value.encode('ascii', 'replace').decode('ascii')
        if key == "Class":
            print(" %s = 0x%06x" % (key, value))
        else:
            print(" %s = %s" % (key, value))
    print()
def process(address, properties, path):
    """Move the device at `path` one step closer to being connected.

    Trusts the device when its ``Trusted`` property equals 0. Then pairs it
    when ``Paired`` equals 0; otherwise connects it when ``Connected``
    equals 0. Properties missing from `properties` trigger nothing.
    `address` is unused.
    """
    untrusted, unpaired, disconnected = [
        properties.get(name) == 0 for name in ('Trusted', 'Paired', 'Connected')
    ]

    if untrusted:
        trust(path)

    if unpaired:
        pair(path)
        return

    if disconnected:
        connect(path)
def properties_changed(interface, changed, invalidated, path):
    """Handle a ``PropertiesChanged`` signal for a device object.

    Merges `changed` into the cached properties for `path` in ``devices``,
    ignores devices without an ``Address`` or whose ``Name``/``Alias`` does
    not start with ``VIKTOR-``, prints the device, and calls process() when
    the connection state warrants it.

    `interface` is the interface the change applies to; anything other than
    ``DBUS_BLUEZ_DEVICE_PATH`` is ignored. `changed` maps property names to
    new values. `invalidated` is unused. `path` is the D-Bus object path.
    """
    if interface != DBUS_BLUEZ_DEVICE_PATH:
        return

    oldConnectedValue = 0
    if path in devices:
        oldConnectedValue = devices[path].get("Connected")
        # Copy-then-update works on Python 2 and 3; adding two `.items()`
        # results only works on Python 2, where they are lists.
        merged = dict(devices[path])
        merged.update(changed)
        devices[path] = merged
    else:
        devices[path] = changed
    props = devices[path]

    if "Address" not in props:
        return
    address = props["Address"]

    # Only use VIKTOR- devices
    for key in ("Name", "Alias"):
        if key in props and not props[key].startswith('VIKTOR-'):
            return

    print("changed: ", oldConnectedValue, changed)
    print_device(address, props, path)

    if "Connected" in changed:
        newConnectedValue = props.get("Connected")
        if newConnectedValue == 1:
            # Device connected
            print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Device connected: ", address, "\n")
            # After a short disconnection the device appears connected and
            # is disconnected right after; connecting directly keeps the
            # connection alive.
            process(address, props, path)
        elif oldConnectedValue == 1:
            # Device disconnected
            print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Device disconnected: ", address, "\n")
            # After a short disconnection the device appears connected and
            # is disconnected right after, so connect again.
            process(address, props, path)
        elif oldConnectedValue == 0:
            # Device detected
            print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Device detected: ", address, "\n")
            process(address, props, path)
    elif "Connected" in props:
        # Some other property changed while the cached state is disconnected.
        if props.get("Connected") == 0:
            # Device detected
            print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Device detected RSSI: ", address, "\n")
            process(address, props, path)
def interfaces_added(path, interfaces):
    """Handle an ``InterfacesAdded`` signal by caching device properties.

    `path` is the object path of the new object and `interfaces` maps
    interface names to property dicts. Objects that do not expose
    ``DBUS_BLUEZ_DEVICE_PATH``, or expose it with no properties, are
    ignored. Otherwise the properties are merged into ``devices[path]``.
    """
    # `.get` instead of indexing: an added object that lacks the device
    # interface must not raise KeyError inside the signal handler.
    properties = interfaces.get(DBUS_BLUEZ_DEVICE_PATH)
    if not properties:
        return

    if path in devices:
        # Copy-then-update works on Python 2 and 3; adding two `.items()`
        # results only works on Python 2, where they are lists.
        merged = dict(devices[path])
        merged.update(properties)
        devices[path] = merged
    else:
        devices[path] = properties
def property_changed(name, value):
    """Quit the main loop when ``Discovering`` changes to a false value.

    `name` is the property name and `value` its new value; every other
    property change is ignored.
    """
    if name != "Discovering" or value:
        return
    mainloop.quit()
if __name__ == '__main__':
    # Script entry point: register the D-Bus signal handlers, process the
    # devices BlueZ already knows, start discovery and the reconnect thread,
    # then run the GLib main loop until Ctrl-C.
    print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Starting discovery")

    # The GLib main loop must be the default before the bus is created so
    # that signals are dispatched; threads_init() is needed because
    # ReconnectThread makes D-Bus calls from a second thread.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    dbus.mainloop.glib.threads_init()

    bus = dbus.SystemBus()
    adapter = bluezutils.find_adapter()

    bus.add_signal_receiver(interfaces_added,
                            dbus_interface="org.freedesktop.DBus.ObjectManager",
                            signal_name="InterfacesAdded")
    bus.add_signal_receiver(properties_changed,
                            dbus_interface="org.freedesktop.DBus.Properties",
                            signal_name="PropertiesChanged",
                            arg0=DBUS_BLUEZ_DEVICE_PATH,
                            path_keyword="path")
    # NOTE(review): "PropertyChanged" on org.bluez.Adapter1 looks like the
    # BlueZ 4 signal name; confirm the BlueZ in use still emits it.
    bus.add_signal_receiver(property_changed,
                            dbus_interface="org.bluez.Adapter1",
                            signal_name="PropertyChanged")

    om = dbus.Interface(bus.get_object(DBUS_BLUEZ_ORG, "/"),
                        "org.freedesktop.DBus.ObjectManager")
    objects = om.GetManagedObjects()
    # `.items()` works on Python 2 and 3; `.iteritems()` is Python 2 only.
    for path, interfaces in objects.items():
        if DBUS_BLUEZ_DEVICE_PATH in interfaces:
            devices[path] = interfaces[DBUS_BLUEZ_DEVICE_PATH]
            # Run already-known devices through the normal handler so they
            # get trusted/paired/connected like newly discovered ones.
            properties_changed(DBUS_BLUEZ_DEVICE_PATH, devices[path], None, path)

    adapter.StartDiscovery()

    reconnectThread = ReconnectThread()
    reconnectThread.start()

    try:
        mainloop = GObject.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        print("Ctrl-C pressed ...")
        reconnectThread.stop()
        # Wait at most 2 seconds for the worker to finish its iteration.
        reconnectThread.join(2)
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment