-
-
Save ukBaz/217875c83c2535d22a16ba38fc8f2a91 to your computer and use it in GitHub Desktop.
Bluetooth Classic: Serial Port Profile (echo server)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import os | |
import dbus | |
import dbus.service | |
import dbus.mainloop.glib | |
from gi.repository import GLib | |
class Profile(dbus.service.Object): | |
fd = -1 | |
@dbus.service.method('org.bluez.Profile1', | |
in_signature='', | |
out_signature='') | |
def Release(self): | |
print('Release') | |
@dbus.service.method('org.bluez.Profile1', | |
in_signature='oha{sv}', | |
out_signature='') | |
def NewConnection(self, path, fd, properties): | |
self.fd = fd.take() | |
print('NewConnection(%s, %d)' % (path, self.fd)) | |
for key in properties.keys(): | |
if key == 'Version' or key == 'Features': | |
print(' %s = 0x%04x' % (key, properties[key])) | |
else: | |
print(' %s = %s' % (key, properties[key])) | |
io_id = GLib.io_add_watch(self.fd, | |
GLib.PRIORITY_DEFAULT, | |
GLib.IO_IN | GLib.IO_PRI, | |
self.io_cb) | |
def io_cb(self, fd, conditions): | |
data = os.read(fd, 1024) | |
print('Callback Data: {0}'.format(data.decode('ascii'))) | |
os.write(fd, bytes(list(reversed(data.rstrip()))) + b'\n') | |
return True | |
@dbus.service.method('org.bluez.Profile1', | |
in_signature='o', | |
out_signature='') | |
def RequestDisconnection(self, path): | |
print('RequestDisconnection(%s)' % (path)) | |
if self.fd > 0: | |
os.close(self.fd) | |
self.fd = -1 | |
def main(): | |
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) | |
bus = dbus.SystemBus() | |
manager = dbus.Interface(bus.get_object('org.bluez', | |
'/org/bluez'), | |
'org.bluez.ProfileManager1') | |
mainloop = GLib.MainLoop() | |
adapter = dbus.Interface(bus.get_object('org.bluez', | |
'/org/bluez/hci0'), | |
dbus.PROPERTIES_IFACE) | |
discoverable = adapter.Get('org.bluez.Adapter1', 'Discoverable') | |
if not discoverable: | |
print('Making discoverable...') | |
adapter.Set('org.bluez.Adapter1', 'Discoverable', True) | |
profile_path = '/foo/baz/profile' | |
server_uuid = '00001101-0000-1000-8000-00805f9b34fb' | |
opts = { | |
'Version': dbus.UInt16(0x0102), | |
'AutoConnect': dbus.Boolean(True), | |
'Role': 'server', | |
'Name': 'SerialPort', | |
'Service': '00001101-0000-1000-8000-00805f9b34fb', | |
'RequireAuthentication': dbus.Boolean(False), | |
'RequireAuthorization': dbus.Boolean(False), | |
'Channel': dbus.UInt16(1), | |
} | |
print('Starting Serial Port Profile...') | |
profile = Profile(bus, profile_path) | |
manager.RegisterProfile(profile_path, server_uuid, opts) | |
try: | |
mainloop.run() | |
except KeyboardInterrupt: | |
mainloop.quit() | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import os | |
from gi.repository import Gio, GLib | |
# Introspection data for DBus | |
profile_xml = """ | |
<node> | |
<interface name="org.bluez.Profile1"> | |
<method name="Release"/> | |
<method name="NewConnection"> | |
<arg type="o" name="device" direction="in"/> | |
<arg type="h" name="fd" direction="in"/> | |
<arg type="a{sv}" name="fd_properties" direction="in"/> | |
</method> | |
<method name="RequestDisconnection"> | |
<arg type="o" name="device" direction="in"/> | |
</method> | |
</interface> | |
</node> | |
""" | |
class DbusService: | |
"""Class used to publish a DBus service on to the DBus System Bus""" | |
def __init__(self, introspection_xml, publish_path): | |
self.node_info = Gio.DBusNodeInfo.new_for_xml(introspection_xml).interfaces[0] | |
# start experiment | |
method_outargs = {} | |
method_inargs = {} | |
property_sig = {} | |
for method in self.node_info.methods: | |
method_outargs[method.name] = '(' + ''.join([arg.signature for arg in method.out_args]) + ')' | |
method_inargs[method.name] = tuple(arg.signature for arg in method.in_args) | |
self.method_inargs = method_inargs | |
self.method_outargs = method_outargs | |
self.con = Gio.bus_get_sync(Gio.BusType.SYSTEM, None) | |
self.con.register_object( | |
publish_path, | |
self.node_info, | |
self.handle_method_call, | |
self.prop_getter, | |
self.prop_setter) | |
def handle_method_call(self, | |
connection: Gio.DBusConnection, | |
sender: str, | |
object_path: str, | |
interface_name: str, | |
method_name: str, | |
params: GLib.Variant, | |
invocation: Gio.DBusMethodInvocation | |
): | |
""" | |
This is the top-level function that handles method calls to | |
the server. | |
""" | |
args = list(params.unpack()) | |
# Handle the case where it is a Unix filedescriptor | |
for i, sig in enumerate(self.method_inargs[method_name]): | |
if sig == 'h': | |
msg = invocation.get_message() | |
fd_list = msg.get_unix_fd_list() | |
args[i] = fd_list.get(args[i]) | |
func = self.__getattribute__(method_name) | |
result = func(*args) | |
if result is None: | |
result = () | |
else: | |
result = (result,) | |
outargs = ''.join([_.signature | |
for _ in invocation.get_method_info().out_args]) | |
send_result = GLib.Variant(f'({outargs})', result) | |
invocation.return_value(send_result) | |
def prop_getter(self, | |
connection: Gio.DBusConnection, | |
sender: str, | |
object: str, | |
iface: str, | |
name: str): | |
"""Return requested values on DBus from Python object""" | |
py_value = self.__getattribute__(name) | |
signature = self.node_info.lookup_property(name).signature | |
if py_value: | |
return GLib.Variant(signature, py_value) | |
return None | |
def prop_setter(self, | |
connection: Gio.DBusConnection, | |
sender: str, | |
object: str, | |
iface: str, | |
name: str, | |
value: GLib.Variant): | |
"""Set values on Python object from DBus""" | |
self.__setattr__(name, value.unpack()) | |
return True | |
class Profile(DbusService): | |
def __init__(self, introspection_xml, publish_path): | |
super().__init__(introspection_xml, publish_path) | |
self.fd = -1 | |
def Release(self): | |
print('Release') | |
def NewConnection(self, path, fd, properties): | |
self.fd = fd | |
print(f'NewConnection({path}, {self.fd}, {properties})') | |
for key in properties.keys(): | |
if key == 'Version' or key == 'Features': | |
print(' %s = 0x%04x' % (key, properties[key])) | |
else: | |
print(' %s = %s' % (key, properties[key])) | |
io_id = GLib.io_add_watch(self.fd, | |
GLib.PRIORITY_DEFAULT, | |
GLib.IO_IN | GLib.IO_PRI, | |
self.io_cb) | |
def io_cb(self, fd, conditions): | |
data = os.read(fd, 1024) | |
print('Callback Data: {0}'.format(data.decode('ascii'))) | |
os.write(fd, bytes(list(reversed(data.rstrip()))) + b'\n') | |
return True | |
def RequestDisconnection(self, path): | |
print('RequestDisconnection(%s)' % (path)) | |
if self.fd > 0: | |
os.close(self.fd) | |
self.fd = -1 | |
def main(): | |
obj_mngr = Gio.DBusObjectManagerClient.new_for_bus_sync( | |
bus_type=Gio.BusType.SYSTEM, | |
flags=Gio.DBusObjectManagerClientFlags.NONE, | |
name='org.bluez', | |
object_path='/', | |
get_proxy_type_func=None, | |
get_proxy_type_user_data=None, | |
cancellable=None, | |
) | |
manager = obj_mngr.get_object('/org/bluez').get_interface('org.bluez.ProfileManager1') | |
adapter = obj_mngr.get_object('/org/bluez/hci0').get_interface('org.freedesktop.DBus.Properties') | |
mainloop = GLib.MainLoop() | |
discoverable = adapter.Get('(ss)', 'org.bluez.Adapter1', 'Discoverable') | |
if not discoverable: | |
print('Making discoverable...') | |
adapter.Set('(ssv)', 'org.bluez.Adapter1', | |
'Discoverable', GLib.Variant.new_boolean(True)) | |
profile_path = '/org/bluez/test/profile' | |
server_uuid = '00001101-0000-1000-8000-00805f9b34fb' | |
opts = { | |
# https://github.com/bluez/bluez/blob/a337097749445670c416455012f3c160c668681d/src/profile.c#L2043-L2050 | |
'Version': GLib.Variant.new_uint16(0x0102), | |
'AutoConnect': GLib.Variant.new_boolean(True), | |
'Role': GLib.Variant.new_string('server'), | |
'Name': GLib.Variant.new_string('SerialPort'), | |
'Service': GLib.Variant.new_string('00001101-0000-1000-8000-00805f9b34fb'), | |
'RequireAuthentication': GLib.Variant.new_boolean(False), | |
'RequireAuthorization': GLib.Variant.new_boolean(False), | |
'Channel': GLib.Variant.new_uint16(1), | |
} | |
print('Starting Serial Port Profile...') | |
profile = Profile(profile_xml, profile_path) | |
manager.RegisterProfile('(osa{sv})', profile_path, server_uuid, opts) | |
try: | |
mainloop.run() | |
except KeyboardInterrupt: | |
mainloop.quit() | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
server_address = 'FC:F8:AE:8F:0C:A4' | |
server_port = 1 | |
backlog = 1 | |
size = 1024 | |
while True: | |
with socket.socket(socket.AF_BLUETOOTH, | |
socket.SOCK_STREAM, | |
socket.BTPROTO_RFCOMM) as s: | |
s.bind((server_address, server_port)) | |
s.listen(backlog) | |
print('Waiting for connection...') | |
client, address = s.accept() | |
print(f'Connection from {address}') | |
while True: | |
try: | |
data = client.recv(size) | |
if data: | |
print(data) | |
client.send(bytes(list(reversed(data.rstrip()))) + b'\n') | |
except ConnectionResetError: | |
print('Client disconnected') | |
break |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
server_address = "xx:xx:xx:xx:xx:xx" | |
server_port = 1 | |
with socket.socket(socket.AF_BLUETOOTH, | |
socket.SOCK_STREAM, | |
socket.BTPROTO_RFCOMM) as c: | |
c.connect((server_address, server_port)) | |
c.send(b"desserts") | |
print(c.recv(1024).decode()) | |
In a separate terminal invoking the server with:
$ ./spp_echo_server.py
It is a good idea to invoke this before doing the pairing above so that the SPP gets authorized at the time of pairing
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Pairing
The first time you use a phone and linux box you need to pair them.
On the Linux command line you can set pairable and discoverable mode. You can then search for Linux box from the phone:
when you start the server this happens