Skip to content

Instantly share code, notes, and snippets.

@lucasmediaflow

lucasmediaflow/camera.py

Last active Jul 20, 2021
Embed
What would you like to do?
#!/usr/bin/python3 -u
from rekognize import rekognize
# from dmx import Flash, time
import time
from datetime import datetime
import json
import math
import threading
from pathlib import Path
import depthai as dai
import cv2
import numpy as np
# from gpiozero import MotionSensor
import threading
import sys
# mSensor = MotionSensor(4)
# flash = Flash()
# flash.off()
# Load per-unit camera tuning from unit.conf (JSON). If the file is missing
# or malformed, fall back to the built-in defaults so the script still runs —
# but say so, instead of silently swallowing the error like before.
try:
    with open('unit.conf', 'r') as file:
        unit = json.load(file)
    shutterSpeed = unit['photo.shutter_speed']  # exposure time passed to setManualExposure
    flashDelay = unit['flashDelay']             # seconds to wait after flash before capture
    iso = unit['photo.iso']
    focus = unit['focus']                       # manual lens position for setManualFocus
    # mSensorDelay = unit['mSensorDelay']
except Exception as e:
    # Config unreadable -> run on defaults, but surface the reason for the operator.
    print('COULD NOT READ unit.conf, USING DEFAULTS:', e)
    flashDelay = 0.3
    shutterSpeed = 2000
    iso = 1600
    focus = 135
    # mSensorDelay = 5
# flash.triggered = mSensorDelay
# Metadata written alongside every captured image.
data = {'shutterSpeed': shutterSpeed, 'iso': iso, 'flashDelay': flashDelay}
# --- DepthAI pipeline construction ------------------------------------------
try:
    pipeline = dai.Pipeline()
except Exception as error:
    # Include the underlying error so a failed library/device init is
    # diagnosable (previously this exited with a bare 'bye bye').
    sys.exit('COULD NOT CREATE DEPTHAI PIPELINE, EXITING: ' + str(error))
# camera: 12 MP color sensor, fixed manual focus (autofocus disabled)
camera = pipeline.createColorCamera()
camera.initialControl.setManualFocus(focus)
camera.initialControl.setAutoFocusMode(dai.RawCameraControl.AutoFocusMode.OFF)
# resolution 4056, 3040
camera.setResolution(dai.ColorCameraProperties.SensorResolution.THE_12_MP)
camera.setInterleaved(False)
camera.setPreviewKeepAspectRatio(False)
# neural network: MobileNet-SSD blob loaded from the working directory
neuralNetwork = pipeline.createNeuralNetwork()
neuralNetwork.setBlobPath(str(Path('./mobilenet-ssd.blob')))
nnXLinkOut = pipeline.createXLinkOut()
nnXLinkOut.setStreamName('nnXLinkOut')
# preview
# videoXLinkOut = pipeline.createXLinkOut()
# videoXLinkOut.setStreamName('videoXLinkOut')
# Scale 4K frames to 300x300 for NN
manip = pipeline.createImageManip()
manip.initialConfig.setResize(300, 300)
camera.video.link(manip.inputImage)
# jpeg encoder for full-resolution stills (MJPEG profile, 1 fps preset)
jpegEncoder = pipeline.createVideoEncoder()
jpegEncoder.setDefaultProfilePreset(
    camera.getStillSize(), 1, dai.VideoEncoderProperties.Profile.MJPEG)
jpegXLinkOut = pipeline.createXLinkOut()
jpegXLinkOut.setStreamName('jpegXLinkOut')
# camera control: host -> device channel for exposure/still-capture commands
controlXLinkIn = pipeline.createXLinkIn()
controlXLinkIn.setStreamName('controlXLinkIn')
# link nodes together
manip.out.link(neuralNetwork.input)
camera.still.link(jpegEncoder.input)
controlXLinkIn.out.link(camera.inputControl)
jpegEncoder.bitstream.link(jpegXLinkOut.input)
neuralNetwork.out.link(nnXLinkOut.input)
def frame_norm(frame, boundaryBox):
    """Scale normalized [0..1] box coordinates (xmin, ymin, xmax, ymax)
    to integer pixel coordinates of *frame*."""
    height, width = frame.shape[:2]
    scale = np.array([width, height, width, height])
    return (np.array(boundaryBox) * scale).astype(int)
def clamp(num, v0, v1): return max(v0, min(num, v1))
# def triggerCam(sensor):
# if sensor.is_active:
# if time.time() - flash.triggered >= mSensorDelay:
# flash.on()
# time.sleep(flashDelay)
# cameraControl.setCaptureStill(True)
# controllerQueue.send(cameraControl)
# else:
# print('LAST TRIGGER LESS THAN', mSensorDelay, 'SEC AGO, SKIPPING')
# else:
# print('MOTION SENSOR DEACTIVATED, READY TO READ AGAIN')
# with dai.Device(pipeline, usb2Mode = True) as dev:
# Main capture loop: pull NN detections and full-res JPEG stills from the
# device, crop each still to a 4:3 box around the detected subjects, and
# write image + metadata JSON into ./output. When the on-device boxes are
# unusable, falls back to AWS Rekognition (via rekognize()) for a box.
with dai.Device(pipeline) as dev:
    # get data queues
    controllerQueue = dev.getInputQueue('controlXLinkIn')
    jpegQueue = dev.getOutputQueue('jpegXLinkOut')
    # videoQueue = dev.getOutputQueue('videoXLinkOut')
    neuralNetworkQueue = dev.getOutputQueue('nnXLinkOut')
    # create camera controller
    cameraControl = dai.CameraControl()
    # set manual exposure, for now can't be done via initialControl like focus
    cameraControl.setManualExposure(shutterSpeed, iso)
    cameraControl.setAutoWhiteBalanceMode(dai.RawCameraControl.AutoWhiteBalanceMode.INCANDESCENT)
    controllerQueue.send(cameraControl)
    # start pipeline
    try:
        dev.startPipeline()
    except Exception as error:
        # NOTE(review): this only logs -- execution continues into the loop
        # even though the message says "EXITING"; confirm this is intended.
        print('COULDNT START DEPTHAI PIPELINE, EXITING', error)
    # motion sensor
    # mSensor.when_motion = triggerCam
    # mSensor.when_no_motion = triggerCam
    bBoxes = []  # last filtered detections (normalized xmin, ymin, xmax, ymax rows)
    start_time = time.time()
    counter = 0  # NN frames seen since start_time, feeds the fps estimate
    fps = 0      # computed below but not otherwise used (leftover from preview)
    color = (255, 255, 255)  # unused (leftover from preview drawing)
    while True:
        # get frames from queues (non-blocking)
        try:
            # videoFrames = videoQueue.tryGetAll()
            neuralNetworkFrames = neuralNetworkQueue.tryGet()
            jpegFrames = jpegQueue.tryGetAll()
        except Exception as error:
            # flash.off()
            exit('bye bye')
        if neuralNetworkFrames is not None:
            counter += 1
            current_time = time.time()
            if (current_time - start_time) > 1:
                fps = counter / (current_time - start_time)
                counter = 0
                start_time = current_time
            # Raw SSD output: flat fp16 tensor of 7-value rows, terminated
            # by the first -1 entry.
            bBoxes = np.array(neuralNetworkFrames.getFirstLayerFp16())
            bBoxes = bBoxes[:np.where(bBoxes == -1)[0][0]]
            bBoxes = bBoxes.reshape((bBoxes.size // 7, 7))
            # Column 2 is confidence: keep detections above 0.8.
            # Columns 3:7 are the normalized box corners.
            confidence = bBoxes[bBoxes[:, 2] > 0.8][:, 2:3]
            bBoxes = bBoxes[bBoxes[:, 2] > 0.8][:, 3:7]
        # NOTE(review): tryGetAll() returns a list, so this check is always
        # true; an empty list just makes the for-loop a no-op.
        if jpegFrames is not None:
            for jpegFrame in jpegFrames:
                # flash.off()
                frame = cv2.imdecode(jpegFrame.getData(), cv2.IMREAD_UNCHANGED)
                boxy = frame_norm(frame, bBoxes)  # pixel-space boxes
                timeName = datetime.now().strftime('%d%H%M%S')
                data['detectedBox'] = boxy.tolist()
                try:
                    # Raises IndexError when there are no detections; the
                    # except branch below then falls back to Rekognition.
                    data['confidence'] = int(confidence[0][0]*100)
                    left = int(boxy[0][0])
                    right = int(boxy[0][2])
                    top = int(boxy[0][1])
                    bottom = int(boxy[0][3])
                    data['firstBox'] = [left, right, top, bottom]
                    # find left and right with multiple people
                    # NOTE(review): the top/bottom comparisons look inverted --
                    # they shrink the vertical span instead of extending it
                    # (left/right take min/max, top/bottom take max/min).
                    # Verify against the intended union-box behavior.
                    for box in boxy:
                        if int(box[0]) < left: left = int(box[0])
                        if int(box[2]) > right: right = int(box[2])
                        if int(box[1]) > top: top = int(box[1])
                        if int(box[3]) < bottom: bottom = int(box[3])
                    data['everyoneBox'] = [left, right, top, bottom]
                    width = right - left
                    height = bottom - top
                    mid_width = math.floor((left + right) / 2)
                    mid_height = math.floor((top + bottom) / 2)
                    if (width > height):  # stretch up if landscape frame
                        # Extend to a 4:3 box centered on the detections,
                        # clamped to the 3040-px sensor height.
                        height_extended = math.floor((3 * width) / 4)
                        halfHeight = math.floor(height_extended / 2)
                        top = int(mid_height - halfHeight)
                        bottom = int(mid_height + halfHeight)
                        if top <= 0:
                            top = 0
                            bottom = 0 + height_extended
                        if bottom >= 3040:
                            top = 3040 - height_extended
                            bottom = 3040
                        # chained assignment: record and adopt the new height
                        data['height_extended'] = height = height_extended
                    else:  # stretch sides if portrait frame
                        width_extended = math.floor((4 * height) / 3)
                        halfWidth = math.floor(width_extended / 2)
                        left = int(mid_width - halfWidth)
                        right = int(mid_width + halfWidth)
                        if left <= 0:
                            left = 0
                            right = 0 + width_extended
                        if right >= 4032:
                            left = 4032 - width_extended
                            right = 4032
                        data['width_extended'] = width = width_extended
                    data['mid_width'] = mid_width
                    data['mid_height'] = mid_height
                    data['width'] = width
                    data['height'] = height
                    data['finalBox'] = {
                        'left': left, 'right': right, 'top': top, 'bottom': bottom}
                    cropped = frame[top:bottom, left:right]
                    cv2.imwrite('output/' + timeName + '_' + str(width) + 'x' + str(height) + '.jpg', cropped)
                    with open('output/' + timeName + '_' + str(width) + 'x' + str(height) + '.json', 'w') as file:
                        file.write(json.dumps(data))
                    # full size is 4032x3040
                except Exception as e:
                    # No usable on-device detection: archive the full frame,
                    # then ask AWS Rekognition for a bounding box instead.
                    with open('output/' + timeName + '_full.json', 'w') as file:
                        file.write(json.dumps(data))
                    cv2.imwrite('output/' + timeName + '_full.jpg', frame)
                    imageBytes = cv2.imencode('.jpg', frame)[1].tobytes()
                    reko = rekognize(imageBytes)
                    # NOTE(review): raises if reko['Instances'] is empty; that
                    # exception would escape this handler and kill the loop.
                    left = math.floor(reko['Instances'][0]
                                      ['BoundingBox']['Left'] * 4032)
                    top = math.floor(reko['Instances'][0]['BoundingBox']['Top'] * 3040)
                    right = math.floor(
                        left + (reko['Instances'][0]['BoundingBox']['Width'] * 4032))
                    bottom = math.floor(
                        top + (reko['Instances'][0]['BoundingBox']['Height'] * 3040))
                    width = math.floor(reko['Instances'][0]
                                       ['BoundingBox']['Width'] * 4032)
                    height = math.floor(reko['Instances'][0]
                                        ['BoundingBox']['Height'] * 3040)
                    crop = frame[top:bottom, left:right]
                    cv2.imwrite('output/' + timeName + '_rekofull_' +
                                str(width) + 'x' + str(height) + '.jpg', crop)
                    try:
                        # Same 4:3 stretch logic as above, applied to the
                        # Rekognition box.
                        if (width > height):  # stretch up if landscape frame
                            height_extended = math.floor((3 * width) / 4)
                            halfHeight = math.floor(height_extended / 2)
                            # NOTE(review): midpoint derived from the box SIZE,
                            # not its position ((top + bottom) / 2 as in the
                            # branch above) -- confirm this is intended.
                            mid_height = math.floor(height / 2)
                            top = int(mid_height - halfHeight)
                            bottom = int(mid_height + halfHeight)
                            data['mid_height'] = mid_height
                            if top <= 0:
                                top = 0
                                bottom = 0 + height_extended
                            if bottom >= 3040:
                                top = 3040 - height_extended
                                bottom = 3040
                            data['height_extended'] = height = height_extended
                        else:  # stretch sides if portrait frame
                            width_extended = math.floor((4 * height) / 3)
                            halfWidth = math.floor(width_extended / 2)
                            # NOTE(review): same size-vs-position question as
                            # mid_height above.
                            mid_width = math.floor(width / 2)
                            left = int(mid_width - halfWidth)
                            right = int(mid_width + halfWidth)
                            data['mid_width'] = mid_width
                            if left <= 0:
                                left = 0
                                right = 0 + width_extended
                            if right >= 4032:
                                left = 4032 - width_extended
                                right = 4032
                            data['width_extended'] = width = width_extended
                        data['width'] = width
                        data['height'] = height
                        data['adjustedCrop'] = {
                            'left': left, 'right': right, 'top': top, 'bottom': bottom}
                        crop = frame[top:bottom, left:right]
                        cv2.imwrite('output/' + timeName + '_reko_' +
                                    str(width) + 'x' + str(height) + '.jpg', crop)
                        with open('output/' + timeName + '_reko_' + str(width) + 'x' + str(height) + '.json', 'w') as file:
                            file.write(json.dumps(data))
                    except Exception as error:
                        print('FAILED STRETCHING REKO', error)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment