Skip to content

Instantly share code, notes, and snippets.

@Cacti-Master
Forked from ElishaAz/gamesir_t1d_bleak.py
Last active March 14, 2025 22:57
Show Gist options
  • Select an option

  • Save Cacti-Master/89f0f397a13f28ada046ec42477bc9fd to your computer and use it in GitHub Desktop.

Select an option

Save Cacti-Master/89f0f397a13f28ada046ec42477bc9fd to your computer and use it in GitHub Desktop.
A client for the GameSir T1D controller in python, using bleak. And a controller for the Tello using it.
import asyncio
from typing import Optional
from bleak import BleakClient, discover
STICK_MIN = 0
STICK_MAX = 1023
STICK_MID = 512
CONTROLLER_NAME = "Gamesir-T1d"
class T1D:
_previous_state = ""
def __init__(self):
self.L1 = 0
self.L2 = 0
self.R1 = 0
self.R2 = 0
self.X = 0
self.Y = 0
self.A = 0
self.B = 0
self.C1 = 0
self.C2 = 0
self.MENU = 0
self.Down = 0
self.Up = 0
self.Left = 0
self.Right = 0
self.LX = 512
self.LY = 512
self.RX = 512
self.RY = 512
self._controller = None
async def scan(self):
print("Scanning...")
devices = await discover()
for d in devices:
name: str = d.name
print(name)
if name.startswith(CONTROLLER_NAME):
self._controller = BleakClient(d)
print(f"Found Controller! mac: '{d.address}'")
break
raise RuntimeError("No controller found!")
async def connect(self, address: Optional[str] = None):
if address is None:
await self.scan()
else:
self._controller = BleakClient(address)
print("Connecting...")
await self._controller.connect()
await self.get_state()
print("Connected")
async def get_state(self) -> bool:
self._state_vec = await self._read()
if self._state_vec[0] == 0xc9:
return False
if self._previous_state != self._state_vec:
self._previous_state = self._state_vec
self.parse_state()
return True
return False
async def _read(self) -> bytearray:
return await self._controller.read_gatt_char("00008651-0000-1000-8000-00805f9b34fb")
def parse_state(self):
data = self._state_vec
self.L1 = int(bool(data[9] & 0x40))
self.L2 = int(data[7])
self.R1 = int(bool(data[9] & 0x80))
self.R2 = int(data[8])
self.X = int(bool(data[9] & 0x08))
self.Y = int(bool(data[9] & 0x10))
self.A = int(bool(data[9] & 0x01))
self.B = int(bool(data[9] & 0x02))
self.C1 = int(bool(data[10] & 0x04))
self.C2 = int(bool(data[10] & 0x08))
self.MENU = int(bool(data[9] & 0x04))
self.Down = int(bool(data[11] == 0x05))
self.Up = int(bool(data[11] == 0x01))
self.Left = int(bool(data[11] == 0x07))
self.Right = int(bool(data[11] == 0x03))
self.LX = int(((data[2]) << 2) | (data[3] >> 6))
self.LY = int(((data[3] & 0x3f) << 4) + (data[4] >> 4))
self.RX = int(((data[4] & 0xf) << 6) | (data[5] >> 2))
self.RY = int(((data[5] & 0x3) << 8) + ((data[6])))
def __str__(self):
return "L1: {}\nL2: {}\nR1: {}\nR2: {}\nX : {}\nY : {}\nA : {}\nB : {}\nC1: {}\nC2: {}\nMENU: " \
" {}\nDown: {}\nUp: {}\nLeft: {}\nRight: {}\nLX : {}\nLY : {}\nRX : {}\nRY : {}" \
.format(
self.L1, self.L2, self.R1, self.R2, self.X, self.Y, self.A, self.B, self.C1, self.C2, self.MENU,
self.Down, self.Up, self.Left, self.Right, self.LX, self.LY, self.RX, self.RY
)
if __name__ == "__main__":
async def main():
controller = T1D()
await controller.connect()
while 1:
if await controller.get_state():
print(controller)
asyncio.run(main())
CONTROLLER_MAC_ADDRESS = 'XX:XX:XX:XX:XX:XX' # Replace with your controller's MAC address
import sys
import traceback
from threading import Thread
import tellopy
import av
import cv2.cv2 as cv2
import numpy
import time
from gamesir_t1d_bleak import *
CENTER_RANGE = 10
TURN_AMOUNT = 90
RAISE_AMOUNT = 50
MOVE_AMOUNT = 50
def is_stick_pushed(stick):
return stick - STICK_MID < CENTER_RANGE or STICK_MIN - stick < CENTER_RANGE
def move_stick(amount, pos, neg):
if amount > 0:
pos(amount)
elif amount < 0:
neg(amount)
take_picture = False
take_video = False
def controller_thread(drone: tellopy.Tello):
global take_picture, take_video
async def sync():
global take_picture, start_video, take_video
controller = T1D()
await controller.connect(CONTROLLER_MAC_ADDRESS)
while True:
if await controller.get_state():
if controller.A:
drone.land()
elif controller.B:
drone.palm_land()
elif controller.Y:
drone.takeoff()
elif controller.L2 > 126:
if controller.Up:
drone.flip_forward()
elif controller.Right:
drone.flip_right()
elif controller.Left:
drone.flip_left()
elif controller.Down:
drone.flip_back()
elif controller.L1:
drone.take_picture()
take_picture = True
elif controller.R1:
take_video = True
elif controller.Down:
drone.land()
drone.left_x = (controller.LX - STICK_MID) / STICK_MID
drone.left_y = (STICK_MID - controller.LY) / STICK_MID
drone.right_x = (controller.RX - STICK_MID) / STICK_MID
drone.right_y = (STICK_MID - controller.RY) / STICK_MID
asyncio.run(sync())
def main():
global take_picture
drone = tellopy.Tello()
print("Connecting to controller...")
try:
drone.connect()
drone.wait_for_connection(60.0)
retry = 3
container = None
while container is None and 0 < retry:
retry -= 1
try:
container = av.open(drone.get_video_stream())
except av.AVError as ave:
print(ave)
print('retry...')
Thread(target=controller_thread, args=(drone,)).start()
frame_skip = 300
while True:
for frame in container.decode(video=0):
if 0 < frame_skip:
frame_skip = frame_skip - 1
continue
start_time = time.time()
image = cv2.cvtColor(numpy.array(frame.to_image()), cv2.COLOR_RGB2BGR)
cv2.imshow('Original', image)
if take_picture:
take_picture = False
cv2.imwrite(f"Image {time.strftime('%Y%m%d-%H%M%S')}.png", image)
key = cv2.waitKey(1)
if key != -1:
break
if frame.time_base < 1.0 / 60:
time_base = 1.0 / 60
else:
time_base = frame.time_base
frame_skip = int((time.time() - start_time) / time_base)
except Exception as ex:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_exception(exc_type, exc_value, exc_traceback)
print(ex)
finally:
drone.quit()
cv2.destroyAllWindows()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment