ALL CODE CREDIT TO: https://github.com/ElishaAz CODE FROM REPOSITORY: https://gist.github.com/ElishaAz/a83dfa8f2d53497d7d1d0bca03bfced2 --------------------------------_ THE ORIGINAL OWNER OF THIS CODE HAS ALL RIGHTS TO TAKE IT DOWN --------------------------------_
-
-
Save Cacti-Master/89f0f397a13f28ada046ec42477bc9fd to your computer and use it in GitHub Desktop.
A client for the GameSir T1D controller in python, using bleak. And a controller for the Tello using it.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| from typing import Optional | |
| from bleak import BleakClient, discover | |
| STICK_MIN = 0 | |
| STICK_MAX = 1023 | |
| STICK_MID = 512 | |
| CONTROLLER_NAME = "Gamesir-T1d" | |
| class T1D: | |
| _previous_state = "" | |
| def __init__(self): | |
| self.L1 = 0 | |
| self.L2 = 0 | |
| self.R1 = 0 | |
| self.R2 = 0 | |
| self.X = 0 | |
| self.Y = 0 | |
| self.A = 0 | |
| self.B = 0 | |
| self.C1 = 0 | |
| self.C2 = 0 | |
| self.MENU = 0 | |
| self.Down = 0 | |
| self.Up = 0 | |
| self.Left = 0 | |
| self.Right = 0 | |
| self.LX = 512 | |
| self.LY = 512 | |
| self.RX = 512 | |
| self.RY = 512 | |
| self._controller = None | |
| async def scan(self): | |
| print("Scanning...") | |
| devices = await discover() | |
| for d in devices: | |
| name: str = d.name | |
| print(name) | |
| if name.startswith(CONTROLLER_NAME): | |
| self._controller = BleakClient(d) | |
| print(f"Found Controller! mac: '{d.address}'") | |
| break | |
| raise RuntimeError("No controller found!") | |
| async def connect(self, address: Optional[str] = None): | |
| if address is None: | |
| await self.scan() | |
| else: | |
| self._controller = BleakClient(address) | |
| print("Connecting...") | |
| await self._controller.connect() | |
| await self.get_state() | |
| print("Connected") | |
| async def get_state(self) -> bool: | |
| self._state_vec = await self._read() | |
| if self._state_vec[0] == 0xc9: | |
| return False | |
| if self._previous_state != self._state_vec: | |
| self._previous_state = self._state_vec | |
| self.parse_state() | |
| return True | |
| return False | |
| async def _read(self) -> bytearray: | |
| return await self._controller.read_gatt_char("00008651-0000-1000-8000-00805f9b34fb") | |
| def parse_state(self): | |
| data = self._state_vec | |
| self.L1 = int(bool(data[9] & 0x40)) | |
| self.L2 = int(data[7]) | |
| self.R1 = int(bool(data[9] & 0x80)) | |
| self.R2 = int(data[8]) | |
| self.X = int(bool(data[9] & 0x08)) | |
| self.Y = int(bool(data[9] & 0x10)) | |
| self.A = int(bool(data[9] & 0x01)) | |
| self.B = int(bool(data[9] & 0x02)) | |
| self.C1 = int(bool(data[10] & 0x04)) | |
| self.C2 = int(bool(data[10] & 0x08)) | |
| self.MENU = int(bool(data[9] & 0x04)) | |
| self.Down = int(bool(data[11] == 0x05)) | |
| self.Up = int(bool(data[11] == 0x01)) | |
| self.Left = int(bool(data[11] == 0x07)) | |
| self.Right = int(bool(data[11] == 0x03)) | |
| self.LX = int(((data[2]) << 2) | (data[3] >> 6)) | |
| self.LY = int(((data[3] & 0x3f) << 4) + (data[4] >> 4)) | |
| self.RX = int(((data[4] & 0xf) << 6) | (data[5] >> 2)) | |
| self.RY = int(((data[5] & 0x3) << 8) + ((data[6]))) | |
| def __str__(self): | |
| return "L1: {}\nL2: {}\nR1: {}\nR2: {}\nX : {}\nY : {}\nA : {}\nB : {}\nC1: {}\nC2: {}\nMENU: " \ | |
| " {}\nDown: {}\nUp: {}\nLeft: {}\nRight: {}\nLX : {}\nLY : {}\nRX : {}\nRY : {}" \ | |
| .format( | |
| self.L1, self.L2, self.R1, self.R2, self.X, self.Y, self.A, self.B, self.C1, self.C2, self.MENU, | |
| self.Down, self.Up, self.Left, self.Right, self.LX, self.LY, self.RX, self.RY | |
| ) | |
| if __name__ == "__main__": | |
| async def main(): | |
| controller = T1D() | |
| await controller.connect() | |
| while 1: | |
| if await controller.get_state(): | |
| print(controller) | |
| asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| CONTROLLER_MAC_ADDRESS = 'XX:XX:XX:XX:XX:XX' # Replace with your controller's MAC address | |
| import sys | |
| import traceback | |
| from threading import Thread | |
| import tellopy | |
| import av | |
| import cv2.cv2 as cv2 | |
| import numpy | |
| import time | |
| from gamesir_t1d_bleak import * | |
| CENTER_RANGE = 10 | |
| TURN_AMOUNT = 90 | |
| RAISE_AMOUNT = 50 | |
| MOVE_AMOUNT = 50 | |
| def is_stick_pushed(stick): | |
| return stick - STICK_MID < CENTER_RANGE or STICK_MIN - stick < CENTER_RANGE | |
| def move_stick(amount, pos, neg): | |
| if amount > 0: | |
| pos(amount) | |
| elif amount < 0: | |
| neg(amount) | |
| take_picture = False | |
| take_video = False | |
| def controller_thread(drone: tellopy.Tello): | |
| global take_picture, take_video | |
| async def sync(): | |
| global take_picture, start_video, take_video | |
| controller = T1D() | |
| await controller.connect(CONTROLLER_MAC_ADDRESS) | |
| while True: | |
| if await controller.get_state(): | |
| if controller.A: | |
| drone.land() | |
| elif controller.B: | |
| drone.palm_land() | |
| elif controller.Y: | |
| drone.takeoff() | |
| elif controller.L2 > 126: | |
| if controller.Up: | |
| drone.flip_forward() | |
| elif controller.Right: | |
| drone.flip_right() | |
| elif controller.Left: | |
| drone.flip_left() | |
| elif controller.Down: | |
| drone.flip_back() | |
| elif controller.L1: | |
| drone.take_picture() | |
| take_picture = True | |
| elif controller.R1: | |
| take_video = True | |
| elif controller.Down: | |
| drone.land() | |
| drone.left_x = (controller.LX - STICK_MID) / STICK_MID | |
| drone.left_y = (STICK_MID - controller.LY) / STICK_MID | |
| drone.right_x = (controller.RX - STICK_MID) / STICK_MID | |
| drone.right_y = (STICK_MID - controller.RY) / STICK_MID | |
| asyncio.run(sync()) | |
| def main(): | |
| global take_picture | |
| drone = tellopy.Tello() | |
| print("Connecting to controller...") | |
| try: | |
| drone.connect() | |
| drone.wait_for_connection(60.0) | |
| retry = 3 | |
| container = None | |
| while container is None and 0 < retry: | |
| retry -= 1 | |
| try: | |
| container = av.open(drone.get_video_stream()) | |
| except av.AVError as ave: | |
| print(ave) | |
| print('retry...') | |
| Thread(target=controller_thread, args=(drone,)).start() | |
| frame_skip = 300 | |
| while True: | |
| for frame in container.decode(video=0): | |
| if 0 < frame_skip: | |
| frame_skip = frame_skip - 1 | |
| continue | |
| start_time = time.time() | |
| image = cv2.cvtColor(numpy.array(frame.to_image()), cv2.COLOR_RGB2BGR) | |
| cv2.imshow('Original', image) | |
| if take_picture: | |
| take_picture = False | |
| cv2.imwrite(f"Image {time.strftime('%Y%m%d-%H%M%S')}.png", image) | |
| key = cv2.waitKey(1) | |
| if key != -1: | |
| break | |
| if frame.time_base < 1.0 / 60: | |
| time_base = 1.0 / 60 | |
| else: | |
| time_base = frame.time_base | |
| frame_skip = int((time.time() - start_time) / time_base) | |
| except Exception as ex: | |
| exc_type, exc_value, exc_traceback = sys.exc_info() | |
| traceback.print_exception(exc_type, exc_value, exc_traceback) | |
| print(ex) | |
| finally: | |
| drone.quit() | |
| cv2.destroyAllWindows() | |
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment