Skip to content

Instantly share code, notes, and snippets.

@elbruno
Created March 16, 2020 02:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elbruno/33a51222ff1e091db7ccab15017e5aa7 to your computer and use it in GitHub Desktop.
Save elbruno/33a51222ff1e091db7ccab15017e5aa7 to your computer and use it in GitHub Desktop.
djitellolandonbananadetected.py
# Bruno Capuano
# detect faces using haar cascades from https://github.com/opencv/opencv/tree/master/data/haarcascades
# enable drone video camera
# display video camera using OpenCV
# display FPS
# detect faces and bananas
# launch the drone with key T, and land with key L
## if the drone is flying, and a banana is detected, land the drone
import cv2
import socket
import time
import threading
def receiveData():
global response
while True:
try:
response, _ = clientSocket.recvfrom(1024)
except:
break
def readStates():
global battery
while True:
try:
response_state, _ = stateSocket.recvfrom(256)
if response_state != 'ok':
response_state = response_state.decode('ASCII')
list = response_state.replace(';', ':').split(':')
battery = int(list[21])
except:
break
def sendCommand(command):
global response
timestamp = int(time.time() * 1000)
clientSocket.sendto(command.encode('utf-8'), address)
while response is None:
if (time.time() * 1000) - timestamp > 5 * 1000:
return False
return response
def sendReadCommand(command):
response = sendCommand(command)
try:
response = str(response)
except:
pass
return response
def sendControlCommand(command):
response = None
for i in range(0, 5):
response = sendCommand(command)
if response == 'OK' or response == 'ok':
return True
return False
# -----------------------------------------------
# Main program
# -----------------------------------------------
# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))
# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()
# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')
# drone information
battery = 0
# enable face and smile detection
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
banana_cascade = cv2.CascadeClassifier('banana_classifier.xml')
# open UDP
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)
# open
banana_detected = False
drone_flying = False
i = 0
while True:
i = i + 1
start_time = time.time()
try:
_, frameOrig = cap.read()
frame = cv2.resize(frameOrig, (480, 360))
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# detect faces
faces = face_cascade.detectMultiScale(gray, 1.3, 5)
for (x, y, w, h) in faces:
cv2.rectangle(frame, (x, y), ((x + w), (y + h)), (0, 0, 255), 2)
font = cv2.FONT_HERSHEY_COMPLEX_SMALL
cv2.putText(frame, 'face', (h + 6, w - 6), font, 0.7, (255, 255, 255), 1)
# detect banana
bananas = banana_cascade.detectMultiScale(gray,
scaleFactor=1.3,
minNeighbors=5,
minSize=(150, 50))
for (x, y, w, h) in bananas:
cv2.rectangle(frame, (x, y), ((x + w), (y + h)), (0, 255, 0), 2)
font = cv2.FONT_HERSHEY_COMPLEX_SMALL
cv2.putText(frame, 'bananas', (h + 6, w - 6), font, 0.7, (255, 255, 255), 1)
if(len(bananas) > 0):
banana_detected = True
else:
banana_detected = False
# fly logic
if (drone_flying == True and banana_detected == True):
drone_flying = False
msg = "land"
sendCommand(msg)
time.sleep(5)
break
# display fps
if (time.time() - start_time ) > 0:
fpsInfo = "FPS: " + str(1.0 / (time.time() - start_time)) # FPS = 1 / time to process loop
font = cv2.FONT_HERSHEY_DUPLEX
cv2.putText(frame, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)
cv2.imshow('@elbruno - DJI Tello Camera', frame)
sendReadCommand('battery?')
print(f'banana: {banana_detected} - flying: {drone_flying} - battery: {battery} % - i: {i} - {fpsInfo}')
except Exception as e:
print(f'exc: {e}')
pass
#raise e
if cv2.waitKey(1) & 0xFF == ord('t'):
drone_flying = True
detection_started = True
msg = "takeoff"
sendCommand(msg)
if cv2.waitKey(1) & 0xFF == ord('l'):
drone_flying = False
msg = "land"
sendCommand(msg)
time.sleep(5)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
msg = "land"
sendCommand(msg) # land
response = sendControlCommand("streamoff")
print(f'streamon response: {response}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment