Skip to content

Instantly share code, notes, and snippets.

@ukBaz
Last active January 2, 2022 20:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ukBaz/217875c83c2535d22a16ba38fc8f2a91 to your computer and use it in GitHub Desktop.
Save ukBaz/217875c83c2535d22a16ba38fc8f2a91 to your computer and use it in GitHub Desktop.
Bluetooth Classic: Serial Port Profile (echo server)
#!/usr/bin/python3
import os
import dbus
import dbus.service
import dbus.mainloop.glib
from gi.repository import GLib
class Profile(dbus.service.Object):
    """BlueZ ``org.bluez.Profile1`` object that echoes reversed data.

    Every chunk read from the connection's file descriptor has its trailing
    whitespace stripped, is reversed byte-wise, and is written back followed
    by a newline.
    """

    # Descriptor of the most recent connection; -1 when none is tracked.
    fd = -1
    # GLib source id of the watch installed on ``fd``; None when not watching.
    io_id = None

    @dbus.service.method('org.bluez.Profile1',
                         in_signature='',
                         out_signature='')
    def Release(self):
        """Log that the Release method was called."""
        print('Release')

    @dbus.service.method('org.bluez.Profile1',
                         in_signature='oha{sv}',
                         out_signature='')
    def NewConnection(self, path, fd, properties):
        """Start echoing on a newly established connection.

        Args:
            path: Object path of the remote device.
            fd: Descriptor wrapper received over D-Bus; ``take()`` is called
                on it to obtain the raw descriptor number.
            properties: Dictionary of connection properties, printed for
                information only.
        """
        self.fd = fd.take()
        print('NewConnection(%s, %d)' % (path, self.fd))
        for key, value in properties.items():
            if key in ('Version', 'Features'):
                print(' %s = 0x%04x' % (key, value))
            else:
                print(' %s = %s' % (key, value))
        # Also watch for hang-up/error so a vanished peer is noticed even
        # when no readable data is reported.
        self.io_id = GLib.io_add_watch(
            self.fd,
            GLib.PRIORITY_DEFAULT,
            GLib.IO_IN | GLib.IO_PRI | GLib.IO_HUP | GLib.IO_ERR,
            self.io_cb)

    def io_cb(self, fd, conditions):
        """Echo one chunk back to the peer, reversed.

        Args:
            fd: Descriptor the watch fired for.
            conditions: GLib I/O condition flags that triggered the callback.

        Returns:
            True to keep the watch installed, False to remove it once the
            connection has ended or failed.
        """
        try:
            data = os.read(fd, 1024)
        except OSError as err:
            print('Read failed: %s' % err)
            data = b''
        if not data:
            # An empty read means the peer closed the connection.
            print('Connection closed')
            self._forget(fd)
            return False
        # errors='replace' keeps a non-ASCII byte from killing the callback.
        print('Callback Data: {0}'.format(
            data.decode('ascii', errors='replace')))
        try:
            os.write(fd, data.rstrip()[::-1] + b'\n')
        except OSError as err:
            print('Write failed: %s' % err)
            self._forget(fd)
            return False
        return True

    def _forget(self, fd):
        """Close ``fd`` and clear the tracked state if it refers to it."""
        os.close(fd)
        if fd == self.fd:
            self.fd = -1
            # The caller returns False, which removes the watch; drop the
            # stale id so it is not removed a second time later.
            self.io_id = None

    @dbus.service.method('org.bluez.Profile1',
                         in_signature='o',
                         out_signature='')
    def RequestDisconnection(self, path):
        """Stop watching and close the tracked connection, if any.

        Args:
            path: Object path of the device to disconnect.
        """
        print('RequestDisconnection(%s)' % (path))
        if self.io_id is not None:
            # Remove the watch first so it never fires on a closed fd.
            GLib.source_remove(self.io_id)
            self.io_id = None
        # 0 is a valid descriptor; only a negative value means "none".
        if self.fd >= 0:
            os.close(self.fd)
            self.fd = -1
def main():
    """Register the Serial Port Profile with BlueZ and run the main loop.

    Makes adapter ``hci0`` discoverable when it is not already, exports a
    ``Profile`` object on the system bus, registers it with the BlueZ
    profile manager, and then runs until interrupted from the keyboard.
    """
    # Install the GLib main loop integration as the default before the
    # bus connection is created.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()

    bluez_root = system_bus.get_object('org.bluez', '/org/bluez')
    manager = dbus.Interface(bluez_root, 'org.bluez.ProfileManager1')

    mainloop = GLib.MainLoop()

    hci0 = system_bus.get_object('org.bluez', '/org/bluez/hci0')
    adapter = dbus.Interface(hci0, dbus.PROPERTIES_IFACE)
    if not adapter.Get('org.bluez.Adapter1', 'Discoverable'):
        print('Making discoverable...')
        adapter.Set('org.bluez.Adapter1', 'Discoverable', True)

    profile_path = '/foo/baz/profile'
    server_uuid = '00001101-0000-1000-8000-00805f9b34fb'
    opts = {
        'Version': dbus.UInt16(0x0102),
        'AutoConnect': dbus.Boolean(True),
        'Role': 'server',
        'Name': 'SerialPort',
        'Service': server_uuid,
        'RequireAuthentication': dbus.Boolean(False),
        'RequireAuthorization': dbus.Boolean(False),
        'Channel': dbus.UInt16(1),
    }

    print('Starting Serial Port Profile...')
    # Hold a reference so the exported object lives as long as the loop.
    profile = Profile(system_bus, profile_path)
    manager.RegisterProfile(profile_path, server_uuid, opts)

    try:
        mainloop.run()
    except KeyboardInterrupt:
        mainloop.quit()


if __name__ == '__main__':
    main()
#!/usr/bin/python3
import os
from gi.repository import Gio, GLib
# D-Bus introspection XML describing the org.bluez.Profile1 interface that
# this script publishes. It declares three methods, all without return
# values:
#   Release()
#   NewConnection(device: o, fd: h, fd_properties: a{sv})
#   RequestDisconnection(device: o)
# main() passes this string to Profile, whose DbusService base class parses
# it with Gio.DBusNodeInfo.new_for_xml().
profile_xml = """
<node>
<interface name="org.bluez.Profile1">
<method name="Release"/>
<method name="NewConnection">
<arg type="o" name="device" direction="in"/>
<arg type="h" name="fd" direction="in"/>
<arg type="a{sv}" name="fd_properties" direction="in"/>
</method>
<method name="RequestDisconnection">
<arg type="o" name="device" direction="in"/>
</method>
</interface>
</node>
"""
class DbusService:
    """Class used to publish a DBus service on to the DBus System Bus.

    Subclasses implement one Python method per D-Bus method named in the
    introspection XML; incoming calls are dispatched to the method with the
    same name. Properties are read from and written to instance attributes
    of the same name.
    """

    def __init__(self, introspection_xml, publish_path):
        """Parse the introspection data and register the object.

        Args:
            introspection_xml: D-Bus introspection XML. Only the first
                interface it declares is published.
            publish_path: Object path to register on the system bus.
        """
        self.node_info = Gio.DBusNodeInfo.new_for_xml(
            introspection_xml).interfaces[0]
        # Per-method argument signatures, cached for use when dispatching.
        self.method_inargs = {
            method.name: tuple(arg.signature for arg in method.in_args)
            for method in self.node_info.methods
        }
        self.method_outargs = {
            method.name:
                '(' + ''.join(arg.signature for arg in method.out_args) + ')'
            for method in self.node_info.methods
        }

        self.con = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
        self.con.register_object(
            publish_path,
            self.node_info,
            self.handle_method_call,
            self.prop_getter,
            self.prop_setter)

    def handle_method_call(self,
                           connection: Gio.DBusConnection,
                           sender: str,
                           object_path: str,
                           interface_name: str,
                           method_name: str,
                           params: GLib.Variant,
                           invocation: Gio.DBusMethodInvocation
                           ):
        """
        This is the top-level function that handles method calls to
        the server.

        The call is forwarded to the instance method named ``method_name``.
        A ``None`` result is sent back as an empty reply; any other result
        is sent as a single return value. If the method raises, the caller
        receives a D-Bus error reply and the exception is re-raised.
        """
        args = list(params.unpack())
        # Handle the case where it is a Unix filedescriptor: the unpacked
        # value is an index into the message's fd list, not the fd itself.
        in_signatures = self.method_inargs[method_name]
        if 'h' in in_signatures:
            fd_list = invocation.get_message().get_unix_fd_list()
            for i, sig in enumerate(in_signatures):
                if sig == 'h':
                    args[i] = fd_list.get(args[i])

        try:
            result = getattr(self, method_name)(*args)
        except Exception as err:
            # Answer the caller so it is not left waiting for a reply,
            # then let the exception surface as it did before.
            invocation.return_dbus_error(
                'org.freedesktop.DBus.Error.Failed',
                f'{type(err).__name__}: {err}')
            raise

        result = () if result is None else (result,)
        send_result = GLib.Variant(self.method_outargs[method_name], result)
        invocation.return_value(send_result)

    def prop_getter(self,
                    connection: Gio.DBusConnection,
                    sender: str,
                    object: str,
                    iface: str,
                    name: str):
        """Return requested values on DBus from Python object.

        Returns:
            A ``GLib.Variant`` holding the attribute called ``name``, or
            ``None`` when that attribute is ``None``.
        """
        py_value = getattr(self, name)
        signature = self.node_info.lookup_property(name).signature
        # Compare against None explicitly so that legitimate falsy values
        # (0, False, empty string) are still returned.
        if py_value is not None:
            return GLib.Variant(signature, py_value)
        return None

    def prop_setter(self,
                    connection: Gio.DBusConnection,
                    sender: str,
                    object: str,
                    iface: str,
                    name: str,
                    value: GLib.Variant):
        """Set values on Python object from DBus.

        Returns:
            Always ``True``.
        """
        setattr(self, name, value.unpack())
        return True
class Profile(DbusService):
    """``org.bluez.Profile1`` implementation that echoes reversed data.

    Every chunk read from the connection's file descriptor has its trailing
    whitespace stripped, is reversed byte-wise, and is written back followed
    by a newline.
    """

    def __init__(self, introspection_xml, publish_path):
        """Publish the profile object.

        Args:
            introspection_xml: D-Bus introspection XML for the interface.
            publish_path: Object path to publish the profile on.
        """
        super().__init__(introspection_xml, publish_path)
        # Descriptor of the most recent connection; -1 when none is tracked.
        self.fd = -1
        # GLib source id of the watch on ``fd``; None when not watching.
        self.io_id = None

    def Release(self):
        """Log that the Release method was called."""
        print('Release')

    def NewConnection(self, path, fd, properties):
        """Start echoing on a newly established connection.

        Args:
            path: Object path of the remote device.
            fd: File descriptor for the connection.
            properties: Dictionary of connection properties, printed for
                information only.
        """
        self.fd = fd
        print(f'NewConnection({path}, {self.fd}, {properties})')
        for key, value in properties.items():
            if key in ('Version', 'Features'):
                print(' %s = 0x%04x' % (key, value))
            else:
                print(' %s = %s' % (key, value))
        # Also watch for hang-up/error so a vanished peer is noticed even
        # when no readable data is reported.
        self.io_id = GLib.io_add_watch(
            self.fd,
            GLib.PRIORITY_DEFAULT,
            GLib.IO_IN | GLib.IO_PRI | GLib.IO_HUP | GLib.IO_ERR,
            self.io_cb)

    def io_cb(self, fd, conditions):
        """Echo one chunk back to the peer, reversed.

        Args:
            fd: Descriptor the watch fired for.
            conditions: GLib I/O condition flags that triggered the callback.

        Returns:
            True to keep the watch installed, False to remove it once the
            connection has ended or failed.
        """
        try:
            data = os.read(fd, 1024)
        except OSError as err:
            print('Read failed: %s' % err)
            data = b''
        if not data:
            # An empty read means the peer closed the connection.
            print('Connection closed')
            self._forget(fd)
            return False
        # errors='replace' keeps a non-ASCII byte from killing the callback.
        print('Callback Data: {0}'.format(
            data.decode('ascii', errors='replace')))
        try:
            os.write(fd, data.rstrip()[::-1] + b'\n')
        except OSError as err:
            print('Write failed: %s' % err)
            self._forget(fd)
            return False
        return True

    def _forget(self, fd):
        """Close ``fd`` and clear the tracked state if it refers to it."""
        os.close(fd)
        if fd == self.fd:
            self.fd = -1
            # The caller returns False, which removes the watch; drop the
            # stale id so it is not removed a second time later.
            self.io_id = None

    def RequestDisconnection(self, path):
        """Stop watching and close the tracked connection, if any.

        Args:
            path: Object path of the device to disconnect.
        """
        print('RequestDisconnection(%s)' % (path))
        if self.io_id is not None:
            # Remove the watch first so it never fires on a closed fd.
            GLib.source_remove(self.io_id)
            self.io_id = None
        # 0 is a valid descriptor; only a negative value means "none".
        if self.fd >= 0:
            os.close(self.fd)
            self.fd = -1
def main():
    """Register the Serial Port Profile with BlueZ and run the main loop.

    Looks up the BlueZ profile manager and adapter ``hci0`` through an
    object-manager client, makes the adapter discoverable when it is not
    already, publishes a ``Profile`` object, registers it, and then runs
    until interrupted from the keyboard.
    """
    bluez_objects = Gio.DBusObjectManagerClient.new_for_bus_sync(
        bus_type=Gio.BusType.SYSTEM,
        flags=Gio.DBusObjectManagerClientFlags.NONE,
        name='org.bluez',
        object_path='/',
        get_proxy_type_func=None,
        get_proxy_type_user_data=None,
        cancellable=None,
    )

    bluez_root = bluez_objects.get_object('/org/bluez')
    manager = bluez_root.get_interface('org.bluez.ProfileManager1')
    hci0 = bluez_objects.get_object('/org/bluez/hci0')
    adapter = hci0.get_interface('org.freedesktop.DBus.Properties')

    mainloop = GLib.MainLoop()

    if not adapter.Get('(ss)', 'org.bluez.Adapter1', 'Discoverable'):
        print('Making discoverable...')
        adapter.Set('(ssv)', 'org.bluez.Adapter1',
                    'Discoverable', GLib.Variant.new_boolean(True))

    profile_path = '/org/bluez/test/profile'
    server_uuid = '00001101-0000-1000-8000-00805f9b34fb'
    # Option names and types:
    # https://github.com/bluez/bluez/blob/a337097749445670c416455012f3c160c668681d/src/profile.c#L2043-L2050
    opts = {
        'Version': GLib.Variant.new_uint16(0x0102),
        'AutoConnect': GLib.Variant.new_boolean(True),
        'Role': GLib.Variant.new_string('server'),
        'Name': GLib.Variant.new_string('SerialPort'),
        'Service': GLib.Variant.new_string(server_uuid),
        'RequireAuthentication': GLib.Variant.new_boolean(False),
        'RequireAuthorization': GLib.Variant.new_boolean(False),
        'Channel': GLib.Variant.new_uint16(1),
    }

    print('Starting Serial Port Profile...')
    # Hold a reference so the published object lives as long as the loop.
    profile = Profile(profile_xml, profile_path)
    manager.RegisterProfile('(osa{sv})', profile_path, server_uuid, opts)

    try:
        mainloop.run()
    except KeyboardInterrupt:
        mainloop.quit()


if __name__ == '__main__':
    main()
import socket
# RFCOMM echo server built directly on the socket module: accepts one
# client at a time and answers every received chunk with the same bytes,
# stripped of trailing whitespace and reversed, followed by a newline.

# Bluetooth address the listening socket binds to.
server_address = 'FC:F8:AE:8F:0C:A4'
# RFCOMM channel to listen on.
server_port = 1
# Length of the pending-connection queue passed to listen().
backlog = 1
# Maximum number of bytes read per recv() call.
size = 1024

while True:
    with socket.socket(socket.AF_BLUETOOTH,
                       socket.SOCK_STREAM,
                       socket.BTPROTO_RFCOMM) as s:
        s.bind((server_address, server_port))
        s.listen(backlog)
        print('Waiting for connection...')
        client, address = s.accept()
        print(f'Connection from {address}')
        # Close the accepted socket when this client is done.
        with client:
            while True:
                try:
                    data = client.recv(size)
                    if not data:
                        # recv() returns b'' when the peer closes the
                        # connection in an orderly way; without this check
                        # the loop would spin forever on empty reads.
                        print('Client disconnected')
                        break
                    print(data)
                    # sendall() keeps writing until the whole reply is out.
                    client.sendall(data.rstrip()[::-1] + b'\n')
                except ConnectionResetError:
                    print('Client disconnected')
                    break
import socket
# RFCOMM test client: connects to the echo server, sends one message and
# prints the reply.

# Bluetooth address of the server to connect to (placeholder value).
server_address = "xx:xx:xx:xx:xx:xx"
# RFCOMM channel the server listens on.
server_port = 1

rfcomm = socket.socket(socket.AF_BLUETOOTH,
                       socket.SOCK_STREAM,
                       socket.BTPROTO_RFCOMM)
# The socket is closed on leaving the with-block, even if connect() fails.
with rfcomm:
    rfcomm.connect((server_address, server_port))
    rfcomm.send(b"desserts")
    reply = rfcomm.recv(1024)
    print(reply.decode())
@ukBaz
Copy link
Author

ukBaz commented Apr 26, 2017

Pairing

The first time you use a phone with the Linux box, you need to pair them.

On the Linux command line, you can enable pairable and discoverable mode. You can then search for the Linux box from the phone:

$ bluetoothctl 
[bluetooth]# power on
Changing power on succeeded
[bluetooth]# agent KeyboardDisplay
Agent registered
[bluetooth]# default-agent 
Default agent request successful
[bluetooth]# discoverable on
Changing discoverable on succeeded
[CHG] Controller B8:27:EB:22:57:E0 Discoverable: yes
[bluetooth]# pairable on
Changing pairable on succeeded
[NEW] Device 64:BC:0C:F6:22:F8 Nexus 5X
Request confirmation
[agent] Confirm passkey 910035 (yes/no): yes

When you start the server, the following happens:

Authorize service
[agent] Authorize service 00001101-0000-1000-8000-00805f9b34fb (yes/no): yes
[Nexus 5X]# trust
[CHG] Device 64:BC:0C:F6:22:F8 Trusted: yes
Changing  trust succeeded

@ukBaz
Copy link
Author

ukBaz commented Apr 26, 2017

In a separate terminal, invoke the server with:

$ ./spp_echo_server.py 

It is a good idea to invoke this before doing the pairing above, so that the SPP service gets authorized at the time of pairing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment