Skip to content

Instantly share code, notes, and snippets.

@neilyoung
Last active August 11, 2022 07:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save neilyoung/5a0e2a16ecb684179e81f747f4af605e to your computer and use it in GitHub Desktop.
Save neilyoung/5a0e2a16ecb684179e81f747f4af605e to your computer and use it in GitHub Desktop.
Parrot core flight functions
import olympe
from olympe.messages.ardrone3.Piloting import TakeOff, Landing, PCMD
from olympe.messages import gimbal
from olympe.messages.ardrone3.PilotingState import FlyingStateChanged, GpsLocationChanged
from olympe.messages import gimbal, precise_home
from olympe.enums.precise_home import mode
from olympe.messages.ardrone3.PilotingSettings import MaxTilt
from olympe.messages.ardrone3.SpeedSettings import MaxVerticalSpeed, MaxPitchRollRotationSpeed, MaxRotationSpeed
from olympe.messages.ardrone3.PilotingSettingsState import MaxTiltChanged
from olympe.messages.ardrone3.SpeedSettingsState import MaxVerticalSpeedChanged, MaxPitchRollRotationSpeedChanged, MaxRotationSpeedChanged
import argparse
import textwrap
import sys
from kbhit import KBHit
from threading import Timer
__version__ = "1.0"
# Source unknown, found on the web
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.function = function
self.interval = interval
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()
def __run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self.__run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
class Test:
def __init__(self, args):
self.args = args
self.kb = KBHit()
self.drone = olympe.Drone(self.args.anafi_drone_ip)
self.stat_timer = RepeatedTimer(1.0, self.report_status)
# Main loop
def loop(self):
if self.drone.connect():
# Set max values according to possible max values
tilt = self.get_state_safe(MaxTiltChanged)
self.drone(MaxTilt(tilt['max'])) if tilt is not None else self.logger.error("can't set tilt")
verticalSpeed = self.get_state_safe(MaxVerticalSpeedChanged)
self.drone(MaxVerticalSpeed(verticalSpeed['max'])) if verticalSpeed is not None else self.logger.error("cant' set vertical speed")
rotationSpeed = self.get_state_safe(MaxRotationSpeedChanged)
self.drone(MaxRotationSpeed(rotationSpeed['max'])) if rotationSpeed is not None else self.logger.error("can't set rotation speed")
pitchrollRotationSpeed = self.get_state_safe(MaxPitchRollRotationSpeedChanged)
self.drone(MaxPitchRollRotationSpeed(pitchrollRotationSpeed['max'])) if pitchrollRotationSpeed is not None else self.logger.error("can't set pitch roll rotation speed")
self.drone(gimbal.set_max_speed(gimbal_id=0, yaw=0.0, pitch = self.args.speed_percent, roll=0.0))
while True:
if self.kb.kbhit():
c = self.kb.getch()
if c == 'q':
# QUIT
self.close()
elif c == 'w':
# FORWARD
self.drone(PCMD(1, 0, self.args.speed_percent, 0, 0, 0))
elif c == 's':
# BACKWARD
self.drone(PCMD(1, 0, -self.args.speed_percent, 0, 0, 0))
elif c == 'u':
# UP
self.drone(PCMD(1, 0, 0, 0, self.args.speed_percent, 0))
elif c == 'i':
# DOWN
self.drone(PCMD(1, 0, 0, 0, -self.args.speed_percent, 0))
elif c == 'a':
# LEFT
self.drone(PCMD(1, -self.args.speed_percent, 0, 0, 0, 0))
elif c == 'd':
# RIGHT
self.drone(PCMD(1, self.args.speed_percent, 0, 0, 0, 0))
elif c == 'y' or c == 'z': # American/German keyboard
# ROTATE LEFT
self.drone(PCMD(1, 0, 0, -self.args.speed_percent, 0, 0))
elif c == 'x':
# ROTATE RIGHT
self.drone(PCMD(1, 0, 0, self.args.speed_percent, 0, 0))
elif c == 'l':
# LAND
self.land()
elif c == 't':
# TAKEOFF
self.takeoff()
elif c == '-':
# GIMBAL DOWN
self.gimbal_pitch_rel(-1)
elif c == '+':
# GIMBAL UP
self.gimbal_pitch_rel(1)
def report_status(self):
position = self.get_state_safe(GpsLocationChanged)
flying = self.get_state_safe(FlyingStateChanged)
#gimbal_attitude = self.drone(gimbal.gimbal_capabilities(gimbal_id = 0, model=model.main, axes=olympe.enums.gimbal.axis.pitch)).wait().success()
altitude = round(position['altitude'], 2) if position is not None else '??'
latitude = round(position['latitude'], 6) if position is not None else '??'
longitude = round(position['longitude'], 6) if position is not None else '??'
state = ["Landed", "TakingOff", "Hovering", "Flying", "Landing", "Emergency", "UserTakeOff", "MonitorRamping", "EmergencyLanding"][flying["state"].value] if flying is not None else '??'
print(f"state: {state}, lat: {latitude}, lng: {longitude}, alt: {altitude}")
def get_state_safe(self, state):
''' Survive possible exception (e.g. when value is not available yet). Ask before fetch '''
if not self.drone.connection_state():
return None
if self.drone.check_state(state):
return self.drone.get_state(state)
return None
def land(self):
''' Land the drone '''
if not self.drone.connection_state():
return
self.drone(Landing())
def takeoff(self):
''' Takeoff. TODO: Wait for GPS fix?'''
if not self.drone.connection_state():
return
self.drone(precise_home.set_mode(mode.standard))
self.drone(TakeOff())
def gimbal_pitch_rel(self, ratio, timeout=5):
''' Change camera pitch relatively by ratio of tilt-speed between -1 and 1 '''
if not self.drone.connection_state():
return
self.drone(gimbal.set_target(
gimbal_id=0,
control_mode="velocity",
yaw_frame_of_reference="none",
yaw=0.0,
pitch_frame_of_reference="absolute",
pitch=ratio,
roll_frame_of_reference="none",
roll=0.0,
_timeout=timeout
))
def close(self):
if self.stat_timer:
self.stat_timer.stop()
self.drone.disconnect()
sys.exit(0)
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog='test.py',
formatter_class=lambda prog: argparse.RawDescriptionHelpFormatter(
prog, max_help_position=60, width=160
),
description=textwrap.dedent('''\
version:
v.{}
description:
A Python3 test script for Parrot Olympe problems
Note: Need to be run in parrot venv. Run `source ~/code/parrot-groundsdk/products/olympe/linux/env/shell` before.
'''.format(__version__)),
epilog=textwrap.dedent('''\
copyright: (c) 2022 NY
'''))
parser._action_groups.pop()
required = parser.add_argument_group('required arguments')
optional = parser.add_argument_group('optional arguments')
# Parrot mandatory args
required.add_argument("--anafi-drone-ip", type=str, default='192.168.188.118', help="The IP of the drone (default: '192.168.188.118', the simulator here)")
# Parrot optional args
optional.add_argument("--speed-percent", type=int, default=20, choices=range(0,100), help="Parrot speed percentage for all speeds (default: 20)")
optional.add_argument("--olympe-log-level", type=str, choices=["critical", "error", "warning", "info", "debug"], default="warning", help="Olympe SDK log level (default: 'warning')")
args = parser.parse_args()
olympe.log.update_config({"loggers": {"olympe": {"level": args.olympe_log_level.upper()}}})
test = Test(args)
test.loop()
@neilyoung
Copy link
Author

Content of required "kbhit.py" (from the web):

#!/usr/bin/env python
'''
A Python class implementing KBHIT, the standard keyboard-interrupt poller.
Works transparently on Windows and Posix (Linux, Mac OS X).  Doesn't work
with IDLE.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as 
published by the Free Software Foundation, either version 3 of the 
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.
'''


# Found here: https://gist.github.com/michelbl/efda48b19d3e587685e3441a74457024

import os

# Windows
if os.name == 'nt':
    import msvcrt

# Posix (Linux, OS X)
else:
    import sys
    import termios
    import atexit
    from select import select


class KBHit:

    def __init__(self):
        '''Creates a KBHit object that you can call to do various keyboard things.
        '''

        if os.name == 'nt':
            pass

        else:

            # Save the terminal settings
            self.fd = sys.stdin.fileno()
            self.new_term = termios.tcgetattr(self.fd)
            self.old_term = termios.tcgetattr(self.fd)

            # New terminal setting unbuffered
            self.new_term[3] = (self.new_term[3] & ~termios.ICANON & ~termios.ECHO)
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term)

            # Support normal-terminal reset at exit
            atexit.register(self.set_normal_term)


    def set_normal_term(self):
        ''' Resets to normal terminal.  On Windows this is a no-op.
        '''

        if os.name == 'nt':
            pass

        else:
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term)


    def getch(self):
        ''' Returns a keyboard character after kbhit() has been called.
            Should not be called in the same program as getarrow().
        '''

        s = ''

        if os.name == 'nt':
            return msvcrt.getch().decode('utf-8')

        else:
            return sys.stdin.read(1)


    def getarrow(self):
        ''' Returns an arrow-key code after kbhit() has been called. Codes are
        0 : up
        1 : right
        2 : down
        3 : left
        Should not be called in the same program as getch().
        '''

        if os.name == 'nt':
            msvcrt.getch() # skip 0xE0
            c = msvcrt.getch()
            vals = [72, 77, 80, 75]

        else:
            c = sys.stdin.read(3)[2]
            vals = [65, 67, 66, 68]

        return vals.index(ord(c.decode('utf-8')))


    def kbhit(self):
        ''' Returns True if keyboard character was hit, False otherwise.
        '''
        if os.name == 'nt':
            return msvcrt.kbhit()

        else:
            dr,dw,de = select([sys.stdin], [], [], 0)
            return dr != []


# Test    
if __name__ == "__main__":

    kb = KBHit()

    print('Hit any key, or ESC to exit')

    while True:

        if kb.kbhit():
            c = kb.getch()
            c_ord = ord(c)
            print(c)
            print(c_ord)
            if c_ord == 27: # ESC
                break
            print(c)

    kb.set_normal_term()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment