Skip to content

Instantly share code, notes, and snippets.

@AmitGupta7580
Last active November 1, 2021 12:57
Show Gist options
  • Save AmitGupta7580/fe7612cea70edee78dd4fb9716db683f to your computer and use it in GitHub Desktop.
Save AmitGupta7580/fe7612cea70edee78dd4fb9716db683f to your computer and use it in GitHub Desktop.
import airsimneurips
import airsim
import time
import socket
import numpy as np
# Network Configuration
################
HOST = "192.168.29.186"
PORT = 2222
DRONE_NAME = "drone_1"
ARENA = "Soccer_Field_Easy"
DURATION = 50
################
# Invert RC Inputs
# DEFAULTS
################
INVERT_P = True
INVERT_R = False
INVERT_T = False
INVERT_Y = True
################
# These controls sensitivity of RPYT
# Change as per your needs
# Higher value means less sensitvity and vice versa
SENSITIVITY_P = 750.0
SENSITIVITY_R = SENSITIVITY_P
SENSITIVITY_T = 100.0 # 256.0
SENSITIVITY_Y = 150.0
# RC input Resolution
VAL_T = 256
VAL_Y = 256
VAL_RP = 256
# NO TOUCHY TOUCHY
HOVER_THROTTLE = 2.4 #-0.3
HIGH_THROTTLE = 0.4
class World():
def __init__(self, drone_name = "drone_1"):
self.airsim_client = airsimneurips.MultirotorClient()
self.drone_name = drone_name
def load_level(self, level_name, sleep_sec = 2.0):
self.level_name = level_name
self.airsim_client.simLoadLevel(self.level_name)
self.airsim_client.confirmConnection() # failsafe
time.sleep(sleep_sec) # let the environment load completely
def initialize_drone(self):
self.airsim_client.enableApiControl(vehicle_name=self.drone_name)
self.airsim_client.arm(vehicle_name=self.drone_name)
time.sleep(0.2)
def start_race(self):
self.airsim_client.simStartRace(1)
time.sleep(0.2)
class HandelClient(object):
def __init__(self, drone_name, airsim_client, rc_client):
self.drone_name = drone_name
self.airsim_client = airsim_client
self.rc_client = rc_client
self.CURRENT_POSE = None
self.z = None
def handel(self):
while True:
data = self.rc_client.recv(1024)[2:]
data = data.split(b"\x00")[0].decode()
self.execute_command(data)
def execute_command(self, data):
self.CURRENT_POSE = self.airsim_client.simGetVehiclePose(self.drone_name) # Returns Pose object for the specified vehicle
command = data.split("@")[0]
# Fixed Height Flight Mode
if command == "lmv":
try:
roll, pitch, throttle, yaw = self.rc_inputs(data)
print("Throttle: {} Yaw: {} Pitch: {} Roll: {}".format(throttle, yaw, pitch, roll))
yawmode_object = airsim.YawMode(is_rate= True, yaw_or_rate=0.0)
# Setting vehicle's liner velocity in x and y to be zero
# Setting its height to last known z value
if pitch == 0.0 and roll == 0.0:
# Stop the vehicle on zero roll and pitch
self.airsim_client.moveByVelocityZAsync(vx=0, vy=0, z=self.z, duration=DURATION, drivetrain=0, yaw_mode=yawmode_object, vehicle_name=self.drone_name)
# Move the vehicle at fixed height
self.airsim_client.moveByRollPitchYawrateZAsync(roll, pitch, yaw, self.z, DURATION, vehicle_name = self.drone_name)
except:
print("Retrying...")
# Manual Flight Mode
if command == "mv":
try:
self.z = self.CURRENT_POSE.position.z_val
roll, pitch, throttle, yaw = self.rc_inputs(data)
if throttle == 0.0:
throttle = HOVER_THROTTLE
print("Throttle: {} Yaw: {} Pitch: {} Roll: {}".format(throttle, yaw, pitch, roll))
self.airsim_client.moveByRollPitchYawrateZAsync(roll, pitch, yaw, throttle, DURATION, vehicle_name = self.drone_name)
except:
print("Retrying...")
def rc_inputs(self, data):
'''
Function to send rc inputs
'''
try:
throttle, yaw, pitch, roll = map(lambda x: float(x), data.split("@")[1:])
# Limiting RC inputs
throttle = np.clip(throttle, -VAL_T, VAL_T*2)
yaw = np.clip(yaw, -VAL_Y, VAL_Y)
roll = np.clip(roll, -VAL_RP, VAL_RP)
pitch = np.clip(pitch, -VAL_RP, VAL_RP)
# Converting raw RC inputs
throttle, yaw, pitch, roll = self.convert_rc_inputs(throttle, yaw, pitch, roll)
return roll, pitch, throttle, yaw
except ValueError:
print("Possibly Data Loss..")
def convert_rc_inputs(self, throttle, yaw, pitch, roll):
'''
Function to convert the raw RC values into AirSim's throt, pitch, yaw, roll
'''
if INVERT_T:
throttle = -(throttle/SENSITIVITY_T)
else:
throttle = (throttle/SENSITIVITY_T)
if INVERT_Y:
yaw = -(yaw/SENSITIVITY_Y)
else:
yaw = (yaw/SENSITIVITY_Y)
if INVERT_P:
pitch = -(pitch/SENSITIVITY_P)
else:
pitch = (pitch/SENSITIVITY_P)
if INVERT_R:
roll = -(roll/SENSITIVITY_R)
else:
roll = (roll/SENSITIVITY_R)
throttle = np.clip(throttle, -HIGH_THROTTLE, (VAL_T*2)/SENSITIVITY_T)
return throttle, yaw, pitch, roll
class RCReciever(object):
def __init__(self, ip = "127.0.0.1", port = 2222, drone_name = "drone_1", airsim_client = None):
self.ip = ip
self.port = port
self.airsim_client = airsim_client
self.drone_name = drone_name
# socket for rc connection
self.server_socket = socket.socket()
def start(self):
try:
self.server_socket.bind((self.ip, self.port))
except Exception:
print("Port is busy..")
return
print(f"[+] Socket successfully created at {self.ip}, {self.port}")
print("[+] Now connect your Remote Controller app")
self.server_socket.listen(5)
# accept rc connection
self.rc_client, addr = self.server_socket.accept()
print(f"[+] Got connection from {addr}")
handler = HandelClient(self.drone_name, self.airsim_client, self.rc_client)
try:
handler.handel()
except KeyboardInterrupt:
print("[-] KeyboardInterrupt rc_client disconnected")
self.rc_client.close()
self.server_socket.close()
if __name__ == "__main__":
world = World(DRONE_NAME)
world.load_level(ARENA)
world.initialize_drone()
world.start_race()
reciever = RCReciever(
ip=HOST,
port=PORT,
drone_name=DRONE_NAME,
airsim_client=world.airsim_client
)
reciever.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment