Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
DjiDroneCamInstanceSegmentation.py
# Bruno Capuano
# enable drone video camera
# display video camera using OpenCV
# display FPS
# add a bottom image overlay, using a background image
# key D enable / disable instance segmentation detection
# save a local video with the camera recorded
import pixellib
from pixellib.instance import instance_segmentation
import socket
import time
import threading
import os
import cv2
def receiveData():
global response
while True:
try:
response, _ = clientSocket.recvfrom(1024)
except:
break
def readStates():
global battery
while True:
try:
response_state, _ = stateSocket.recvfrom(256)
if response_state != 'ok':
response_state = response_state.decode('ASCII')
list = response_state.replace(';', ':').split(':')
battery = int(list[21])
except:
break
def sendCommand(command):
global response
timestamp = int(time.time() * 1000)
clientSocket.sendto(command.encode('utf-8'), address)
while response is None:
if (time.time() * 1000) - timestamp > 5 * 1000:
return False
return response
def sendReadCommand(command):
response = sendCommand(command)
try:
response = str(response)
except:
pass
return response
def sendControlCommand(command):
response = None
for i in range(0, 5):
response = sendCommand(command)
if response == 'OK' or response == 'ok':
return True
return False
# -----------------------------------------------
# Main program
# -----------------------------------------------
# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))
# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()
# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')
# drone information
battery = 0
# open UDP
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)
# open video writer to save video
vid_cod = cv2.VideoWriter_fourcc(*'XVID')
vid_output = cv2.VideoWriter("cam_video.mp4", vid_cod, 20.0, (640,480))
dsize = (640, 480)
# load bottom img
background = cv2.imread('Bottom03.png')
background = cv2.resize(background, dsize)
# load model
instance_seg = instance_segmentation()
instance_seg.load_model("mask_rcnn_coco.h5")
# main app
detectionEnabled = False
i = 0
while True:
i = i + 1
start_time = time.time()
sendReadCommand('battery?')
print(f'battery: {battery} % - i: {i}')
try:
ret, frame = cap.read()
img = cv2.resize(frame, (640, 480))
if (detectionEnabled):
# save image to disk and open it
imgNumber = str(i).zfill(5)
frameImageFileName = str(f'tmp\image{imgNumber}.png')
outputImageName = str(f'tmp\image{imgNumber}Out.png')
if os.path.exists(frameImageFileName):
os.remove(frameImageFileName)
cv2.imwrite(frameImageFileName, img)
segmask, img = instance_seg.segmentFrame(img, show_bboxes= True)
cv2.imwrite(outputImageName, img)
# overlay background
img = cv2.addWeighted(background, 1, img, 1, 0)
if (time.time() - start_time ) > 0:
fpsInfo = "FPS: " + str(1.0 / (time.time() - start_time)) # FPS = 1 / time to process loop
font = cv2.FONT_HERSHEY_DUPLEX
cv2.putText(img, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)
cv2.imshow('@elbruno - DJI Tello Camera', img)
vid_output.write(img)
except Exception as e:
print(f'exc: {e}')
pass
# key controller
key = cv2.waitKey(1) & 0xFF
if key == ord("d"):
if (detectionEnabled == True):
detectionEnabled = False
else:
detectionEnabled = True
if key == ord("q"):
break
# release resources
response = sendControlCommand("streamoff")
print(f'streamon response: {response}')
# close the already opened camera, and the video file
cap.release()
vid_output.release()
cv2.destroyAllWindows()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment