Last active
June 9, 2024 17:32
-
-
Save floe/47fa5b33da71f1a637b60a53a24cd549 to your computer and use it in GitHub Desktop.
Accept all Bluetooth file transfers on Linux
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# SPDX-License-Identifier: LGPL-2.1-or-later | |
# original source: https://github.com/bluez/bluez/blob/master/test/simple-obex-agent | |
import os | |
import sys | |
import dbus | |
import dbus.service | |
import dbus.mainloop.glib | |
from gi.repository import GLib | |
BUS_NAME = 'org.bluez.obex' | |
PATH = '/org/bluez/obex' | |
AGENT_MANAGER_INTERFACE = 'org.bluez.obex.AgentManager1' | |
AGENT_INTERFACE = 'org.bluez.obex.Agent1' | |
TRANSFER_INTERFACE = 'org.bluez.obex.Transfer1' | |
bus = None | |
transfers = {} | |
class Transfer: | |
def __init__(self, watch, path, file, size=0): | |
self.watch = watch | |
self.path = path | |
self.file = file | |
self.size = size | |
def progress(self,amount): | |
percent = (amount / self.size) * 100 | |
print("Progress: "+str(round(percent))+"%\r",end="") | |
def finish(self): | |
print("Cleaning up for "+self.file) | |
self.watch.remove() | |
# FIXME: obexd path is hardcoded, could be extracted from Session interface | |
os.rename(os.path.expanduser("~/.cache/obexd/")+self.file, | |
GLib.get_user_special_dir(GLib.UserDirectory.DIRECTORY_DOWNLOAD)+"/"+self.file) | |
transfers.pop(self.path) | |
def signal_handler(name,properties,ignore,path=""): | |
if "Size" in properties: | |
transfers[path].size = properties["Size"] | |
if "Status" in properties: | |
if properties["Status"] == "complete": | |
transfers[path].finish() | |
if "Transferred" in properties: | |
transfers[path].progress(properties["Transferred"]) | |
class Agent(dbus.service.Object): | |
def __init__(self, conn=None, obj_path=None): | |
dbus.service.Object.__init__(self, conn, obj_path) | |
@dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="s") | |
def AuthorizePush(self, path): | |
transfer = dbus.Interface(bus.get_object(BUS_NAME, path), 'org.freedesktop.DBus.Properties') | |
properties = transfer.GetAll(TRANSFER_INTERFACE) | |
filename = properties["Name"] | |
print("Incoming transfer: "+filename) | |
watch = bus.add_signal_receiver(signal_handler,path=path,path_keyword="path") | |
transfers[path] = Transfer(watch,path,filename) | |
return filename | |
@dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="") | |
def Cancel(self): | |
print("Authorization Canceled") | |
if __name__ == '__main__': | |
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) | |
bus = dbus.SessionBus() | |
manager = dbus.Interface(bus.get_object(BUS_NAME, PATH), AGENT_MANAGER_INTERFACE) | |
path = "/test/agent" | |
agent = Agent(bus, path) | |
mainloop = GLib.MainLoop() | |
manager.RegisterAgent(path) | |
print("Agent registered") | |
mainloop.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment