Created
March 16, 2020 02:24
-
-
Save elbruno/33a51222ff1e091db7ccab15017e5aa7 to your computer and use it in GitHub Desktop.
djitellolandonbananadetected.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Bruno Capuano | |
# detect faces using haar cascades from https://github.com/opencv/opencv/tree/master/data/haarcascades | |
# enable drone video camera | |
# display video camera using OpenCV | |
# display FPS | |
# detect faces and bananas | |
# launch the drone with key T, and land with key L | |
## if the drone is flying, and a banana is detected, land the drone | |
import cv2 | |
import socket | |
import time | |
import threading | |
def receiveData(): | |
global response | |
while True: | |
try: | |
response, _ = clientSocket.recvfrom(1024) | |
except: | |
break | |
def readStates(): | |
global battery | |
while True: | |
try: | |
response_state, _ = stateSocket.recvfrom(256) | |
if response_state != 'ok': | |
response_state = response_state.decode('ASCII') | |
list = response_state.replace(';', ':').split(':') | |
battery = int(list[21]) | |
except: | |
break | |
def sendCommand(command): | |
global response | |
timestamp = int(time.time() * 1000) | |
clientSocket.sendto(command.encode('utf-8'), address) | |
while response is None: | |
if (time.time() * 1000) - timestamp > 5 * 1000: | |
return False | |
return response | |
def sendReadCommand(command): | |
response = sendCommand(command) | |
try: | |
response = str(response) | |
except: | |
pass | |
return response | |
def sendControlCommand(command): | |
response = None | |
for i in range(0, 5): | |
response = sendCommand(command) | |
if response == 'OK' or response == 'ok': | |
return True | |
return False | |
# ----------------------------------------------- | |
# Main program | |
# ----------------------------------------------- | |
# connection info | |
UDP_IP = '192.168.10.1' | |
UDP_PORT = 8889 | |
last_received_command = time.time() | |
STATE_UDP_PORT = 8890 | |
address = (UDP_IP, UDP_PORT) | |
response = None | |
response_state = None | |
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
clientSocket.bind(('', UDP_PORT)) | |
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
stateSocket.bind(('', STATE_UDP_PORT)) | |
# start threads | |
recThread = threading.Thread(target=receiveData) | |
recThread.daemon = True | |
recThread.start() | |
stateThread = threading.Thread(target=readStates) | |
stateThread.daemon = True | |
stateThread.start() | |
# connect to drone | |
response = sendControlCommand("command") | |
print(f'command response: {response}') | |
response = sendControlCommand("streamon") | |
print(f'streamon response: {response}') | |
# drone information | |
battery = 0 | |
# enable face and smile detection | |
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml') | |
banana_cascade = cv2.CascadeClassifier('banana_classifier.xml') | |
# open UDP | |
print(f'opening UDP video feed, wait 2 seconds ') | |
videoUDP = 'udp://192.168.10.1:11111' | |
cap = cv2.VideoCapture(videoUDP) | |
time.sleep(2) | |
# open | |
banana_detected = False | |
drone_flying = False | |
i = 0 | |
while True: | |
i = i + 1 | |
start_time = time.time() | |
try: | |
_, frameOrig = cap.read() | |
frame = cv2.resize(frameOrig, (480, 360)) | |
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
# detect faces | |
faces = face_cascade.detectMultiScale(gray, 1.3, 5) | |
for (x, y, w, h) in faces: | |
cv2.rectangle(frame, (x, y), ((x + w), (y + h)), (0, 0, 255), 2) | |
font = cv2.FONT_HERSHEY_COMPLEX_SMALL | |
cv2.putText(frame, 'face', (h + 6, w - 6), font, 0.7, (255, 255, 255), 1) | |
# detect banana | |
bananas = banana_cascade.detectMultiScale(gray, | |
scaleFactor=1.3, | |
minNeighbors=5, | |
minSize=(150, 50)) | |
for (x, y, w, h) in bananas: | |
cv2.rectangle(frame, (x, y), ((x + w), (y + h)), (0, 255, 0), 2) | |
font = cv2.FONT_HERSHEY_COMPLEX_SMALL | |
cv2.putText(frame, 'bananas', (h + 6, w - 6), font, 0.7, (255, 255, 255), 1) | |
if(len(bananas) > 0): | |
banana_detected = True | |
else: | |
banana_detected = False | |
# fly logic | |
if (drone_flying == True and banana_detected == True): | |
drone_flying = False | |
msg = "land" | |
sendCommand(msg) | |
time.sleep(5) | |
break | |
# display fps | |
if (time.time() - start_time ) > 0: | |
fpsInfo = "FPS: " + str(1.0 / (time.time() - start_time)) # FPS = 1 / time to process loop | |
font = cv2.FONT_HERSHEY_DUPLEX | |
cv2.putText(frame, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1) | |
cv2.imshow('@elbruno - DJI Tello Camera', frame) | |
sendReadCommand('battery?') | |
print(f'banana: {banana_detected} - flying: {drone_flying} - battery: {battery} % - i: {i} - {fpsInfo}') | |
except Exception as e: | |
print(f'exc: {e}') | |
pass | |
#raise e | |
if cv2.waitKey(1) & 0xFF == ord('t'): | |
drone_flying = True | |
detection_started = True | |
msg = "takeoff" | |
sendCommand(msg) | |
if cv2.waitKey(1) & 0xFF == ord('l'): | |
drone_flying = False | |
msg = "land" | |
sendCommand(msg) | |
time.sleep(5) | |
if cv2.waitKey(1) & 0xFF == ord('q'): | |
break | |
msg = "land" | |
sendCommand(msg) # land | |
response = sendControlCommand("streamoff") | |
print(f'streamon response: {response}') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment