Skip to content

Instantly share code, notes, and snippets.

@josemarcosrf
Last active February 14, 2024 18:06
Show Gist options
  • Save josemarcosrf/e181769c51b99952b77bf0eb8ab9c98e to your computer and use it in GitHub Desktop.
Save josemarcosrf/e181769c51b99952b77bf0eb8ab9c98e to your computer and use it in GitHub Desktop.
python BLE server & client using pybluez
import click
import json
import logging
import sys
import bluetooth as bt
import coloredlogs
from typing import *
logger = logging.getLogger(__name__)
coloredlogs.install(logger=logger, level=logging.DEBUG)
class BLEServer:
def __init__(
self, uuid: str, server_name: str, port: int = 1,
):
self.uuid = uuid
self.server_name = server_name
# Advertise the server
self.port = port
self.server_sock = self._advertise()
# mock state
self.state = [0, 0, 0, 0]
def _advertise(self):
logger.debug(
f"Advertising with: server name: {self.server_name} "
f"| port: {self.port} "
f"| uuid: {self.uuid}"
)
server_sock = bt.BluetoothSocket(bt.RFCOMM)
server_sock.bind(("", bt.PORT_ANY))
server_sock.listen(self.port)
bt.advertise_service(
server_sock,
self.server_name,
service_id=self.uuid,
service_classes=[self.uuid, bt.SERIAL_PORT_CLASS],
profiles=[bt.SERIAL_PORT_PROFILE],
)
return server_sock
def _accept_connection(self) -> bt.BluetoothSocket:
# Accept a connection
client_sock, client_info = self.server_sock.accept()
logger.info(f"Accepted connection from {client_info}")
return client_sock
def process_request(self, data: bytes):
data = json.loads(data)
logger.info(f"Should process: '{data}'")
if data.get("cmd") == "/read":
return self.state
else:
self.state[data["channels"][0] - 1] = data["mode"]
return json.dumps({"ok": True, "state": self.state, "scheduled": [], })
def run(self):
# Wait for an incoming connection
_ = self.server_sock.getsockname()[self.port]
client_sock = self._accept_connection()
while True:
try:
data = client_sock.recv(1024)
ret = self.process_request(data)
logger.debug(f"request process returned: {ret}")
client_sock.send(json.dumps(ret))
except bt.BluetoothError as be:
if be.errno == 104:
logger.warning(f"Connection reset by peer...")
client_sock.close()
# Accept a new connection
client_sock = self._accept_connection()
else:
logger.debug(be.errno)
logger.error(f"Something wrong with bluetooth: {be}")
return
except KeyboardInterrupt:
logger.warning("\nDisconnected")
return
except Exception as e:
logger.error(f"BT Server Unknown error: {e}")
return
@click.command()
@click.option("--uuid", default="616d3aa1-689e-4e71-8fed-09f3c7c4ad91")
@click.option("--server-name", default="RPI-BT-Server")
@click.option("--port", default=1)
def main(uuid: str, server_name: str, port: int):
try:
ble = BLEServer(uuid, server_name, port)
ble.run()
except bt.btcommon.BluetoothError as be:
logger.error(f"Bluetooth Error: {be}")
logger.error(
"Try running like: "
"'sudo hciconfig hci0 piscan && sudo .venv/bin/python ble_server.py'"
)
except Exception as e:
logger.error(f"Unknown error: {e}")
if __name__ == "__main__":
"""
For an example for RFCOMM server from pybluez:
# https://github.com/pybluez/pybluez/blob/master/examples/simple/rfcomm-server.py
"""
main()
import click
import json
import logging
import sys
import bluetooth
import coloredlogs
logger = logging.getLogger(__name__)
coloredlogs.install(logger=logger, level=logging.DEBUG)
@click.command()
@click.option("--uuid", default="616d3aa1-689e-4e71-8fed-09f3c7c4ad91")
@click.option("--addr")
def main(uuid: str, addr: str = None):
if addr:
logger.info(f"Searching for BT Server with address={addr}...")
else:
logger.info("Searching all nearby bluetooth devices for the BT Server")
# search for the BT Server service
service_matches = bluetooth.find_service(uuid=uuid, address=addr)
if len(service_matches) == 0:
logger.info("Couldn't find the SampleServer service.")
sys.exit(0)
first_match = service_matches[0]
port = first_match["port"]
name = first_match["name"]
host = first_match["host"]
logger.info(f"Connecting to {name} @ {host}:{port}")
# Create the client socket
sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
sock.connect((host, port))
logger.info("Connected!")
while True:
channels = input("Input channels\t").strip()
channels = list(map(int, channels.split(" ")))
mode = input("Input ON (1) / OFF (0)\t")
sock.send(
json.dumps({"cmd": "/switch", "channels": channels, "mode": int(mode)})
)
res = sock.recv(1024)
print(res)
print("-" * 40)
sock.close()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment