Skip to content

Instantly share code, notes, and snippets.

@kzyapkov
Last active November 3, 2022 10:23
Show Gist options
  • Save kzyapkov/e4441b0359f0ce52cd3666a76f90b165 to your computer and use it in GitHub Desktop.
Save kzyapkov/e4441b0359f0ce52cd3666a76f90b165 to your computer and use it in GitHub Desktop.
Silabs Gecko BLE OTA with Python
#!/usr/bin/env python
# Copyright (c) 2022 Kiril Zyapkov
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio
import argparse
import logging
# pip install bleak
from bleak import BleakScanner, BleakClient
# Root logger: getLogger() is called without a name. main() configures it
# through logging.basicConfig().
LOG = logging.getLogger()
# 128-bit UUIDs of the OTA GATT service and of its characteristics.
# The service, control and data UUIDs are used by
# find_ota_service_and_chars(); the *_VER UUIDs are not referenced by any
# code visible in this file.
# NOTE(review): presumably the Silicon Labs OTA service definitions (going
# by the gist title) -- confirm against the vendor documentation.
OTA_SVC_UUID = "1d14d6ee-fd63-4fa1-bfa4-8f47b42119f0"
OTA_SVC_CHAR_CONTROL = "f7bf3564-fb6d-4e53-88a4-5e37e0326063"
OTA_SVC_CHAR_DATA = "984227f3-34fc-4045-a5d0-2c581f81a153"
OTA_SVC_CHAR_STACK_VER = "4f4a2368-8cca-451e-bfff-cf0e2ee23e9f"
OTA_SVC_CHAR_OTA_VER = "4cc07bcf-0868-4b32-9dad-ba4cc41e5316"
OTA_SVC_CHAR_BL_VER = "25f05c0a-e917-46e9-b2a5-aa2be1245afe"
OTA_SVC_CHAR_APP_VER = "0d77cc11-4ac1-49f2-bfa9-cd96ac7a92f8"
def get_parser():
    """Build the command-line parser for the OTA uploader.

    Returns:
        An ``argparse.ArgumentParser`` accepting:

        * ``-v``: integer logging level passed to ``logging.basicConfig``
          (defaults to ``logging.INFO``).
        * ``gbl``: path of the .gbl file to deploy.
        * ``addr``: address or name of the device to update.
    """
    parser = argparse.ArgumentParser(
        description="Upload a .gbl firmware image to a device over BLE OTA.",
    )
    parser.add_argument(
        "-v",
        type=int,
        default=logging.INFO,
        # The option takes the numeric level, not a verbosity count.
        help="numeric logging level, e.g. 10 for DEBUG (default: %(default)s)",
    )
    parser.add_argument("gbl", help=".gbl file to deploy on device")
    parser.add_argument("addr", help="Address or device name to update")
    return parser
async def dump(client):
    """Log every GATT service and characteristic of a connected client.

    Used as a diagnostic aid when the expected OTA characteristics are not
    found.

    Args:
        client: a connected ``BleakClient``.

    Returns:
        The client's service collection, after logging its contents.
    """
    # The original format string had two placeholders for a single argument,
    # which makes the logging module report a formatting error instead of
    # emitting the message.
    LOG.info("Client %s", client)
    # ``services`` is populated while the client is connected; it replaces
    # the ``get_services()`` coroutine, which newer bleak releases deprecate.
    services = client.services
    for s in services:
        LOG.info(" SVC %s", s)
        for c in s.characteristics:
            LOG.info(" CHAR %s", c)
    return services
def find_ota_service_and_chars(services):
    """Locate the OTA service and its control/data characteristics.

    Args:
        services: iterable of GATT service objects exposing ``uuid`` and
            ``get_characteristic(uuid)``.

    Returns:
        A ``(svc, ctrl_char, data_char)`` tuple:

        * ``(None, None, None)`` when no service has ``OTA_SVC_UUID``.
        * Otherwise the first OTA service instance that has the data
          characteristic, together with its characteristics.
        * If no instance has the data characteristic, the last OTA service
          instance seen, with ``data_char`` set to ``None``.
    """
    svc = None
    ctrl_char = None
    data_char = None
    # BlueZ does aggressive caching by default, we see multiple instances
    # of the OTA service after reconnect, so this wizardry is necessary.
    # One does not simply lookup a service by UUID!
    for candidate in services:
        if candidate.uuid != OTA_SVC_UUID:
            continue
        svc = candidate
        # theoretically, the same applies for the data characteristic,
        # but it doesn't occur in practice
        ctrl_char = svc.get_characteristic(OTA_SVC_CHAR_CONTROL)
        data_char = svc.get_characteristic(OTA_SVC_CHAR_DATA)
        if data_char is not None:
            # This instance has the data characteristic; stop searching.
            break
    return (svc, ctrl_char, data_char)
async def _find_device(identifier):
    """Scan for a device whose address or advertised name is *identifier*."""
    device = await BleakScanner.find_device_by_address(identifier)
    if device is None:
        # The CLI help promises "address or device name", so fall back to a
        # name match when nothing answers to that address.
        device = await BleakScanner.find_device_by_filter(
            lambda dev, adv: identifier in (dev.name, adv.local_name)
        )
    return device


async def main():
    """Upload the .gbl image named on the command line to a BLE device.

    Steps: parse arguments, read the image, find and connect to the device,
    switch it to DFU mode if the OTA data characteristic is not present yet,
    then stream the image and signal completion.

    Returns:
        0 on success, or a negative code on failure:
        -1 device not found, -2 OTA service/control characteristic missing,
        -3 device did not enter DFU mode, -4 image file is empty.

    Raises:
        OSError: if the .gbl file cannot be read.
    """
    args = get_parser().parse_args()
    logging.basicConfig(
        level=args.v,
        format="[%(asctime)s %(levelno)d] %(message)s",
        datefmt="%Y/%m/%d %H:%M:%S",
    )

    # Read the image before touching the device: a bad path must fail here,
    # not after the device has already been rebooted into DFU mode.
    with open(args.gbl, "rb") as gbl:
        image = gbl.read()
    if not image:
        LOG.error("%s is empty, nothing to upload", args.gbl)
        return -4

    device = await _find_device(args.addr)
    if not device:
        LOG.warning("Device %s not found", args.addr)
        return -1

    # Payload size per data write, as in the original script.
    # NOTE(review): presumably has to fit the negotiated ATT MTU -- confirm.
    chunk_size = 200

    async with BleakClient(device) as client:
        svc, ctrl_char, data_char = find_ota_service_and_chars(client.services)
        # Without the control characteristic nothing below can work, so
        # treat that the same as a missing service.
        if svc is None or ctrl_char is None:
            LOG.error("%s doesn't expose the OTA service", device)
            return -2

        if data_char is None:
            # No data characteristic yet: write 0x00 to the control
            # characteristic, then reconnect and look the service up again.
            LOG.info("Entering DFU mode ...")
            await client.write_gatt_char(ctrl_char, b"\x00", response=True)
            await client.disconnect()
            await asyncio.sleep(1)
            await client.connect()
            svc, ctrl_char, data_char = find_ota_service_and_chars(
                client.services
            )
            if data_char is None or ctrl_char is None:
                LOG.error("Could not enter DFU mode")
                await dump(client)
                return -3

        # 0x00 on the control characteristic precedes the data transfer.
        await client.write_gatt_char(ctrl_char, b"\x00", response=True)
        LOG.info("Uploading %s", args.gbl)
        written = 0
        for offset in range(0, len(image), chunk_size):
            chunk = image[offset:offset + chunk_size]
            # Data chunks are sent as write-without-response.
            await client.write_gatt_char(data_char, chunk, response=False)
            written += len(chunk)
        # 0x03 on the control characteristic follows the last data chunk.
        await client.write_gatt_char(ctrl_char, b"\x03", response=True)
        LOG.info("Wrote %d bytes from %s to %s", written, args.gbl, device)
    return 0
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status; a bare
    # asyncio.run(main()) discards it and the script always exits with 0.
    raise SystemExit(asyncio.run(main()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment