Skip to content

Instantly share code, notes, and snippets.

@elbruno
Created February 24, 2022 20:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elbruno/d21d4db4002c8910dcee931acb2da03b to your computer and use it in GitHub Desktop.
Save elbruno/d21d4db4002c8910dcee931acb2da03b to your computer and use it in GitHub Desktop.
dronecontrolsquirreldetection.py
# Copyright (c) 2022
# Author : Bruno Capuano
# Create Time : 2022 Feb
# Change Log :
# - Open drone camera with openCV
# - Analyze camera frame with local custom vision project running in an app
# - Key D enable / disable object detection
# - On detection enabled
# - Save original image in tmp folder
# - Save image with bounding boxes and detected objects in det folder
# - Save json with bounding boxes and detected objects in det folder
# - Display bounding boxes and detected objects in the camera frame
# - Save a local video with the camera recorded
# - Key T for Take off, L to land; and ASDWRF to control the drone
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import socket
import time
import threading
import cv2
import json
import requests
from flask import Flask, jsonify
def receiveData():
global response
while True:
try:
response, _ = clientSocket.recvfrom(1024)
except:
break
def readStates():
global battery
while True:
try:
response_state, _ = stateSocket.recvfrom(256)
if response_state != 'ok':
response_state = response_state.decode('ASCII')
list = response_state.replace(';', ':').split(':')
battery = int(list[21])
pitch = int(list[1])
except:
break
def sendCommand(command):
global response
timestamp = int(time.time() * 1000)
clientSocket.sendto(command.encode('utf-8'), address)
while response is None:
if (time.time() * 1000) - timestamp > 5 * 1000:
return False
return response
def sendReadCommand(command):
response = sendCommand(command)
try:
response = str(response)
except:
pass
return response
def sendControlCommand(command):
response = None
for i in range(0, 5):
response = sendCommand(command)
if response == 'OK' or response == 'ok':
return True
return False
# -----------------------------------------------
# Local calls
# -----------------------------------------------
probabilityThreshold = 25
def displayPredictions(jsonPrediction, frame):
global camera_Width, camera_Heigth
jsonObj = json.loads(jsonPrediction)
preds = jsonObj['predictions']
sorted_preds = sorted(preds, key=lambda x: x['probability'], reverse=True)
strSortedPreds = ""
resultFound = False
if (sorted_preds):
detected = False
for pred in sorted_preds:
# tag name and prob * 100
tagName = str(pred['tagName'])
probability = pred['probability'] * 100
# apply threshold
if (probability >= probabilityThreshold):
detected = True
bb = pred['boundingBox']
# adjust to size
height = int(bb['height'] * camera_Heigth)
left = int(bb['left'] * camera_Width)
top = int(bb['top'] * camera_Heigth)
width = int(bb['width'] * camera_Width)
# draw bounding boxes
start_point = (left, top)
end_point = (left + width, top + height)
color = (0, 0, 255)
if(tagName == "squirrel"):
color = (0, 255, 0)
thickness = 2
cv2.rectangle(img, start_point, end_point, color, thickness)
# display labels
start_point_label = (left, top - 5)
text = "{}: {:.4f}".format(tagName, probability)
cv2.putText(img, text, start_point_label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
print(f'{tagName} - {probability}')
print(f'start point: {start_point} - end point: {end_point}')
print(jsonPrediction)
if (detected == True):
detImageFileName = frameImageFileName.replace('tmp', 'det')
cv2.imwrite(detImageFileName, img)
detJsonFileName = detImageFileName.replace('png', 'json')
save_text = open(detJsonFileName, 'w')
save_text.write(jsonStr)
save_text.close()
return strSortedPreds
# instantiate flask app and push a context
app = Flask(__name__)
# -----------------------------------------------
# Main program
# -----------------------------------------------
# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))
# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()
# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')
# drone information
battery = 0
pitch = 0
flyUnit = 50
# open UDP
camera_Width = 640 #1280
camera_Heigth = 480 #960
camera_Size = (camera_Width, camera_Heigth)
# open UDP
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)
# open video writer to save video
vid_cod = cv2.VideoWriter_fourcc(*'XVID')
vid_output = cv2.VideoWriter("videos/dronecam_video.mp4", vid_cod, 20.0, camera_Size)
# open
drone_flying = False
detectionEnabled = False
i = 0
while True:
i = i + 1
imgNumber = str(i).zfill(5)
start_time = time.time()
sendReadCommand('battery?')
print(f'battery: {battery} % - pitch: {pitch} - i: {imgNumber}')
try:
ret, frame = cap.read()
img = cv2.resize(frame, camera_Size)
if (detectionEnabled):
# save image to disk and open it
frameImageFileName = str(f'tmp\image{imgNumber}.png')
cv2.imwrite(frameImageFileName, img)
with open(frameImageFileName, 'rb') as f:
img_data = f.read()
# analyze file in local container
api_url = "http://127.0.0.1:80/image"
r = requests.post(api_url, data=img_data)
with app.app_context():
jsonResults = jsonify(r.json())
jsonStr = jsonResults.get_data(as_text=True)
displayPredictions(jsonStr, frame)
fpsInfo = ""
if (time.time() - start_time ) > 0:
fpsInfo = "FPS: " + str(1.0 / (time.time() - start_time)) # FPS = 1 / time to process loop
font = cv2.FONT_HERSHEY_DUPLEX
cv2.putText(img, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)
cv2.imshow('@elbruno - DJI Tello Camera', img)
vid_output.write(img)
except Exception as e:
detectionEnabled = False
print(f'exc: {e}')
pass
# key controller
key = cv2.waitKey(1) & 0xFF
if key == ord("d"):
if (detectionEnabled == True):
detectionEnabled = False
else:
detectionEnabled = True
if cv2.waitKey(1) & 0xFF == ord('t'):
drone_flying = True
detection_started = True
msg = "takeoff"
sendCommand(msg)
if cv2.waitKey(1) & 0xFF == ord('l'):
drone_flying = False
msg = "land"
sendCommand(msg)
time.sleep(5)
if (cv2.waitKey(1) & 0xFF == ord('w')) and drone_flying == True:
msg = str(f"up {flyUnit}")
sendCommand(msg)
time.sleep(1)
if (cv2.waitKey(1) & 0xFF == ord('s')) and drone_flying == True:
msg = str(f"down {flyUnit}")
sendCommand(msg)
time.sleep(1)
if (cv2.waitKey(1) & 0xFF == ord('a')) and drone_flying == True:
msg = str(f"left {flyUnit}")
sendCommand(msg)
time.sleep(1)
if (cv2.waitKey(1) & 0xFF == ord('d')) and drone_flying == True:
msg = str(f"right {flyUnit}")
sendCommand(msg)
time.sleep(1)
if (cv2.waitKey(1) & 0xFF == ord('r')) and drone_flying == True:
msg = str(f"forward {flyUnit}")
sendCommand(msg)
time.sleep(1)
if (cv2.waitKey(1) & 0xFF == ord('f')) and drone_flying == True:
msg = str(f"back {flyUnit}")
sendCommand(msg)
time.sleep(1)
if key == ord("q"):
break
response = sendControlCommand("streamoff")
print(f'streamon response: {response}')
# close the already opened camera, and the video file
cap.release()
vid_output.release()
cv2.destroyAllWindows()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment