Last active
November 20, 2023 01:06
-
-
Save antonvh/1f1d9c563268b4a8e9e1d7297e62ad53 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Hub to hub remote control program. Run this on the remote control | |
# Full tutorial here: | |
# https://antonsmindstorms.com/2021/06/19/how-to-remote-control-lego-spike-prime-and-robot-inventor-with-python/ | |
# TO STOP THIS SCRIPT ALWAYS USE THE BUTTON ON THE HUB. | |
# AVOID THE STOP BUTTON IN THE MINDSTORSM APP! | |
# Building instructions here: | |
# https://antonsmindstorms.com/product/remote-control-transmitter-with-mindstorms-51515/ | |
# Use with the the car script here: | |
# https://gist.github.com/antonvh/88548d95e771043662f038de451e28f2 | |
#... or with the tank script here: | |
# https://gist.github.com/antonvh/aca9e9a32aaebe337af3fb1a6f2712aa | |
# Authors: Anton's Mindstorms & Ste7an. | |
# Most of it is library bluetooth code. | |
# Scroll to line 360 for the core program. | |
# Finds a robot with bluetooth peripheral service, advertising under the default name 'robot' | |
# After connicting this script sends a bytestring with joystick positions 30 times per second. | |
# Format "bbbbBBiiB", l_stick_hor, l_stick_ver, r_stick_hor, r_stick_ver, l_trigger, r_trigger, setting1, setting2, buttons_char | |
# ===== This should probably go into an import one day ===== # | |
import bluetooth | |
import random | |
import struct | |
import time | |
import micropython | |
import ubinascii | |
from hub import display, Image | |
from micropython import const | |
# Used irqs: _IRQ_SCAN_DONE, _IRQ_SCAN_RESULT, _IRQ_PERIPHERAL_CONNECT, _IRQ_PERIPHERAL_DISCONNECT, | |
# _IRQ_GATTC_SERVICE_RESULT, _IRQ_GATTC_SERVICE_DONE, _IRQ_GATTC_CHARACTERISTIC_RESULT, | |
# _IRQ_GATTC_WRITE_DONE, _IRQ_GATTC_NOTIFY, _IRQ_GATTC_READ_RESULT | |
_IRQ_GATTC_SERVICE_DONE = const(10) | |
_IRQ_GATTC_WRITE_DONE = const(17) | |
if 'FLAG_INDICATE' in dir(bluetooth): | |
# We're on MINDSTORMS Robot Inventor | |
# New version of bluetooth | |
_IRQ_SCAN_RESULT = 5 | |
_IRQ_SCAN_DONE = 6 | |
_IRQ_PERIPHERAL_CONNECT = 7 | |
_IRQ_PERIPHERAL_DISCONNECT = 8 | |
_IRQ_GATTC_SERVICE_RESULT = 9 | |
_IRQ_GATTC_CHARACTERISTIC_RESULT = 11 | |
_IRQ_GATTC_READ_RESULT = 15 | |
_IRQ_GATTC_NOTIFY = 18 | |
_IRQ_GATTC_CHARACTERISTIC_DONE = 12 | |
else: | |
# We're probably on SPIKE Prime | |
_IRQ_SCAN_RESULT = 1 << 4 | |
_IRQ_SCAN_DONE = 1 << 5 | |
_IRQ_PERIPHERAL_CONNECT = 1 << 6 | |
_IRQ_PERIPHERAL_DISCONNECT = 1 << 7 | |
_IRQ_GATTC_SERVICE_RESULT = 1 << 8 | |
_IRQ_GATTC_CHARACTERISTIC_RESULT = 1 << 9 | |
_IRQ_GATTC_READ_RESULT = 1 << 11 | |
_IRQ_GATTC_NOTIFY = 1 << 13 | |
_IRQ_GATTC_CHARACTERISTIC_DONE = 1 << 12 | |
_NOTIFY_ENABLE = const(1) | |
_INDICATE_ENABLE = const(2) | |
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E") | |
_UART_TX = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E") | |
_UART_RX = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E") | |
# Advertising payloads are repeated packets of the following form: | |
#1 byte data length (N + 1) | |
#1 byte type (see constants below) | |
#N bytes type-specific data | |
_ADV_TYPE_FLAGS = const(0x01) | |
_ADV_TYPE_NAME = const(0x09) | |
_ADV_TYPE_UUID16_COMPLETE = const(0x3) | |
_ADV_TYPE_UUID32_COMPLETE = const(0x5) | |
_ADV_TYPE_UUID128_COMPLETE = const(0x7) | |
_ADV_TYPE_UUID16_MORE = const(0x2) | |
_ADV_TYPE_UUID32_MORE = const(0x4) | |
_ADV_TYPE_UUID128_MORE = const(0x6) | |
_ADV_TYPE_APPEARANCE = const(0x19) | |
# Generate a payload to be passed to gap_advertise(adv_data=...). | |
def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0): | |
payload = bytearray() | |
def _append(adv_type, value): | |
nonlocal payload | |
payload += struct.pack("BB", len(value) + 1, adv_type) + value | |
_append( | |
_ADV_TYPE_FLAGS, | |
struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)), | |
) | |
if name: | |
_append(_ADV_TYPE_NAME, name) | |
if services: | |
for uuid in services: | |
b = bytes(uuid) | |
if len(b) == 2: | |
_append(_ADV_TYPE_UUID16_COMPLETE, b) | |
elif len(b) == 4: | |
_append(_ADV_TYPE_UUID32_COMPLETE, b) | |
elif len(b) == 16: | |
_append(_ADV_TYPE_UUID128_COMPLETE, b) | |
# See org.bluetooth.characteristic.gap.appearance.xml | |
if appearance: | |
_append(_ADV_TYPE_APPEARANCE, struct.pack("<h", appearance)) | |
return payload | |
def decode_field(payload, adv_type): | |
i = 0 | |
result = [] | |
while i + 1 < len(payload): | |
if payload[i + 1] == adv_type: | |
result.append(payload[i + 2 : i + payload[i] + 1]) | |
i += 1 + payload[i] | |
return result | |
def decode_name(payload): | |
n = decode_field(payload, _ADV_TYPE_NAME) | |
return str(n[0], "utf-8") if n else "" | |
def decode_services(payload): | |
services = [] | |
for u in decode_field(payload, _ADV_TYPE_UUID16_COMPLETE): | |
services.append(bluetooth.UUID(struct.unpack("<h", u)[0])) | |
for u in decode_field(payload, _ADV_TYPE_UUID32_COMPLETE): | |
services.append(bluetooth.UUID(struct.unpack("<d", u)[0])) | |
for u in decode_field(payload, _ADV_TYPE_UUID128_COMPLETE): | |
services.append(bluetooth.UUID(u)) | |
return services | |
def show_logo(data): | |
if data[:5] == b'Image': | |
display.show(eval(data)) | |
else: | |
print(data) | |
class BLESimpleCentral: | |
def __init__(self, ble=None): | |
if ble == None: | |
ble = bluetooth.BLE() | |
self._ble = ble | |
self._ble.active(True) | |
self._ble.irq(self._irq) | |
self._reset() | |
def _reset(self): | |
# Cached name and address from a successful scan. | |
self._name = None | |
self._addr_type = None | |
self._addr = None | |
# Callbacks for completion of various operations. | |
# These reset back to None after being invoked. | |
self._scan_callback = None | |
self._conn_callback = None | |
self._read_callback = None | |
# Persistent callback for when new data is notified from the device. | |
self._notify_callback = show_logo | |
# Connected device. | |
self._conn_handle = None | |
self._start_handle = None | |
self._end_handle = None | |
self._tx_handle = None | |
self._rx_handle = None | |
self._n=0 | |
def _on_scan(self, addr_type, addr, name): | |
if addr_type is not None: | |
print("Found peripheral:", addr_type, addr, name) | |
time.sleep_ms(500) | |
self.connect() | |
else: | |
self.timed_out = True | |
print("No uart peripheral '{}' found.".format(self._search_name)) | |
def scan_connect(self, name="robot"): | |
self.timed_out = False | |
self.scan(search_name=name, callback=self._on_scan) | |
while not self.is_connected() and not self.timed_out: | |
# print("Waiting for connection...") | |
time.sleep_ms(10) | |
return not self.timed_out | |
def _irq(self, event, data): | |
if event not in (_IRQ_SCAN_DONE, _IRQ_SCAN_RESULT, _IRQ_PERIPHERAL_CONNECT, _IRQ_PERIPHERAL_DISCONNECT, | |
_IRQ_GATTC_SERVICE_RESULT, _IRQ_GATTC_SERVICE_DONE, _IRQ_GATTC_CHARACTERISTIC_RESULT, | |
_IRQ_GATTC_WRITE_DONE, _IRQ_GATTC_NOTIFY, _IRQ_GATTC_READ_RESULT, _IRQ_GATTC_CHARACTERISTIC_DONE): | |
print(event, hex(event)) | |
if event == _IRQ_SCAN_RESULT: | |
addr_type, addr, adv_type, rssi, adv_data = data | |
name=decode_name(adv_data) or "?" | |
# print('type:{} addr:{} name:{} adv_type: {} rssi:{} data:{}'.format(addr_type, ubinascii.hexlify(addr), name, adv_type,rssi,ubinascii.hexlify(adv_data))) | |
if name == self._search_name: | |
# Found a potential device, remember it | |
self._addr_type = addr_type | |
self._addr = bytes(addr) # Note: addr buffer is owned by caller so need to copy it. | |
self._name = name | |
# ... and stop scanning. This triggers the IRQ_SCAN_DONE and the on_scan callback. | |
self._ble.gap_scan(None) | |
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE: | |
# Called once service discovery is complete. | |
# Note: Status will be zero on success, implementation-specific value otherwise. | |
# conn_handle, status = data | |
pass | |
elif event == _IRQ_SCAN_DONE: | |
if self._scan_callback: | |
if self._addr: | |
# Found a device during the scan (and the scan was explicitly stopped). | |
self._scan_callback(self._addr_type, self._addr, self._name) | |
self._scan_callback = None | |
else: | |
# Scan timed out. | |
self._scan_callback(None, None, None) | |
elif event == _IRQ_PERIPHERAL_CONNECT: | |
# Connect successful. | |
conn_handle, addr_type, addr = data | |
if addr_type == self._addr_type and addr == self._addr: | |
self._conn_handle = conn_handle | |
self._ble.gattc_discover_services(self._conn_handle) | |
elif event == _IRQ_PERIPHERAL_DISCONNECT: | |
# Disconnect (either initiated by us or the remote end). | |
conn_handle, _, _ = data | |
if conn_handle == self._conn_handle: | |
# If it was initiated by us, it'll already be reset. | |
self._reset() | |
print("Disconnect from peripheral") | |
self.timed_out = True | |
elif event == _IRQ_GATTC_SERVICE_RESULT: | |
# Connected device returned a service. | |
conn_handle, start_handle, end_handle, uuid = data | |
self._n+=1 | |
if conn_handle == self._conn_handle and uuid == _UART_UUID: | |
self._start_handle, self._end_handle = start_handle, end_handle | |
# print("UART_UID handles",data) | |
time.sleep_ms(500) | |
self._ble.gattc_discover_characteristics(self._conn_handle, start_handle, end_handle) | |
elif event == _IRQ_GATTC_SERVICE_DONE: | |
time.sleep_ms(300) | |
# Service query complete. | |
if self._start_handle and self._end_handle: | |
self._ble.gattc_discover_characteristics( | |
self._conn_handle, self._start_handle, self._end_handle | |
) | |
else: | |
print("Failed to find uart service.") | |
self.timed_out = True | |
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT: | |
# Connected device returned a characteristic. | |
conn_handle, def_handle, value_handle, properties, uuid = data | |
#print('gattc_char',data) | |
if conn_handle == self._conn_handle: | |
if uuid == _UART_RX: | |
self._rx_handle = value_handle | |
elif uuid == _UART_TX: | |
self._tx_handle = value_handle | |
elif event == _IRQ_GATTC_WRITE_DONE: | |
conn_handle, value_handle, status = data | |
print("TX complete") | |
elif event == _IRQ_GATTC_NOTIFY: | |
# print("_IRQ_GATTC_NOTIFY") | |
conn_handle, value_handle, notify_data = data | |
notify_data=bytes(notify_data) | |
if conn_handle == self._conn_handle and value_handle == self._tx_handle: | |
if self._notify_callback: | |
self._notify_callback(notify_data) | |
elif event == _IRQ_GATTC_READ_RESULT: | |
#print("_IRQ_GATTC_READ_RESULT") | |
# A read completed successfully. | |
conn_handle, value_handle, char_data = data | |
if conn_handle == self._conn_handle and value_handle in (self._rx_handle,self._buta_handle,self._butb_handle): | |
# print("handle,READ data",value_handle,bytes(char_data)) | |
self._read_callback(value_handle,bytes(char_data)) | |
# Returns true if we've successfully connected and discovered uart characteristics. | |
def is_connected(self): | |
return ( | |
self._conn_handle is not None | |
and self._tx_handle is not None | |
and self._rx_handle is not None | |
) | |
# Find a device advertising the uart service. | |
def scan(self, search_name=None, callback=None): | |
self._addr_type = None | |
self._addr = None | |
self._search_name=search_name | |
self._scan_callback = callback | |
self._ble.gap_scan(20000, 30000, 30000) | |
# Connect to the specified device (otherwise use cached address from a scan). | |
def connect(self, addr_type=None, addr=None, callback=None): | |
self._addr_type = addr_type or self._addr_type | |
self._addr = addr or self._addr | |
self._conn_callback = callback | |
if self._addr_type is None or self._addr is None: | |
return False | |
self._ble.gap_connect(self._addr_type, self._addr) | |
return True | |
# Disconnect from current device. | |
def disconnect(self): | |
if not self._conn_handle: | |
return | |
self._ble.gap_disconnect(self._conn_handle) | |
self._reset() | |
# Send data over the UART | |
def write(self, v, response=False): | |
if not self.is_connected(): | |
return | |
self._ble.gattc_write(self._conn_handle, self._rx_handle, v, 1 if response else 0) | |
# def enable_notify(self): | |
# if not self.is_connected(): | |
# return | |
# print("Enabled notify") | |
# #self._ble.gattc_write(self._conn_handle, self._acc_handle, struct.pack('<h', _NOTIFY_ENABLE), 0) | |
# for i in range(38,49): | |
# self._ble.gattc_write(self._conn_handle, i, struct.pack('<h', _NOTIFY_ENABLE), 0) | |
# time.sleep_ms(50) | |
# print("notified enabled") | |
def read(self,handle,callback): | |
if not self.is_connected(): | |
return | |
self._read_callback = callback | |
try: | |
self._ble.gattc_read(self._conn_handle, handle) | |
except: | |
pass | |
#print("gattc_read failed") | |
# Set handler for when data is received over the UART. | |
def on_notify(self, callback): | |
# self.enable_notify() | |
self._notify_callback = callback | |
# print("callback",callback) | |
# ===== End of library import bit. Now the real code. ===== # | |
from hub import button | |
from hub import port | |
# Some motor helper functions | |
def abs_pos(motor): | |
return motor.get()[2] | |
def pos(motor): | |
return motor.get()[1] | |
def clamp_int(n, floor=-100, ceiling=100): | |
return max(min(int(n),ceiling),floor) | |
def track_target(motor, target=0, gain=1.5): | |
m_pos = motor.get()[1] | |
motor.pwm( | |
clamp_int((m_pos-target)*-gain) | |
) | |
return m_pos | |
# Initialize | |
print("starting BLE") | |
remote_control = BLESimpleCentral() | |
print("Scanning & connecting") | |
found = remote_control.scan_connect() | |
if not found: | |
print ("Scanning timed out") | |
del(remote_control) | |
raise SystemExit | |
print("Connected") | |
# Use this if you want te receive data back from the robot | |
# Note that printing doesn't work here. It messes up comms. | |
# def on_rx(v): | |
# pass | |
# # print("RX", v) | |
# central.on_notify(on_rx) | |
# This can probably be less. TODO. | |
time.sleep_ms(1000) | |
steer = port.E.motor | |
steer.preset(abs_pos(steer)) | |
trigger = port.C.motor | |
trigger.preset(abs_pos(trigger)-40) | |
steer_trim = port.B.motor | |
steer_trim.preset(0) | |
thumb_trigger = port.A.motor | |
thumb_trigger.preset(abs_pos(thumb_trigger)-25) | |
while remote_control.is_connected(): | |
l_stick_hor, l_stick_ver, r_stick_hor, r_stick_ver, l_trigger, r_trigger, setting1, setting2, buttons_char = [0]*9 | |
# Check for button presses | |
buttons=[0]*8 | |
if button.left.was_pressed(): | |
buttons[0] = True | |
if button.right.was_pressed(): | |
buttons[1] = True | |
# Create a byte with ones for each pressed button. | |
for i in range(len(buttons)): | |
if buttons[i]: | |
buttons_char += 1 << i | |
# Check motor positions | |
# Track the 0 target and generate counter force if you | |
# push the motors. Multiply so maximum movement generates | |
# a number that is in the range -100,100 | |
# gain is how hard the motor pushes back | |
# Als compensate +5 for gravity on the trigger and thumb trigger | |
l_stick_hor = track_target(steer, gain=1) * 1.3 | |
r_stick_ver = (track_target(trigger, gain=2)+3) * -5.7 | |
l_trigger = (track_target(thumb_trigger)+5) * -2 | |
# Just reading the position will make the motor act like | |
# a pot meter. | |
setting1 = pos(steer_trim) | |
# Pack and transmit all remote control values | |
controller_state = struct.pack("bbbbBBiiB", | |
clamp_int(l_stick_hor), | |
clamp_int(l_stick_ver), | |
clamp_int(r_stick_hor), | |
clamp_int(r_stick_ver), | |
clamp_int(l_trigger,0,200), | |
clamp_int(r_trigger,0,200), | |
int(setting1), | |
int(setting2), | |
buttons_char) | |
remote_control.write(controller_state) | |
# Make sure to clean up when user stops program with center button | |
if button.center.is_pressed(): | |
break | |
# Limit the sending rate of control packets to about 30/s | |
time.sleep_ms(30) | |
# Clean up | |
remote_control.disconnect() | |
del(remote_control) | |
print("Disconnected") | |
raise SystemExit |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment