Skip to content

Instantly share code, notes, and snippets.

@floe
Last active June 9, 2024 17:32
Show Gist options
  • Save floe/47fa5b33da71f1a637b60a53a24cd549 to your computer and use it in GitHub Desktop.
Save floe/47fa5b33da71f1a637b60a53a24cd549 to your computer and use it in GitHub Desktop.
Accept all Bluetooth file transfers on Linux
#!/usr/bin/python3
# SPDX-License-Identifier: LGPL-2.1-or-later
# original source: https://github.com/bluez/bluez/blob/master/test/simple-obex-agent
import os
import sys
import dbus
import dbus.service
import dbus.mainloop.glib
from gi.repository import GLib
# Bus name passed to bus.get_object() for every obexd object this script talks to.
BUS_NAME = 'org.bluez.obex'
# Object path that is wrapped in the agent manager interface at start-up.
PATH = '/org/bluez/obex'
# Interface used to call RegisterAgent().
AGENT_MANAGER_INTERFACE = 'org.bluez.obex.AgentManager1'
# Interface under which the Agent methods below are exported.
AGENT_INTERFACE = 'org.bluez.obex.Agent1'
# Interface whose properties are fetched with GetAll() for each incoming transfer.
TRANSFER_INTERFACE = 'org.bluez.obex.Transfer1'
# Session bus connection; assigned in the __main__ block, read by Agent.AuthorizePush.
bus = None
# Active transfers, keyed by transfer object path; values are Transfer instances.
transfers = {}
class Transfer:
    """Bookkeeping for one authorized incoming OBEX transfer.

    Attributes:
        watch: signal match object; its ``remove()`` is called when the
            transfer is finished.
        path: D-Bus object path of the transfer, also its key in ``transfers``.
        file: file name as reported for the transfer.
        size: total size in bytes; 0 while it is still unknown.
    """

    def __init__(self, watch, path, file, size=0):
        self.watch = watch
        self.path = path
        self.file = file
        self.size = size

    def progress(self, amount):
        """Print a single-line progress indicator.

        Args:
            amount: number of bytes transferred so far.
        """
        if self.size:
            percent = (amount / self.size) * 100
            print("Progress: "+str(round(percent))+"%\r", end="")
        else:
            # Size is 0 until it has been announced (and stays 0 for empty
            # files); show the raw byte count instead of dividing by zero.
            print("Progress: "+str(int(amount))+" bytes\r", end="")

    def finish(self):
        """Move the received file into the download folder and stop tracking it.

        Raises:
            ValueError: if the file name has no usable final component.
            OSError: if the file cannot be moved.
        """
        import shutil  # local import: only this method needs it

        print("Cleaning up for "+self.file)
        try:
            self.watch.remove()
            # FIXME: obexd path is hardcoded, could be extracted from Session interface
            source = os.path.expanduser("~/.cache/obexd/")+self.file
            # The name comes from the sending device: keep only its final
            # component so the destination cannot escape the download folder.
            name = os.path.basename(self.file)
            if name in ("", ".", ".."):
                raise ValueError("unusable file name: "+repr(self.file))
            downloads = GLib.get_user_special_dir(GLib.UserDirectory.DIRECTORY_DOWNLOAD)
            if not downloads:
                # No XDG download directory configured.
                downloads = os.path.expanduser("~/Downloads")
            os.makedirs(downloads, exist_ok=True)
            # shutil.move falls back to copy+delete when the cache and the
            # download folder are on different filesystems; os.rename would fail.
            shutil.move(source, os.path.join(downloads, name))
        finally:
            # Forget the transfer even if the move failed, so the registry
            # does not accumulate dead entries.
            transfers.pop(self.path, None)
def signal_handler(name, properties, ignore, path=""):
    """React to a property-change signal emitted by a tracked transfer.

    Args:
        name: first signal argument (presumably the interface whose
            properties changed); unused.
        properties: mapping of changed property names to their new values.
        ignore: third signal argument; unused.
        path: object path of the emitting transfer, supplied through the
            receiver's ``path_keyword``.
    """
    transfer = transfers.get(path)
    if transfer is None:
        # Signal for a transfer that is not (or no longer) tracked.
        return
    if "Size" in properties:
        transfer.size = properties["Size"]
    # Report progress before looking at the status: completing the transfer
    # removes it from ``transfers``, so a signal carrying both "Transferred"
    # and Status == "complete" must be handled in this order.
    if "Transferred" in properties:
        transfer.progress(properties["Transferred"])
    status = properties.get("Status")
    if status == "complete":
        transfer.finish()
    elif status == "error":
        # A failed transfer never reaches "complete"; release the signal
        # match and the registry entry here so they do not leak.
        print("Transfer failed: "+transfer.file)
        transfer.watch.remove()
        transfers.pop(path, None)
class Agent(dbus.service.Object):
    """OBEX agent that authorizes every incoming push request."""

    def __init__(self, conn=None, obj_path=None):
        dbus.service.Object.__init__(self, conn, obj_path)

    @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="s")
    def AuthorizePush(self, path):
        """Accept the transfer at ``path`` and start tracking it.

        Args:
            path: object path of the incoming transfer.

        Returns:
            The file name under which the transfer should be stored.

        Raises:
            dbus.DBusException: if the announced name has no usable final
                component.
        """
        transfer = dbus.Interface(bus.get_object(BUS_NAME, path), 'org.freedesktop.DBus.Properties')
        properties = transfer.GetAll(TRANSFER_INTERFACE)
        # The name is chosen by the sending device: drop any directory part
        # so the reply cannot point outside the receive directory.
        filename = os.path.basename(str(properties["Name"]))
        if filename in ("", ".", ".."):
            raise dbus.DBusException("Unusable file name",
                                     name="org.bluez.obex.Error.Rejected")
        print("Incoming transfer: "+filename)
        # Subscribe to PropertiesChanged only: signal_handler expects that
        # signal's three arguments and would fail on any other signal
        # emitted from the same object path.
        watch = bus.add_signal_receiver(signal_handler,
                                        signal_name="PropertiesChanged",
                                        dbus_interface="org.freedesktop.DBus.Properties",
                                        path=path,
                                        path_keyword="path")
        transfers[path] = Transfer(watch, path, filename)
        return filename

    @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="")
    def Cancel(self):
        """Log that a pending authorization request was canceled."""
        print("Authorization Canceled")
if __name__ == '__main__':
    # Install the GLib main loop as dbus-python's default before connecting.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SessionBus()

    # Proxy for the agent manager of the OBEX service.
    obex_root = bus.get_object(BUS_NAME, PATH)
    manager = dbus.Interface(obex_root, AGENT_MANAGER_INTERFACE)

    # Export the agent object, then announce it to the manager.
    agent_path = "/test/agent"
    agent = Agent(bus, agent_path)
    mainloop = GLib.MainLoop()
    manager.RegisterAgent(agent_path)
    print("Agent registered")

    # Dispatch D-Bus calls and signals until the process is terminated.
    mainloop.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment