-
-
Save neilyoung/5a0e2a16ecb684179e81f747f4af605e to your computer and use it in GitHub Desktop.
Parrot core flight functions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import olympe | |
from olympe.messages.ardrone3.Piloting import TakeOff, Landing, PCMD | |
from olympe.messages import gimbal | |
from olympe.messages.ardrone3.PilotingState import FlyingStateChanged, GpsLocationChanged | |
from olympe.messages import gimbal, precise_home | |
from olympe.enums.precise_home import mode | |
from olympe.messages.ardrone3.PilotingSettings import MaxTilt | |
from olympe.messages.ardrone3.SpeedSettings import MaxVerticalSpeed, MaxPitchRollRotationSpeed, MaxRotationSpeed | |
from olympe.messages.ardrone3.PilotingSettingsState import MaxTiltChanged | |
from olympe.messages.ardrone3.SpeedSettingsState import MaxVerticalSpeedChanged, MaxPitchRollRotationSpeedChanged, MaxRotationSpeedChanged | |
import argparse | |
import textwrap | |
import sys | |
from kbhit import KBHit | |
from threading import Timer | |
__version__ = "1.0" | |
# Source unknown, found on the web | |
class RepeatedTimer(object): | |
def __init__(self, interval, function, *args, **kwargs): | |
self._timer = None | |
self.function = function | |
self.interval = interval | |
self.args = args | |
self.kwargs = kwargs | |
self.is_running = False | |
self.start() | |
def __run(self): | |
self.is_running = False | |
self.start() | |
self.function(*self.args, **self.kwargs) | |
def start(self): | |
if not self.is_running: | |
self._timer = Timer(self.interval, self.__run) | |
self._timer.start() | |
self.is_running = True | |
def stop(self): | |
self._timer.cancel() | |
self.is_running = False | |
class Test: | |
def __init__(self, args): | |
self.args = args | |
self.kb = KBHit() | |
self.drone = olympe.Drone(self.args.anafi_drone_ip) | |
self.stat_timer = RepeatedTimer(1.0, self.report_status) | |
# Main loop | |
def loop(self): | |
if self.drone.connect(): | |
# Set max values according to possible max values | |
tilt = self.get_state_safe(MaxTiltChanged) | |
self.drone(MaxTilt(tilt['max'])) if tilt is not None else self.logger.error("can't set tilt") | |
verticalSpeed = self.get_state_safe(MaxVerticalSpeedChanged) | |
self.drone(MaxVerticalSpeed(verticalSpeed['max'])) if verticalSpeed is not None else self.logger.error("cant' set vertical speed") | |
rotationSpeed = self.get_state_safe(MaxRotationSpeedChanged) | |
self.drone(MaxRotationSpeed(rotationSpeed['max'])) if rotationSpeed is not None else self.logger.error("can't set rotation speed") | |
pitchrollRotationSpeed = self.get_state_safe(MaxPitchRollRotationSpeedChanged) | |
self.drone(MaxPitchRollRotationSpeed(pitchrollRotationSpeed['max'])) if pitchrollRotationSpeed is not None else self.logger.error("can't set pitch roll rotation speed") | |
self.drone(gimbal.set_max_speed(gimbal_id=0, yaw=0.0, pitch = self.args.speed_percent, roll=0.0)) | |
while True: | |
if self.kb.kbhit(): | |
c = self.kb.getch() | |
if c == 'q': | |
# QUIT | |
self.close() | |
elif c == 'w': | |
# FORWARD | |
self.drone(PCMD(1, 0, self.args.speed_percent, 0, 0, 0)) | |
elif c == 's': | |
# BACKWARD | |
self.drone(PCMD(1, 0, -self.args.speed_percent, 0, 0, 0)) | |
elif c == 'u': | |
# UP | |
self.drone(PCMD(1, 0, 0, 0, self.args.speed_percent, 0)) | |
elif c == 'i': | |
# DOWN | |
self.drone(PCMD(1, 0, 0, 0, -self.args.speed_percent, 0)) | |
elif c == 'a': | |
# LEFT | |
self.drone(PCMD(1, -self.args.speed_percent, 0, 0, 0, 0)) | |
elif c == 'd': | |
# RIGHT | |
self.drone(PCMD(1, self.args.speed_percent, 0, 0, 0, 0)) | |
elif c == 'y' or c == 'z': # American/German keyboard | |
# ROTATE LEFT | |
self.drone(PCMD(1, 0, 0, -self.args.speed_percent, 0, 0)) | |
elif c == 'x': | |
# ROTATE RIGHT | |
self.drone(PCMD(1, 0, 0, self.args.speed_percent, 0, 0)) | |
elif c == 'l': | |
# LAND | |
self.land() | |
elif c == 't': | |
# TAKEOFF | |
self.takeoff() | |
elif c == '-': | |
# GIMBAL DOWN | |
self.gimbal_pitch_rel(-1) | |
elif c == '+': | |
# GIMBAL UP | |
self.gimbal_pitch_rel(1) | |
def report_status(self): | |
position = self.get_state_safe(GpsLocationChanged) | |
flying = self.get_state_safe(FlyingStateChanged) | |
#gimbal_attitude = self.drone(gimbal.gimbal_capabilities(gimbal_id = 0, model=model.main, axes=olympe.enums.gimbal.axis.pitch)).wait().success() | |
altitude = round(position['altitude'], 2) if position is not None else '??' | |
latitude = round(position['latitude'], 6) if position is not None else '??' | |
longitude = round(position['longitude'], 6) if position is not None else '??' | |
state = ["Landed", "TakingOff", "Hovering", "Flying", "Landing", "Emergency", "UserTakeOff", "MonitorRamping", "EmergencyLanding"][flying["state"].value] if flying is not None else '??' | |
print(f"state: {state}, lat: {latitude}, lng: {longitude}, alt: {altitude}") | |
def get_state_safe(self, state): | |
''' Survive possible exception (e.g. when value is not available yet). Ask before fetch ''' | |
if not self.drone.connection_state(): | |
return None | |
if self.drone.check_state(state): | |
return self.drone.get_state(state) | |
return None | |
def land(self): | |
''' Land the drone ''' | |
if not self.drone.connection_state(): | |
return | |
self.drone(Landing()) | |
def takeoff(self): | |
''' Takeoff. TODO: Wait for GPS fix?''' | |
if not self.drone.connection_state(): | |
return | |
self.drone(precise_home.set_mode(mode.standard)) | |
self.drone(TakeOff()) | |
def gimbal_pitch_rel(self, ratio, timeout=5): | |
''' Change camera pitch relatively by ratio of tilt-speed between -1 and 1 ''' | |
if not self.drone.connection_state(): | |
return | |
self.drone(gimbal.set_target( | |
gimbal_id=0, | |
control_mode="velocity", | |
yaw_frame_of_reference="none", | |
yaw=0.0, | |
pitch_frame_of_reference="absolute", | |
pitch=ratio, | |
roll_frame_of_reference="none", | |
roll=0.0, | |
_timeout=timeout | |
)) | |
def close(self): | |
if self.stat_timer: | |
self.stat_timer.stop() | |
self.drone.disconnect() | |
sys.exit(0) | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(prog='test.py', | |
formatter_class=lambda prog: argparse.RawDescriptionHelpFormatter( | |
prog, max_help_position=60, width=160 | |
), | |
description=textwrap.dedent('''\ | |
version: | |
v.{} | |
description: | |
A Python3 test script for Parrot Olympe problems | |
Note: Need to be run in parrot venv. Run `source ~/code/parrot-groundsdk/products/olympe/linux/env/shell` before. | |
'''.format(__version__)), | |
epilog=textwrap.dedent('''\ | |
copyright: (c) 2022 NY | |
''')) | |
parser._action_groups.pop() | |
required = parser.add_argument_group('required arguments') | |
optional = parser.add_argument_group('optional arguments') | |
# Parrot mandatory args | |
required.add_argument("--anafi-drone-ip", type=str, default='192.168.188.118', help="The IP of the drone (default: '192.168.188.118', the simulator here)") | |
# Parrot optional args | |
optional.add_argument("--speed-percent", type=int, default=20, choices=range(0,100), help="Parrot speed percentage for all speeds (default: 20)") | |
optional.add_argument("--olympe-log-level", type=str, choices=["critical", "error", "warning", "info", "debug"], default="warning", help="Olympe SDK log level (default: 'warning')") | |
args = parser.parse_args() | |
olympe.log.update_config({"loggers": {"olympe": {"level": args.olympe_log_level.upper()}}}) | |
test = Test(args) | |
test.loop() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Content of required "kbhit.py" (from the web):