Created
October 4, 2017 16:36
-
-
Save lukaspili/47e652d556e0b4ac00b7c7ff1a55875f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
from __future__ import absolute_import, print_function, unicode_literals | |
import dbus | |
import dbus.mainloop.glib | |
try: | |
from gi.repository import GObject | |
except ImportError: | |
import gobject as GObject | |
import bluezutils | |
import math | |
from time import gmtime, strftime | |
import time | |
import datetime | |
import sys | |
from threading import Thread | |
DBUS_BLUEZ_ORG = "org.bluez" | |
DBUS_BLUEZ_DEVICE_PATH = "org.bluez.Device1" | |
RETRY_DELAY = 5 | |
devices = {} | |
lastPairAt = None | |
lastPairDevicePath = "" | |
lastConnectionAt = None | |
lastConnectionDevicePath = "" | |
reconnectThread = None | |
class ReconnectThread(Thread): | |
def __init__(self): | |
Thread.__init__(self) | |
self.isRunning = True | |
def run(self): | |
while self.isRunning: | |
self.connect() | |
time.sleep(2) | |
def stop(self): | |
self.isRunning = False | |
def connect(self): | |
global devices | |
global lastConnectionAt | |
global lastConnectionDevicePath | |
if lastConnectionDevicePath == "": | |
sys.stdout.write(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + " [Thread] no lastConnectionDevicePath" + "\n") | |
sys.stdout.flush() | |
return | |
if lastConnectionDevicePath in devices: | |
sys.stdout.write(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + " [Thread] found in devices " + lastConnectionDevicePath + "\n") | |
device = devices[lastConnectionDevicePath] | |
connected = device.get('Connected') | |
if connected == 1: | |
sys.stdout.write(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + " [Thread] already connected " + lastConnectionDevicePath + "\n") | |
sys.stdout.flush() | |
return | |
# Anti spam on `connect` | |
if lastConnectionAt is not None: | |
if math.fabs((lastConnectionAt - datetime.datetime.now()).total_seconds()) < RETRY_DELAY: | |
sys.stdout.write(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + " [Thread] Already connecting " + lastConnectionDevicePath + "\n") | |
sys.stdout.flush() | |
return | |
lastConnectionAt = datetime.datetime.now() | |
sys.stdout.write(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + " [Thread] Connecting: " + lastConnectionDevicePath + "\n") | |
dev = dbus.Interface(bus.get_object(DBUS_BLUEZ_ORG, lastConnectionDevicePath), DBUS_BLUEZ_DEVICE_PATH) | |
try: | |
dev.Connect() | |
except dbus.DBusException, error: | |
sys.stdout.write(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + " [Thread] Connection error: " + error.get_dbus_message() + "\n") | |
sys.stdout.flush() | |
def trust(path): | |
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Trusting: ", path, "\n") | |
props = dbus.Interface(bus.get_object(DBUS_BLUEZ_ORG, path), "org.freedesktop.DBus.Properties") | |
props.Set(DBUS_BLUEZ_DEVICE_PATH, "Trusted", True) | |
def pair(path): | |
global lastPairAt | |
global lastPairDevicePath | |
# Anti spam on `pair` | |
if lastPairAt is not None and lastPairDevicePath == path: | |
if math.fabs((lastPairAt - datetime.datetime.now()).total_seconds()) < RETRY_DELAY: | |
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Already pairing") | |
return | |
# Hack to create a copy, needed for thread | |
lastPairDevicePath = (path + "x")[:-1] | |
lastPairAt = datetime.datetime.now() | |
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Pairing: ", path, "\n") | |
dev = dbus.Interface(bus.get_object(DBUS_BLUEZ_ORG, path), DBUS_BLUEZ_DEVICE_PATH) | |
try: | |
dev.Pair() | |
except dbus.DBusException, error: | |
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Pair error: ", error.get_dbus_message(), "\n") | |
def connect(path): | |
global lastConnectionAt | |
global lastConnectionDevicePath | |
# Anti spam on `connect` | |
if lastConnectionAt is not None and lastConnectionDevicePath == path: | |
if math.fabs((lastConnectionAt - datetime.datetime.now()).total_seconds()) < RETRY_DELAY: | |
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Already connecting") | |
return | |
# Hack to create a copy, needed for thread | |
lastConnectionDevicePath = (path + "x")[:-1] | |
lastConnectionAt = datetime.datetime.now() | |
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Connecting: ", path, "\n") | |
dev = dbus.Interface(bus.get_object(DBUS_BLUEZ_ORG, path), DBUS_BLUEZ_DEVICE_PATH) | |
try: | |
dev.Connect() | |
except dbus.DBusException, error: | |
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Connection error: ", error.get_dbus_message(), "\n") | |
def print_device(address, properties, path): | |
print("[ " + address + " ]") | |
for key in properties.keys(): | |
value = properties[key] | |
if type(value) is dbus.String: | |
value = unicode(value).encode('ascii', 'replace') | |
if (key == "Class"): | |
print(" %s = 0x%06x" % (key, value)) | |
else: | |
print(" %s = %s" % (key, value)) | |
print() | |
def process(address, properties, path): | |
trusted = properties.get('Trusted') | |
paired = properties.get('Paired') | |
connected = properties.get('Connected') | |
if trusted == 0: | |
trust(path) | |
if paired == 0: | |
pair(path) | |
elif connected == 0: | |
connect(path) | |
def properties_changed(interface, changed, invalidated, path): | |
global devices | |
if interface != DBUS_BLUEZ_DEVICE_PATH: | |
return | |
oldConnectedValue = 0 | |
if path in devices: | |
dev = devices[path] | |
oldConnectedValue = dev.get("Connected") | |
devices[path] = dict(devices[path].items() + changed.items()) | |
else: | |
devices[path] = changed | |
if "Address" in devices[path]: | |
address = devices[path]["Address"] | |
else: | |
return | |
# Only use VIKTOR- devices | |
for key in devices[path].keys(): | |
value = devices[path][key] | |
if (((key == 'Name' or key == 'Alias')) and not value.startswith('VIKTOR-')): | |
return | |
print("changed: ", oldConnectedValue, changed) | |
print_device(address, devices[path], path) | |
if "Connected" in changed: | |
newConnectedValue = devices[path].get("Connected") | |
if (newConnectedValue == 1): | |
# Device connected | |
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Device connected: ", address, "\n") | |
# After short disconnection, the device appeared connected and just after disconnected | |
# By connecting directly it maintains the connection and don't disconnect | |
process(address, devices[path], path) | |
elif (oldConnectedValue == 1): | |
# Device disconnected | |
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Device disconnected: ", address, "\n") | |
# After short disconnection, the device appeared connected and just after disconnected | |
# So need to connect again | |
process(address, devices[path], path) | |
elif (oldConnectedValue == 0): | |
# Device detected | |
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Device detected: ", address, "\n") | |
process(address, devices[path], path) | |
elif "Connected" in devices[path]: | |
newConnectedValue = devices[path].get("Connected") | |
if (newConnectedValue == 0): | |
# Device detected | |
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Device detected RSSI: ", address, "\n") | |
process(address, devices[path], path) | |
def interfaces_added(path, interfaces): | |
global devices | |
properties = interfaces[DBUS_BLUEZ_DEVICE_PATH] | |
if not properties: | |
return | |
if path in devices: | |
dev = devices[path] | |
devices[path] = dict(devices[path].items() + properties.items()) | |
else: | |
devices[path] = properties | |
def property_changed(name, value): | |
if (name == "Discovering" and not value): | |
mainloop.quit() | |
if __name__ == '__main__': | |
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()), "Starting discovery") | |
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) | |
dbus.mainloop.glib.threads_init() | |
bus = dbus.SystemBus() | |
adapter = bluezutils.find_adapter() | |
bus.add_signal_receiver(interfaces_added, | |
dbus_interface = "org.freedesktop.DBus.ObjectManager", | |
signal_name = "InterfacesAdded") | |
bus.add_signal_receiver(properties_changed, | |
dbus_interface = "org.freedesktop.DBus.Properties", | |
signal_name = "PropertiesChanged", | |
arg0 = DBUS_BLUEZ_DEVICE_PATH, | |
path_keyword = "path") | |
bus.add_signal_receiver(property_changed, | |
dbus_interface = "org.bluez.Adapter1", | |
signal_name = "PropertyChanged") | |
om = dbus.Interface(bus.get_object(DBUS_BLUEZ_ORG, "/"), | |
"org.freedesktop.DBus.ObjectManager") | |
objects = om.GetManagedObjects() | |
for path, interfaces in objects.iteritems(): | |
if DBUS_BLUEZ_DEVICE_PATH in interfaces: | |
devices[path] = interfaces[DBUS_BLUEZ_DEVICE_PATH] | |
properties_changed(DBUS_BLUEZ_DEVICE_PATH, devices[path], None, path) | |
adapter.StartDiscovery() | |
reconnectThread = ReconnectThread() | |
reconnectThread.start() | |
try: | |
mainloop = GObject.MainLoop() | |
mainloop.run() | |
except KeyboardInterrupt: | |
print("Ctrl-C pressed ...") | |
reconnectThread.stop() | |
reconnectThread.join(2) | |
sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment