Python OpenCV Multiple process
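
Three scripts form a small pipeline. The first captures frames from a camera in a dedicated process, publishing each requested frame to the main process through a multiprocessing.sharedctypes.RawArray guarded by a req/ready Event handshake; the main process displays the stream and forwards each raw 640x360 BGR frame over Unix-domain sockets. The second and third scripts are socket servers: one runs Haar-cascade face detection, the other TFLite SSD MobileNet object detection, and each acknowledges every frame with a single byte so the sender stays in step.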
#!/usr/bin/env python3
# vim: set sw=2 sts=2 expandtab:
# ref https://qiita.com/kakinaguru_zo/items/eda129635816ad871e9d
import cv2
import sys
import time
import numpy
import socket
import multiprocessing
import multiprocessing.sharedctypes
if len(sys.argv) == 1:
  print("cam opts...")
  sys.exit()
cam = sys.argv[1]
# connect to any downstream server sockets given on the command line
socks = []
for path in sys.argv[2:]:
  sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  try:
    sock.connect(path)
    socks.append(sock)
  except OSError:
    print("connect failed: " + path)
def capture_process(src, buf, size, req, ready, quit):
  # runs in a child process: read frames continuously and, when the parent
  # requests one via the req event, copy it into the shared buffer
  cap = cv2.VideoCapture(src)
  fps = cap.get(cv2.CAP_PROP_FPS)
  delay = 1 / fps
  while not quit.is_set():
    try:
      cap_start = time.time()
      ret, frame = cap.read()
      if not ret:
        raise IOError
      if req.is_set():
        ready.clear()
        frame = cv2.resize(frame, size)
        #frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        memoryview(buf).cast('B')[:] = memoryview(frame).cast('B')[:]
        ready.set()
        req.clear()
      # pace the loop to the camera frame rate
      while True:
        elapsed = time.time() - cap_start
        if elapsed >= delay:
          break
        time.sleep(delay - elapsed)
    except KeyboardInterrupt:
      break
  print("done")
  cap.release()
if __name__ == "__main__":
width = 640
height = 360
buf = multiprocessing.sharedctypes.RawArray('B', width * height * 3)
req = multiprocessing.Event()
ready = multiprocessing.Event()
quit = multiprocessing.Event()
cap_proc = multiprocessing.Process(target=capture_process, args=(cam, buf, (width, height), req, ready, quit))
cap_proc.start()
image = numpy.empty((height, width, 3), dtype=numpy.uint8)
lasttime = time.time()
while True:
try:
# request to copy
req.set()
# and wait
ready.wait()
image[:,:,:] = numpy.reshape(buf, (height, width, 3))
curr = time.time()
# show fps
elapsed = curr - lasttime
s = "{:.3f}FPS / {:.3f}ms".format(1.0/elapsed, elapsed*1000)
cv2.putText(image, s,
(16, 16), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255), 1, cv2.LINE_AA)
lasttime = curr
cv2.imshow("test", image)
for s in socks:
s.send(buf)
if cv2.waitKey(1) & 0xff == 27:
break
for s in socks:
r = s.recv(1)
if len(r) == 0:
socks.remove(s)
except KeyboardInterrupt:
break
quit.set()
cv2.destroyWindow("test")
cap_proc.join(1)
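
To run the pipeline, start the servers first so their socket paths exist, then point the capture client above at the camera device and the socket paths. The gist does not name its files, so capture.py, face.py, and obj.py here are hypothetical, as are the paths:

./face.py /tmp/face.sock &
./obj.py /tmp/obj.sock &
./capture.py /dev/video0 /tmp/face.sock /tmp/obj.sock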
#!/usr/bin/env python3
# vim: set sw=2 sts=2 expandtab:
# ref. https://towardsdatascience.com/face-detection-in-2-minutes-using-opencv-python-90f89d7c0f81
# curl -vLO https://raw.githubusercontent.com/opencv/opencv/master/data/haarcascades/haarcascade_frontalface_default.xml
import cv2
import sys
import os
import numpy
import socket
import time
if len(sys.argv) == 1:
  print("face sock")
  sys.exit()
class Server:
  def __init__(self, path):
    self._path = path
  def start(self):
    s = self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    s.bind(self._path)
    s.listen(1)
    print("start listening")
    try:
      while True:
        conn, addr = s.accept()
        try:
          self.handle(conn)
        except IOError:
          # client went away; keep accepting new connections
          conn.close()
    finally:
      os.remove(self._path)
  def handle(self, conn):
    # face detection
    face_cascade = cv2.CascadeClassifier("haarcascade_frontalface_default.xml")
    width = 640
    height = 360
    image = numpy.empty((height, width, 3), dtype=numpy.uint8)
    lasttime = time.time()
    while True:
      # read exactly one raw BGR frame
      data = b""
      sz = width * height * 3
      while len(data) < sz:
        r = conn.recv(sz - len(data))
        if len(r) == 0:
          raise IOError
        data += r
      image[:,:,:] = numpy.frombuffer(data, dtype=numpy.uint8).reshape(height, width, 3)
      # detect on a grayscale, downscaled copy, then scale the boxes back up
      scale = 2
      gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
      gray = cv2.resize(gray, (width//scale, height//scale))
      faces = face_cascade.detectMultiScale(gray, 1.1, 4)
      for (x, y, w, h) in faces:
        x *= scale
        y *= scale
        w *= scale
        h *= scale
        print(x, y, w, h)
        image = cv2.rectangle(image, (x, y), (x+w, y+h), (255, 0, 0), 2)
      curr = time.time()
      # ack the frame so the sender can continue
      conn.send(b" ")
      # show fps
      elapsed = curr - lasttime
      s = "{:.3f}FPS / {:.3f}ms".format(1.0/elapsed, elapsed*1000)
      cv2.putText(image, s,
          (16, 16), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255), 1, cv2.LINE_AA)
      lasttime = curr
      cv2.imshow("face", image)
      cv2.waitKey(1)
if __name__ == "__main__":
serv = Server(sys.argv[1])
serv.start()
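
The wire format is implicit in both peers: exactly width * height * 3 = 691,200 bytes of raw BGR per frame, answered by a one-byte ack. A minimal standalone test client (hypothetical; not part of the gist) can exercise either server without a camera by pushing a single synthetic frame:

#!/usr/bin/env python3
# Hypothetical test client: send one synthetic 640x360 BGR frame to a server
# socket and wait for the one-byte ack. Usage: ./testclient.py /tmp/face.sock
import sys
import socket
import numpy

width, height = 640, 360
frame = numpy.zeros((height, width, 3), dtype=numpy.uint8)
frame[:, :, 2] = 255  # a solid red frame (BGR order)

sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(sys.argv[1])
sock.sendall(frame.tobytes())  # exactly width * height * 3 bytes
print("ack:", sock.recv(1))    # server replies b" " after processing the frame
sock.close()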
#!/usr/bin/env python3
# vim: set sw=2 sts=2 expandtab:
# https://storage.googleapis.com/download.tensorflow.org/models/tflite/coco_ssd_mobilenet_v1_1.0_quant_2018_06_29.zip
import cv2
import sys
import os
import numpy
import socket
import time
import tensorflow as tf
if len(sys.argv) == 1:
  print("obj sock")
  sys.exit()
class Server:
  def __init__(self, path):
    self._path = path
    # load labels
    with open("labelmap.txt") as f:
      f.readline() # skip first
      self._labels = [l.strip() for l in f.readlines()]
    # load and create interpreter
    self._interp = tf.lite.Interpreter(model_path="detect.tflite")
    self._input_details = self._interp.get_input_details()
    self._output_details = self._interp.get_output_details()
    self._interp.allocate_tensors()
    # a float32 input tensor means a non-quantized model expecting [-1, 1] input
    self._fmodel = (self._input_details[0]['dtype'] == numpy.float32)
    self._in_h = self._input_details[0]['shape'][1]
    self._in_w = self._input_details[0]['shape'][2]
    print(self._in_h, self._in_w)
  def start(self):
    s = self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    s.bind(self._path)
    s.listen(1)
    print("start listening")
    try:
      while True:
        conn, addr = s.accept()
        try:
          self.handle(conn)
        except IOError:
          # client went away; keep accepting new connections
          conn.close()
    finally:
      os.remove(self._path)
  def handle(self, conn):
    width = 640
    height = 360
    image = numpy.empty((height, width, 3), dtype=numpy.uint8)
    lasttime = time.time()
    while True:
      # read exactly one raw BGR frame
      data = b""
      sz = width * height * 3
      while len(data) < sz:
        r = conn.recv(sz - len(data))
        if len(r) == 0:
          raise IOError
        data += r
      image[:,:,:] = numpy.frombuffer(data, dtype=numpy.uint8).reshape(height, width, 3)
      # resize to the model's input size
      resized = cv2.resize(image, (self._in_w, self._in_h))
      input_data = numpy.expand_dims(resized, axis=0)
      if self._fmodel:
        input_data = (numpy.float32(input_data) - 127.5) / 127.5
      self._interp.set_tensor(self._input_details[0]['index'], input_data)
      self._interp.invoke()
      # SSD postprocess outputs: normalized boxes, class ids, scores
      boxes = self._interp.get_tensor(self._output_details[0]['index'])[0]
      classes = self._interp.get_tensor(self._output_details[1]['index'])[0]
      scores = self._interp.get_tensor(self._output_details[2]['index'])[0]
      for i in range(len(scores)):
        score = scores[i]
        if score < 0.7:
          continue
        label = self._labels[int(classes[i])]
        # boxes are [ymin, xmin, ymax, xmax] in [0, 1]; map back to pixels
        y0 = int(max(1, boxes[i][0] * height))
        x0 = int(max(1, boxes[i][1] * width))
        y1 = int(min(height, boxes[i][2] * height))
        x1 = int(min(width, boxes[i][3] * width))
        print("label {} {} ({}, {})-({}, {})".format(label, scores[i], x0, y0, x1, y1))
        cv2.rectangle(image, (x0, y0), (x1, y1), (255, 0, 0), 2)
      curr = time.time()
      # ack the frame so the sender can continue
      conn.send(b" ")
      # show fps
      elapsed = curr - lasttime
      s = "{:.3f}FPS / {:.3f}ms".format(1.0/elapsed, elapsed*1000)
      cv2.putText(image, s,
          (16, 16), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255), 1, cv2.LINE_AA)
      lasttime = curr
      cv2.imshow("obj", image)
      cv2.waitKey(1)
if __name__ == "__main__":
serv = Server(sys.argv[1])
serv.start()
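
The object-detection server expects detect.tflite and labelmap.txt in its working directory; both come from the zip linked in the header comment. A fetch helper sketch (assuming those two files sit at the top level of the archive):

#!/usr/bin/env python3
# Download and unpack the quantized SSD MobileNet v1 model referenced above.
# Assumes detect.tflite and labelmap.txt sit at the top level of the archive.
import io
import zipfile
import urllib.request

URL = ("https://storage.googleapis.com/download.tensorflow.org/models/tflite/"
       "coco_ssd_mobilenet_v1_1.0_quant_2018_06_29.zip")
with urllib.request.urlopen(URL) as resp:
  archive = zipfile.ZipFile(io.BytesIO(resp.read()))
archive.extractall(".")
print(archive.namelist())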