Last active
November 1, 2021 12:57
-
-
Save AmitGupta7580/fe7612cea70edee78dd4fb9716db683f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import airsimneurips | |
import airsim | |
import time | |
import socket | |
import numpy as np | |
# Network Configuration | |
################ | |
HOST = "192.168.29.186" | |
PORT = 2222 | |
DRONE_NAME = "drone_1" | |
ARENA = "Soccer_Field_Easy" | |
DURATION = 50 | |
################ | |
# Invert RC Inputs | |
# DEFAULTS | |
################ | |
INVERT_P = True | |
INVERT_R = False | |
INVERT_T = False | |
INVERT_Y = True | |
################ | |
# These controls sensitivity of RPYT | |
# Change as per your needs | |
# Higher value means less sensitvity and vice versa | |
SENSITIVITY_P = 750.0 | |
SENSITIVITY_R = SENSITIVITY_P | |
SENSITIVITY_T = 100.0 # 256.0 | |
SENSITIVITY_Y = 150.0 | |
# RC input Resolution | |
VAL_T = 256 | |
VAL_Y = 256 | |
VAL_RP = 256 | |
# NO TOUCHY TOUCHY | |
HOVER_THROTTLE = 2.4 #-0.3 | |
HIGH_THROTTLE = 0.4 | |
class World(): | |
def __init__(self, drone_name = "drone_1"): | |
self.airsim_client = airsimneurips.MultirotorClient() | |
self.drone_name = drone_name | |
def load_level(self, level_name, sleep_sec = 2.0): | |
self.level_name = level_name | |
self.airsim_client.simLoadLevel(self.level_name) | |
self.airsim_client.confirmConnection() # failsafe | |
time.sleep(sleep_sec) # let the environment load completely | |
def initialize_drone(self): | |
self.airsim_client.enableApiControl(vehicle_name=self.drone_name) | |
self.airsim_client.arm(vehicle_name=self.drone_name) | |
time.sleep(0.2) | |
def start_race(self): | |
self.airsim_client.simStartRace(1) | |
time.sleep(0.2) | |
class HandelClient(object): | |
def __init__(self, drone_name, airsim_client, rc_client): | |
self.drone_name = drone_name | |
self.airsim_client = airsim_client | |
self.rc_client = rc_client | |
self.CURRENT_POSE = None | |
self.z = None | |
def handel(self): | |
while True: | |
data = self.rc_client.recv(1024)[2:] | |
data = data.split(b"\x00")[0].decode() | |
self.execute_command(data) | |
def execute_command(self, data): | |
self.CURRENT_POSE = self.airsim_client.simGetVehiclePose(self.drone_name) # Returns Pose object for the specified vehicle | |
command = data.split("@")[0] | |
# Fixed Height Flight Mode | |
if command == "lmv": | |
try: | |
roll, pitch, throttle, yaw = self.rc_inputs(data) | |
print("Throttle: {} Yaw: {} Pitch: {} Roll: {}".format(throttle, yaw, pitch, roll)) | |
yawmode_object = airsim.YawMode(is_rate= True, yaw_or_rate=0.0) | |
# Setting vehicle's liner velocity in x and y to be zero | |
# Setting its height to last known z value | |
if pitch == 0.0 and roll == 0.0: | |
# Stop the vehicle on zero roll and pitch | |
self.airsim_client.moveByVelocityZAsync(vx=0, vy=0, z=self.z, duration=DURATION, drivetrain=0, yaw_mode=yawmode_object, vehicle_name=self.drone_name) | |
# Move the vehicle at fixed height | |
self.airsim_client.moveByRollPitchYawrateZAsync(roll, pitch, yaw, self.z, DURATION, vehicle_name = self.drone_name) | |
except: | |
print("Retrying...") | |
# Manual Flight Mode | |
if command == "mv": | |
try: | |
self.z = self.CURRENT_POSE.position.z_val | |
roll, pitch, throttle, yaw = self.rc_inputs(data) | |
if throttle == 0.0: | |
throttle = HOVER_THROTTLE | |
print("Throttle: {} Yaw: {} Pitch: {} Roll: {}".format(throttle, yaw, pitch, roll)) | |
self.airsim_client.moveByRollPitchYawrateZAsync(roll, pitch, yaw, throttle, DURATION, vehicle_name = self.drone_name) | |
except: | |
print("Retrying...") | |
def rc_inputs(self, data): | |
''' | |
Function to send rc inputs | |
''' | |
try: | |
throttle, yaw, pitch, roll = map(lambda x: float(x), data.split("@")[1:]) | |
# Limiting RC inputs | |
throttle = np.clip(throttle, -VAL_T, VAL_T*2) | |
yaw = np.clip(yaw, -VAL_Y, VAL_Y) | |
roll = np.clip(roll, -VAL_RP, VAL_RP) | |
pitch = np.clip(pitch, -VAL_RP, VAL_RP) | |
# Converting raw RC inputs | |
throttle, yaw, pitch, roll = self.convert_rc_inputs(throttle, yaw, pitch, roll) | |
return roll, pitch, throttle, yaw | |
except ValueError: | |
print("Possibly Data Loss..") | |
def convert_rc_inputs(self, throttle, yaw, pitch, roll): | |
''' | |
Function to convert the raw RC values into AirSim's throt, pitch, yaw, roll | |
''' | |
if INVERT_T: | |
throttle = -(throttle/SENSITIVITY_T) | |
else: | |
throttle = (throttle/SENSITIVITY_T) | |
if INVERT_Y: | |
yaw = -(yaw/SENSITIVITY_Y) | |
else: | |
yaw = (yaw/SENSITIVITY_Y) | |
if INVERT_P: | |
pitch = -(pitch/SENSITIVITY_P) | |
else: | |
pitch = (pitch/SENSITIVITY_P) | |
if INVERT_R: | |
roll = -(roll/SENSITIVITY_R) | |
else: | |
roll = (roll/SENSITIVITY_R) | |
throttle = np.clip(throttle, -HIGH_THROTTLE, (VAL_T*2)/SENSITIVITY_T) | |
return throttle, yaw, pitch, roll | |
class RCReciever(object): | |
def __init__(self, ip = "127.0.0.1", port = 2222, drone_name = "drone_1", airsim_client = None): | |
self.ip = ip | |
self.port = port | |
self.airsim_client = airsim_client | |
self.drone_name = drone_name | |
# socket for rc connection | |
self.server_socket = socket.socket() | |
def start(self): | |
try: | |
self.server_socket.bind((self.ip, self.port)) | |
except Exception: | |
print("Port is busy..") | |
return | |
print(f"[+] Socket successfully created at {self.ip}, {self.port}") | |
print("[+] Now connect your Remote Controller app") | |
self.server_socket.listen(5) | |
# accept rc connection | |
self.rc_client, addr = self.server_socket.accept() | |
print(f"[+] Got connection from {addr}") | |
handler = HandelClient(self.drone_name, self.airsim_client, self.rc_client) | |
try: | |
handler.handel() | |
except KeyboardInterrupt: | |
print("[-] KeyboardInterrupt rc_client disconnected") | |
self.rc_client.close() | |
self.server_socket.close() | |
if __name__ == "__main__": | |
world = World(DRONE_NAME) | |
world.load_level(ARENA) | |
world.initialize_drone() | |
world.start_race() | |
reciever = RCReciever( | |
ip=HOST, | |
port=PORT, | |
drone_name=DRONE_NAME, | |
airsim_client=world.airsim_client | |
) | |
reciever.start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment