Last active
March 14, 2025 23:06
-
-
Save ElishaAz/a83dfa8f2d53497d7d1d0bca03bfced2 to your computer and use it in GitHub Desktop.
A client for the GameSir T1D controller in python, using bleak. And a controller for the Tello using it.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Dependencies: | |
| ``` | |
| pip install bleak | |
| ``` | |
| Based on (https://github.com/Diallomm/hack_GamesirT1d/blob/main/src/T1D.py). | |
| Edited to use `bleak` for cross-platform support. | |
| Edited by: Elisha Azaria | |
| """ | |
| import asyncio | |
| from typing import Optional | |
| from bleak import BleakClient, discover | |
# Bounds and midpoint of a raw stick axis value (10-bit range, 0..1023).
STICK_MIN, STICK_MID, STICK_MAX = 0, 512, 1023

# Prefix of the advertised Bluetooth name used to recognise the controller when scanning.
CONTROLLER_NAME = "Gamesir-T1d"
class T1D:
    """Client for the GameSir T1d controller over Bluetooth LE, using ``bleak``.

    After :meth:`connect`, call :meth:`get_state` repeatedly; whenever it
    returns ``True`` the public attributes below hold the freshly decoded
    controller state.

    Attributes:
        L1, R1, X, Y, A, B, C1, C2, MENU: buttons, 0 or 1.
        L2, R2: trigger bytes, 0-255.
        Up, Down, Left, Right: d-pad directions, 0 or 1.
        LX, LY, RX, RY: stick axes, 0-1023 (512 at construction time).
    """

    # GATT characteristic that is read to obtain the state packet.
    _STATE_CHAR_UUID = "00008651-0000-1000-8000-00805f9b34fb"
    # parse_state() indexes the packet up to data[11].
    _MIN_PACKET_LEN = 12
    # Packets whose first byte has this value decode to garbage and are ignored.
    _GARBAGE_MARKER = 0xC9

    # Last packet that was accepted and parsed; used for change detection.
    _previous_state = b""

    def __init__(self):
        self.L1 = 0  # 0 / 1
        self.L2 = 0  # 0 - 255
        self.R1 = 0  # 0 / 1
        self.R2 = 0  # 0 - 255
        self.X = 0  # 0 / 1
        self.Y = 0  # 0 / 1
        self.A = 0  # 0 / 1
        self.B = 0  # 0 / 1
        self.C1 = 0  # 0 / 1
        self.C2 = 0  # 0 / 1
        self.MENU = 0  # 0 / 1
        self.Down = 0  # 0 / 1
        self.Up = 0  # 0 / 1
        self.Left = 0  # 0 / 1
        self.Right = 0  # 0 / 1
        self.LX = 512  # 0 - 1023
        self.LY = 512  # 0 - 1023
        self.RX = 512  # 0 - 1023
        self.RY = 512  # 0 - 1023
        self._controller = None
        self._state_vec = b""

    async def scan(self):
        """Discover nearby devices and bind to the first matching controller.

        A device matches when its advertised name starts with
        ``CONTROLLER_NAME``. Devices that advertise no name are skipped.

        Raises:
            RuntimeError: if no matching device was discovered.
        """
        print("Scanning...")
        devices = await discover()
        for d in devices:
            name = d.name
            print(name)
            # d.name can be None for devices that do not advertise a name.
            if name and name.startswith(CONTROLLER_NAME):
                self._controller = BleakClient(d)
                print(f"Found Controller! mac: '{d.address}'")
                return
        raise RuntimeError("No controller found!")

    async def connect(self, address: Optional[str] = None):
        """Connect to the controller and read its initial state.

        Args:
            address: Bluetooth address of the controller. When ``None`` the
                controller is located with :meth:`scan` instead.

        Raises:
            RuntimeError: from :meth:`scan` when no controller is found.
        """
        if address is None:
            await self.scan()
        else:
            self._controller = BleakClient(address)
        print("Connecting...")
        await self._controller.connect()
        await self.get_state()
        print("Connected")

    async def get_state(self) -> bool:
        """Read one state packet and update the attributes if it changed.

        Returns:
            ``True`` if the packet differs from the previously accepted one
            (the attributes were re-parsed), ``False`` if it is unchanged,
            too short to parse, or flagged as garbage.
        """
        data = await self._read()
        self._state_vec = data
        # Short/empty reads cannot be parsed; packets starting with 0xc9 create garbage values.
        if len(data) < self._MIN_PACKET_LEN or data[0] == self._GARBAGE_MARKER:
            return False
        if data == self._previous_state:
            return False
        self._previous_state = data
        self.parse_state()
        return True

    async def _read(self) -> bytearray:
        """Read the raw state packet from the controller's state characteristic."""
        return await self._controller.read_gatt_char(self._STATE_CHAR_UUID)

    def parse_state(self):
        """Decode ``self._state_vec`` into the public button/stick attributes.

        Note (from the original author): the last byte (data[19]) is updated
        on every controller change and could be used as a trigger instead of
        polling.
        """
        data = self._state_vec
        buttons = data[9]
        extra = data[10]
        dpad = data[11]

        self.L1 = int(bool(buttons & 0x40))
        self.L2 = int(data[7])  # int 0-255
        self.R1 = int(bool(buttons & 0x80))
        self.R2 = int(data[8])  # int 0-255

        self.X = int(bool(buttons & 0x08))
        self.Y = int(bool(buttons & 0x10))
        self.A = int(bool(buttons & 0x01))
        self.B = int(bool(buttons & 0x02))
        self.C1 = int(bool(extra & 0x04))
        self.C2 = int(bool(extra & 0x08))
        self.MENU = int(bool(buttons & 0x04))

        # The d-pad byte is compared by value; any other value leaves all four at 0.
        self.Down = int(dpad == 0x05)
        self.Up = int(dpad == 0x01)
        self.Left = int(dpad == 0x07)
        self.Right = int(dpad == 0x03)

        # Bytes 2..6 hold four consecutive 10-bit values: LX, LY, RX, RY (most significant first).
        packed = int.from_bytes(bytes(data[2:7]), "big")
        self.LX = (packed >> 30) & 0x3FF
        self.LY = (packed >> 20) & 0x3FF
        self.RX = (packed >> 10) & 0x3FF
        self.RY = packed & 0x3FF

    def __str__(self):
        """Return one ``label: value`` line per attribute, for printing."""
        rows = (
            ("L1: ", self.L1),
            ("L2: ", self.L2),
            ("R1: ", self.R1),
            ("R2: ", self.R2),
            ("X : ", self.X),
            ("Y : ", self.Y),
            ("A : ", self.A),
            ("B : ", self.B),
            ("C1: ", self.C1),
            ("C2: ", self.C2),
            ("MENU:  ", self.MENU),  # two spaces kept to match the existing output format
            ("Down: ", self.Down),
            ("Up: ", self.Up),
            ("Left: ", self.Left),
            ("Right: ", self.Right),
            ("LX : ", self.LX),
            ("LY : ", self.LY),
            ("RX : ", self.RX),
            ("RY : ", self.RY),
        )
        return "\n".join(f"{label}{value}" for label, value in rows)
if __name__ == "__main__":
    async def main():
        """Connect to a controller found by scanning and print each new state."""
        controller = T1D()  # no address is passed to connect(), so it scans
        await controller.connect()
        while True:
            changed = await controller.get_state()
            if changed:
                print(controller)

    asyncio.run(main())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Control the Tello using a GameSir T1D controller on a computer. | |
| ## Requirements | |
| Install `av`'s dependencies (https://pyav.org/docs/develop/overview/installation.html). | |
| Install python dependencies using `pip`: | |
| ``` | |
| pip install tellopy av numpy opencv-python pillow bleak | |
| ``` | |
| ## How to use | |
| 1. Find your controller's MAC address from | |
| - Your bluetooth manager | |
| - Under the flap in your controller | |
| - `gamesir_t1d_bleak.py`'s output. | |
| And place it in `CONTROLLER_MAC_ADDRESS` below | |
| 2. Connect to your Tello using Wi-Fi | |
| 3. Turn on Bluetooth on your computer and turn on your controller | |
| 4. Run this file | |
| 5. Enjoy | |
| ## Controls | |
| Left stick: | |
| Y (Up / Down) - throttle | |
| X (Right / Left) - turn | |
| Right stick: | |
| Y (Up / Down) - forward / back | |
| X (Right / Left) - right / left | |
| Y - takeoff | |
| A - land | |
| B - palm-land | |
| L1 - take picture (saved in current directory) | |
| R1 - take video (not implemented yet) | |
| With L2 held: | |
| Up - flip forward | |
| Down - flip back | |
| Right - flip right | |
| Left - flip left | |
| ## Credits | |
| Based on (https://github.com/hanyazou/TelloPy/blob/develop-0.7.0/tellopy/examples/video_effect.py) from TelloPy. | |
| `gamesir_t1d_bleak` is based on (https://github.com/Diallomm/hack_GamesirT1d/blob/main/src/T1D.py). | |
| Made by Elisha Azaria | |
| """ | |
# Bluetooth address of the GameSir T1d controller to connect to; replace with your own.
CONTROLLER_MAC_ADDRESS = "C6:86:A1:04:EC:64"
| import sys | |
| import traceback | |
| from threading import Thread | |
| import tellopy | |
| import av | |
| import cv2.cv2 as cv2 # for avoidance of pylint error | |
| import numpy | |
| import time | |
| from gamesir_t1d_bleak import * | |
# Half-width of the stick dead zone, in raw stick units.
CENTER_RANGE = 10
# Movement step sizes; not referenced by the code visible in this file.
TURN_AMOUNT = 90
RAISE_AMOUNT = MOVE_AMOUNT = 50
def is_stick_pushed(stick):
    """Return True when a raw stick axis value lies outside the centre dead zone.

    Args:
        stick: raw axis value (``STICK_MIN``..``STICK_MAX``).

    Returns:
        ``True`` if ``stick`` is more than ``CENTER_RANGE`` away from
        ``STICK_MID`` in either direction, ``False`` otherwise.
    """
    # The previous expression's second clause (STICK_MIN - stick < CENTER_RANGE)
    # held for every non-negative value, so the function always returned True.
    return abs(stick - STICK_MID) > CENTER_RANGE
def move_stick(amount, pos, neg):
    """Dispatch a signed amount to one of two callbacks.

    Args:
        amount: signed magnitude; nothing is called when it is neither
            greater nor less than zero.
        pos: callable invoked with ``amount`` when ``amount > 0``.
        neg: callable invoked with ``amount`` (still negative) when
            ``amount < 0``.
    """
    if amount > 0:
        handler = pos
    elif amount < 0:
        handler = neg
    else:
        return
    handler(amount)
# Flags shared between the controller thread (which sets them) and the video loop.
take_picture = take_video = False
def controller_thread(drone: tellopy.Tello):
    """Thread target: poll the GameSir controller and drive the drone from it.

    Runs its own asyncio event loop, connects to the controller at
    ``CONTROLLER_MAC_ADDRESS`` and then loops forever, so it only returns
    if an exception escapes.

    On every state change the first matching action below is performed:
        A -> land, B -> palm-land, Y -> takeoff,
        L2 held (> 126) + d-pad -> flip in that direction,
        L1 -> take a picture (also sets the module flag ``take_picture``),
        R1 -> sets the module flag ``take_video``,
        d-pad Down (without L2) -> land.
    The stick positions are then copied to the drone, scaled to roughly
    -1..1, with both Y axes inverted.

    Args:
        drone: connected ``tellopy.Tello`` instance to control.
    """
    # Imported locally so this function does not rely on ``asyncio`` leaking
    # into the module namespace through ``from gamesir_t1d_bleak import *``.
    import asyncio

    async def sync():
        # Only names that exist at module level are declared here.
        global take_picture, take_video

        controller = T1D()
        await controller.connect(CONTROLLER_MAC_ADDRESS)

        while True:
            if not await controller.get_state():
                continue

            if controller.A:
                drone.land()
            elif controller.B:
                drone.palm_land()
            elif controller.Y:
                drone.takeoff()
            elif controller.L2 > 126:
                # L2 acts as a modifier: the d-pad triggers flips while it is held.
                if controller.Up:
                    drone.flip_forward()
                elif controller.Right:
                    drone.flip_right()
                elif controller.Left:
                    drone.flip_left()
                elif controller.Down:
                    drone.flip_back()
            elif controller.L1:
                drone.take_picture()
                take_picture = True
            elif controller.R1:
                take_video = True
            elif controller.Down:
                drone.land()

            drone.left_x = (controller.LX - STICK_MID) / STICK_MID
            drone.left_y = (STICK_MID - controller.LY) / STICK_MID
            drone.right_x = (controller.RX - STICK_MID) / STICK_MID
            drone.right_y = (STICK_MID - controller.RY) / STICK_MID

    asyncio.run(sync())
def main():
    """Connect to the Tello, start the controller thread and show the video feed.

    Frames are displayed in an OpenCV window named 'Original'. When the
    module flag ``take_picture`` is set, the current frame is written to
    ``Image <timestamp>.png`` in the current directory. Pressing any key in
    the window ends the program. The drone connection and all windows are
    always released on exit; any exception is printed rather than raised.
    """
    global take_picture

    drone = tellopy.Tello()
    print("Connecting to drone...")

    try:
        drone.connect()
        drone.wait_for_connection(60.0)

        retry = 3
        container = None
        while container is None and 0 < retry:
            retry -= 1
            try:
                container = av.open(drone.get_video_stream())
            except av.AVError as ave:
                print(ave)
                print('retry...')
        if container is None:
            # All retries failed; without this the loop below would fail on None.
            raise RuntimeError("Could not open the drone video stream")

        # Daemon thread: its polling loop never returns, so it must not keep
        # the process alive once the video loop has finished.
        Thread(target=controller_thread, args=(drone,), daemon=True).start()

        # skip first 300 frames
        frame_skip = 300
        running = True
        while running:
            for frame in container.decode(video=0):
                if 0 < frame_skip:
                    frame_skip = frame_skip - 1
                    continue
                start_time = time.time()
                image = cv2.cvtColor(numpy.array(frame.to_image()), cv2.COLOR_RGB2BGR)
                cv2.imshow('Original', image)
                if take_picture:
                    take_picture = False
                    cv2.imwrite(F"Image {time.strftime('%Y%m%d-%H%M%S')}.png", image)
                key = cv2.waitKey(1)
                if key != -1:
                    # Leave the outer loop too; a bare break would only restart decoding.
                    running = False
                    break
                if frame.time_base < 1.0 / 60:
                    time_base = 1.0 / 60
                else:
                    time_base = frame.time_base
                # Drop as many frames as elapsed while processing this one.
                frame_skip = int((time.time() - start_time) / time_base)
    except Exception as ex:
        exc_type, exc_value, exc_traceback = sys.exc_info()
        traceback.print_exception(exc_type, exc_value, exc_traceback)
        print(ex)
    finally:
        drone.quit()
        cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I made a fork and created a reconnect feature. I also:
Error Handling: Added handling for Bluetooth connection issues.
State Detection: Improved detection for when the controller state changes.
Code Simplification: Streamlined the code for readability.
Removed Unused Code: Cleaned up unnecessary imports and variables.
Controller Input Handling: Improved joystick input processing for better control
COMMENT ON THIS COMMENT IF I MESSED UP THE CODE AND IT DOESN'T WORK.
ALSO LET ME KNOW IF YOU WANT ME TO REMOVE MY FORK.