Created
May 20, 2020 18:51
-
-
Save elbruno/af5485797e6d57110d645deb334e3e3d to your computer and use it in GitHub Desktop.
DjiDroneCamInstanceSegmentation.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Bruno Capuano
# enable drone video camera
# display video camera using OpenCV
# display FPS
# add a bottom image overlay, using a background image
# key D enable / disable instance segmentation detection
# save a local video with the camera recorded
import os
import socket
import threading
import time

import cv2
import pixellib
from pixellib.instance import instance_segmentation
def receiveData():
    """Receive command replies until the command socket fails.

    Intended to run as the target of a background thread. Each datagram read
    from the module-level ``clientSocket`` is stored, still as raw bytes, in
    the module-level ``response`` variable, which ``sendCommand`` polls.
    The loop ends on the first socket error.
    """
    global response
    while True:
        try:
            response, _ = clientSocket.recvfrom(1024)
        except OSError:
            # Socket errors (including the socket being closed) end the
            # thread. Anything else is a programming error and should not be
            # silently swallowed the way a bare ``except`` would.
            break
def readStates():
    """Keep the module-level ``battery`` value updated from state packets.

    Intended to run as the target of a background thread. Reads datagrams
    from the module-level ``stateSocket`` and stores the battery level found
    in each one. Packets without a usable battery value (for example a plain
    ``ok`` reply or a truncated packet) are skipped, so one bad packet does
    not stop the updates. The loop ends on the first socket error.
    """
    global battery
    while True:
        try:
            packet, _ = stateSocket.recvfrom(1024)
        except OSError:
            break
        level = _parseBattery(packet)
        if level is not None:
            battery = level


def _parseBattery(packet):
    """Return the integer following the ``bat`` key in *packet*, or None.

    *packet* is the raw bytes of a state datagram, presumably of the form
    ``key:value;key:value;...`` (TODO confirm against the drone SDK). The
    value is looked up by key rather than by position so that extra leading
    fields do not shift the result.
    """
    text = packet.decode('ascii', errors='ignore').strip()
    fields = text.replace(';', ':').split(':')
    for key, value in zip(fields[::2], fields[1::2]):
        if key == 'bat':
            try:
                return int(value)
            except ValueError:
                return None
    return None
def sendCommand(command, timeout=5.0):
    """Send *command* to the drone and wait for the reply.

    Args:
        command: Command text; it is UTF-8 encoded and sent through the
            module-level ``clientSocket`` to the module-level ``address``.
        timeout: Maximum number of seconds to wait for a reply.

    Returns:
        The reply stored in the module-level ``response`` variable by the
        receiver thread (raw bytes as received), or ``False`` if no reply
        arrived within *timeout* seconds.
    """
    global response
    # Clear the previous reply first; otherwise the wait below would return
    # the stale answer of an earlier command immediately.
    response = None
    clientSocket.sendto(command.encode('utf-8'), address)

    deadline = time.monotonic() + timeout
    while True:
        reply = response
        if reply is not None:
            return reply
        if time.monotonic() > deadline:
            return False
        # Yield briefly instead of spinning at full CPU while waiting.
        time.sleep(0.001)
def sendReadCommand(command):
    """Send a query command and return the reply as text.

    Args:
        command: Command text passed unchanged to ``sendCommand``.

    Returns:
        The reply as a ``str``. A bytes reply is decoded and stripped of
        surrounding whitespace (calling ``str()`` on bytes would produce the
        ``"b'...'"`` representation instead of the text). Any other value is
        converted with ``str()``, so a timed-out ``sendCommand`` (which
        returns ``False``) yields ``'False'``.
    """
    reply = sendCommand(command)
    if isinstance(reply, (bytes, bytearray)):
        return reply.decode('utf-8', errors='replace').strip()
    return str(reply)
def sendControlCommand(command):
    """Send a control command, retrying until the drone acknowledges it.

    The command is sent through ``sendCommand`` up to five times.

    Args:
        command: Command text passed unchanged to ``sendCommand``.

    Returns:
        ``True`` as soon as a reply equal to ``ok`` (any letter case,
        surrounding whitespace ignored) is received, ``False`` if none of the
        five attempts was acknowledged.
    """
    for _ in range(5):
        reply = sendCommand(command)
        # Replies arrive as raw bytes; comparing bytes with a str is always
        # unequal, so decode before checking for the acknowledgement.
        if isinstance(reply, (bytes, bytearray)):
            reply = reply.decode('utf-8', errors='replace')
        if isinstance(reply, str) and reply.strip().lower() == 'ok':
            return True
    return False
# -----------------------------------------------
# Main program
# -----------------------------------------------

# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)

# state shared with the background threads; initialised before the threads
# start so a later assignment here cannot overwrite a value they stored
response = None
response_state = None
battery = 0

clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))

# start threads (daemon threads, so they do not keep the process alive)
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()

# connect to drone
# The results are kept in their own names: `response` is the variable the
# receiver thread and sendCommand use to exchange replies, so it must not be
# overwritten with a True/False result here.
command_ok = sendControlCommand("command")
print(f'command response: {command_ok}')
streamon_ok = sendControlCommand("streamon")
print(f'streamon response: {streamon_ok}')

# open UDP
print('opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)

# frame size used for the display, the overlay and the recorded video
dsize = (640, 480)

# open video writer to save video
# 'mp4v' matches the .mp4 container of the output file ('XVID' is an AVI
# codec tag).
vid_cod = cv2.VideoWriter_fourcc(*'mp4v')
vid_output = cv2.VideoWriter("cam_video.mp4", vid_cod, 20.0, dsize)

# load bottom img
background = cv2.imread('Bottom03.png')
if background is None:
    # cv2.imread returns None instead of raising when the file is unreadable
    raise FileNotFoundError("overlay image 'Bottom03.png' could not be read")
background = cv2.resize(background, dsize)

# load model
instance_seg = instance_segmentation()
instance_seg.load_model("mask_rcnn_coco.h5")

# main app
detectionEnabled = False
i = 0
# Seconds between 'battery?' queries. The reply is not used (the printed
# battery level comes from the state thread), so the blocking query is
# throttled instead of being issued on every frame.
BATTERY_QUERY_INTERVAL = 5.0
last_battery_query = float('-inf')  # forces a query on the first iteration

while True:
    i += 1
    start_time = time.time()

    if start_time - last_battery_query >= BATTERY_QUERY_INTERVAL:
        sendReadCommand('battery?')
        last_battery_query = start_time
    print(f'battery: {battery} % - i: {i}')

    try:
        ret, frame = cap.read()
        if not ret or frame is None:
            # No frame available; skip processing but keep handling keys.
            print('exc: no frame received from the video feed')
        else:
            img = cv2.resize(frame, dsize)

            if detectionEnabled:
                # save image to disk, run the segmentation, save the result
                os.makedirs('tmp', exist_ok=True)
                imgNumber = str(i).zfill(5)
                frameImageFileName = os.path.join('tmp', f'image{imgNumber}.png')
                outputImageName = os.path.join('tmp', f'image{imgNumber}Out.png')
                cv2.imwrite(frameImageFileName, img)
                segmask, img = instance_seg.segmentFrame(img, show_bboxes=True)
                cv2.imwrite(outputImageName, img)

            # overlay background
            img = cv2.addWeighted(background, 1, img, 1, 0)

            # FPS = 1 / time to process loop
            elapsed = time.time() - start_time
            if elapsed > 0:
                fpsInfo = "FPS: " + str(1.0 / elapsed)
            else:
                fpsInfo = "FPS: n/a"
            font = cv2.FONT_HERSHEY_DUPLEX
            cv2.putText(img, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)

            cv2.imshow('@elbruno - DJI Tello Camera', img)
            vid_output.write(img)
    except Exception as e:
        # Keep the loop alive on a per-frame failure, but report it.
        print(f'exc: {e}')

    # key controller: D toggles detection, Q quits
    key = cv2.waitKey(1) & 0xFF
    if key == ord("d"):
        detectionEnabled = not detectionEnabled
    if key == ord("q"):
        break
# release resources
# The result gets its own name so the shared `response` variable used by the
# receiver thread is not overwritten, and the log names the command sent.
streamoff_ok = sendControlCommand("streamoff")
print(f'streamoff response: {streamoff_ok}')

# close the already opened camera, and the video file
cap.release()
vid_output.release()
cv2.destroyAllWindows()

# close the UDP sockets; the background threads exit on the resulting error
clientSocket.close()
stateSocket.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment