Last active
June 20, 2023 17:12
-
-
Save Paulooh007/d112db36e7691625b5ac9dea8e8239cc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import socket | |
import exceptions | |
import math | |
import argparse | |
from dronekit import connect, VehicleMode, LocationGlobalRelative, APIException | |
from pymavlink import mavutil | |
import cv2 | |
import cv2.aruco as aruco | |
import numpy as np | |
import imutils | |
def connect_vehicle(): | |
pass | |
##OpenCV aruco | |
aruco_dict = aruco.getPredefinedDictionary(aruco.DICT_ARUCO_ORIGINAL) | |
parameters = aruco.DetectorParameters_create() | |
ids_to_find = [129, 72] | |
marker_sizes = [40, 19] # cm | |
marker_heights = [7, 4] | |
takeoff_height = 8 | |
velocity = 0.5 | |
## | |
##Camera | |
horizontal_res = 640 | |
vertical_res = 480 | |
cap = VideoCapture() | |
w, h = 1000, 680 | |
land = False | |
def get_distance_meters(targetLocation, currentLocation): | |
dLat = targetLocation.lat - currentLocation.lat | |
dLon = targetLocation.lon - currentLocation.lon | |
return math.sqrt((dLon * dLon) + (dLat * dLat)) * 1.113195e5 | |
def send_local_ned_velocity(vx, vy, vz): | |
msg = vehicle.message_factory.set_position_target_local_ned_encode( | |
0, | |
0, | |
0, | |
mavutil.mavlink.MAV_FRAME_BODY_OFFSET_NED, | |
0b0000111111000111, | |
0, | |
0, | |
0, | |
vx, | |
vy, | |
vz, | |
0, | |
0, | |
0, | |
0, | |
0, | |
) | |
vehicle.send_mavlink(msg) | |
vehicle.flush() | |
def send_land_message(x, y): | |
msg = vehicle.message_factory.landing_target_encode( | |
0, | |
0, | |
mavutil.mavlink.MAV_FRAME_BODY_OFFSET_NED, | |
x, | |
y, | |
0, | |
0, | |
0, | |
) | |
vehicle.send_mavlink(msg) | |
vehicle.flush() | |
def goto(targetLocation): | |
distanceToTargetLocation = get_distance_meters( | |
targetLocation, vehicle.location.global_relative_frame | |
) | |
vehicle.simple_goto(targetLocation) | |
while vehicle.mode.name == "GUIDED": | |
currentDistance = get_distance_meters( | |
targetLocation, vehicle.location.global_relative_frame | |
) | |
if currentDistance < distanceToTargetLocation * 0.02: | |
print("Reached target waypoint.") | |
time.sleep(2) | |
break | |
time.sleep(1) | |
return None | |
def arm(vehicle): | |
print("*-- Arming*") | |
vehicle.mav.command_long_send( | |
vehicle.target_system, | |
vehicle.target_component, | |
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, | |
0, | |
1, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
) | |
ack(vehicle, "COMMAND_ACK") | |
def set_flight_mode(): | |
pass | |
def takeoff(vehicle, height): | |
print("*-- Takeoff Initiated*") | |
vehicle.mav.command_long_send( | |
vehicle.target_system, | |
vehicle.target_component, | |
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, | |
0, | |
0, | |
0, | |
0, | |
math.nan, | |
0, | |
0, | |
height, | |
) | |
ack(vehicle, "COMMAND_ACK") | |
def distance(pointA, pointB): | |
"""Get Euclidean distance between 2 points""" | |
return (((pointA[0] - pointB[0]) ** 2) + ((pointA[1] - pointB[1]) ** 2)) ** 0.5 | |
def drawMarker(corners, ids): | |
"""Draw marker bounding box and return the center coordinates, height and width""" | |
for markerCorner, markerID in zip(corners, ids): | |
corners = markerCorner.reshape((4, 2)) | |
(topLeft, topRight, bottomRight, bottomLeft) = corners | |
# convert each of the (x, y)-coordinate pairs to integers | |
topRight = (int(topRight[0]), int(topRight[1])) | |
bottomRight = (int(bottomRight[0]), int(bottomRight[1])) | |
bottomLeft = (int(bottomLeft[0]), int(bottomLeft[1])) | |
topLeft = (int(topLeft[0]), int(topLeft[1])) | |
markerWidth = int(distance(topLeft, topRight)) | |
markerHeight = int(distance(topLeft, bottomLeft)) | |
# draw the bounding box of the ArUCo detection | |
cv2.line(frame, topLeft, topRight, (0, 255, 0), 2) | |
cv2.line(frame, topRight, bottomRight, (0, 255, 0), 2) | |
cv2.line(frame, bottomRight, bottomLeft, (0, 255, 0), 2) | |
cv2.line(frame, bottomLeft, topLeft, (0, 255, 0), 2) | |
# compute and draw the center (x, y)-coordinates of the | |
# ArUco marker | |
cX = int((topLeft[0] + bottomRight[0]) / 2.0) | |
cY = int((topLeft[1] + bottomRight[1]) / 2.0) | |
cv2.circle(frame, (cX, cY), 4, (0, 0, 255), -1) | |
# draw the ArUco marker ID on the frame | |
cv2.putText( | |
frame, | |
str(markerID), | |
(topLeft[0], topLeft[1] - 15), | |
cv2.FONT_HERSHEY_SIMPLEX, | |
0.5, | |
(0, 255, 0), | |
2, | |
) | |
return [(cX, cY), (markerWidth, markerHeight)] # [ midpoint, w and h] | |
def drawQuadrant(): | |
"""Divide frame into 4 regions/quadrants""" | |
cv2.line(frame, (centerX, 0), (centerX, h), (0, 0, 255), 1) | |
cv2.line(frame, (0, centerY), (w, centerY), (0, 0, 255), 1) | |
cv2.circle(frame, (centerX, centerY), 4, (0, 255, 255), -1) | |
def getQuadrantInfo(cX, cY, centerX, centerY): | |
# """Return region/quadrant and ΔX and ΔY""" | |
delX = abs(cX - centerX) | |
delY = abs(cY - centerY) | |
region = 0 | |
if cX <= centerX and cY <= centerY: | |
region = 1 | |
elif cX >= centerX and cY <= centerY: | |
region = 2 | |
elif cX >= centerX and cY >= centerY: | |
region = 3 | |
elif cX <= centerX and cY >= centerY: | |
region = 4 | |
return (region, (delX, delY)) | |
def getDirection(speedX, speedY, region): | |
""" | |
+ve speedX = Right | |
-ve speedX = Left | |
+ve speedY = front | |
-ve speedY = Back | |
""" | |
if region == 1: | |
return (-speedX, speedY) | |
elif region == 2: | |
return (speedX, speedY) | |
elif region == 3: | |
return (speedX, -speedY) | |
elif region == 4: | |
return (-speedX, -speedY) | |
def getKeyboardIinput(): | |
lr, fb, ud, yv = 0, 0, 0, 0 | |
speed = 50 | |
global land | |
global isLanding | |
if kp.getKey("LEFT"): | |
lr = -speed | |
elif kp.getKey("RIGHT"): | |
lr = speed | |
if kp.getKey("UP"): | |
fb = speed | |
elif kp.getKey("DOWN"): | |
fb = -speed | |
if kp.getKey("w"): | |
ud = speed | |
elif kp.getKey("s"): | |
ud = -speed | |
if kp.getKey("a"): | |
yv = speed | |
elif kp.getKey("d"): | |
yv = -speed | |
if kp.getKey("z"): | |
me.land() | |
triggerALRS("close") | |
# me.streamoff() | |
if kp.getKey("l"): | |
land, isLanding = not land, False | |
sleep(0.2) | |
if kp.getKey("t"): | |
me.takeoff() | |
if kp.getKey("c"): | |
triggerALRS("close") | |
if kp.getKey("o"): | |
triggerALRS("open") | |
return [lr, fb, ud, yv] | |
def triggerALRS(str): | |
with serial.Serial("COM5", 9600, timeout=1) as ser: | |
time.sleep(2) | |
ser.write(f"{str}".encode("utf-8")) | |
def exitRoutine(): | |
# me.land() | |
print("Killing all process") | |
triggerALRS("close") | |
def aruco_land(): | |
while True: | |
vals = getKeyboardIinput() | |
cap = cv2.VideoCapture(1) #streaming vid | |
frame, _ = cap.read() | |
frame = cv2.resize(frame, (w, h)) | |
cv2.putText( | |
frame, | |
f"Altitude: {h} isLanding: {isLanding} Land: {land} Mode: {'Landing'}", | |
(20, 20), | |
cv2.FONT_HERSHEY_SIMPLEX, | |
0.5, | |
(0, 255, 255), | |
1, | |
) | |
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
if land: | |
centerX, centerY = w // 2, h // 2 # Get center of Frame | |
drawQuadrant() | |
# Find aruCo marker in frame | |
aruco_dict = aruco.Dictionary_get(aruco.DICT_4X4_100) | |
parameters = aruco.DetectorParameters_create() | |
corners, ids, rejectedImgPoints = aruco.detectMarkers( | |
gray, aruco_dict, parameters=parameters | |
) | |
# print(corners) | |
if len(corners) > 0: # if All 4 corners of marker is detected | |
ids = ids.flatten() | |
markerInfo = drawMarker(corners, ids) | |
cX, cY = markerInfo[0] # Get marker midpoint | |
# Calculate error i.e Distance btw aruco midpoint and frame midpoint | |
error = int(distance((cX, cY), (centerX, centerY))) | |
markerWidth, markerHeight = markerInfo[1] | |
rW, rH = round(markerWidth / w, 1), round( | |
markerHeight / h, 1 | |
) # Calculate ratio | |
# rW, rH = 0.4, 0.4 | |
# Draw line between center of frame and center of marker | |
cv2.line(frame, (cX, cY), (centerX, centerY), (254, 255, 0), 3) | |
region, delXandY = getQuadrantInfo( | |
cX, cY, centerX, centerY | |
) # Get the quadrant/region where marker is in and ΔX and ΔY | |
delX, delY = delXandY | |
delX, delY = int(rW * delX), int(rH * delY) # Multiple error with ratio | |
speedX, speedY = int(np.clip(delX, -100, 100)), int( | |
np.clip(delY, -100, 100) | |
) # front/back and left/right command to send to drone. | |
speedX, speedY = ( | |
getDirection(speedX, speedY, region) if not None else (0, 0) | |
) # Add direction to command | |
cv2.putText( | |
frame, | |
f"X: {speedX} Y: {speedY} Error: {error} Alt: {height} isLanding: {isLanding}", | |
(20, 50), | |
cv2.FONT_HERSHEY_SIMPLEX, | |
1, | |
(0, 255, 0), | |
2, | |
) | |
if not isLanding: | |
send_local_ned_velocity(speedX, speedY, 0, 0) | |
if height < 60: # Close enought to the ground | |
send_land_message() | |
triggerALRS("close") | |
# print(me.get_battery()) | |
if error < 25 or isLanding: | |
if not isOpen: | |
isOpen = True | |
triggerALRS("open") | |
if not isLanding: | |
isLanding = True | |
send_local_ned_velocity(speedX, speedY, 0, 0) | |
# me.send_rc_control(vals[0], vals[1], vals[2], vals[3]) | |
else: | |
print("No marker.........................") | |
# isLanding = False | |
send_local_ned_velocity(0, 0, 0, 0) # Stay put | |
# me.send_rc_control(vals[0], vals[1], vals[2], vals[3]) | |
else: | |
# me.send_rc_control(vals[0], vals[1], vals[2], vals[3]) | |
cv2.imshow("video", frame) | |
if cv2.waitKey(1) & 0xFF == ord("q"): | |
land() | |
triggerALRS("close") | |
print(me.get_battery()) | |
break | |
if __name__ == "__main__": | |
vehicle = connect_vehicle() | |
lat_home = vehicle.location.global_relative_frame.lat | |
lon_home = vehicle.location.global_relative_frame.lon | |
arm() | |
vehicle.motors_armed_wait() | |
set_flight_mode(vehicle, "GUIDED") | |
takeoff(takeoff_height) | |
send_local_ned_velocity(velocity, velocity, 0) ##Offset drone from target | |
ready_to_land = 1 | |
if ready_to_land == 1: | |
while vehicle.armed == True: | |
aruco_land() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment