# GitHub Gist: jaycosaur/e50d78aeb11e95c8667efc82e31346dc
# (Navigation text from the gist web page is omitted; the code follows.)
import cv2
import time
import numpy as np
import multiprocessing as mp
# Number of frames between FPS report lines for the camera and display tickers.
log_const = 30

# Shape of one frame as the shared buffer is viewed: three uint8 axes,
# presumably (height, width, channels) for a 1280x720 BGR camera image.
camera_dims = (720, 1280, 3)

# Raw shared byte buffer ('B' = unsigned char) sized to hold exactly one frame.
# The length is derived from camera_dims so the two can never drift apart.
# RawArray carries no lock of its own; access is guarded by a separate lock.
shared_frame = mp.RawArray('B', int(np.prod(camera_dims)))
class SimpleSharedMemory:
    """Lock-guarded NumPy view over a byte buffer that holds a single frame."""

    def __init__(self, shared_memory, size, lock: mp.Lock):
        """Wrap ``shared_memory`` as a uint8 array of shape ``size``.

        Args:
            shared_memory: Buffer-protocol object whose byte length matches
                the number of elements in ``size``.
            size: Target array shape passed to ``reshape``.
            lock: Context-manager lock held around every read and write.
        """
        flat_bytes = np.frombuffer(shared_memory, dtype=np.uint8)
        # reshape returns a view here, so writes land in the original buffer.
        self.shared_memory = flat_bytes.reshape(size)
        self.lock = lock

    def write(self, frame):
        """Copy ``frame`` into the shared buffer while holding the lock."""
        destination = self.shared_memory
        with self.lock:
            np.copyto(destination, frame)

    def read(self):
        """Return a private copy of the buffer contents, taken under the lock."""
        with self.lock:
            return self.shared_memory.copy()
class FPS:
    """Print a running frames-per-second figure every ``length`` ticks.

    The rate is the average since construction, not over the last window.
    """

    def __init__(self, length: int, title: str):
        """Record the start time.

        Args:
            length: Number of ticks between printed reports.
            title: Label printed at the start of every report line.
        """
        self.length = length
        self.counter = 0
        self.start = time.time_ns()
        self.title = title

    def tick(self):
        """Count one frame and print a report on every ``length``-th call.

        The report line is ``<title> <frames per second> FPS <microseconds
        per frame> us`` with both figures rounded to whole numbers.
        """
        self.counter += 1
        if self.counter % self.length != 0:
            return
        # Keep the elapsed time at full resolution and clamp it to 1 ns.
        # Rounding it to whole seconds before dividing (as was done before)
        # raised ZeroDivisionError for reports in the first half second and
        # skewed the rate everywhere else.
        elapsed_ns = max(time.time_ns() - self.start, 1)
        frames_per_second = round(self.counter * 1e9 / elapsed_ns)
        micros_per_frame = round(elapsed_ns / (self.counter * 1e3))
        print(self.title, frames_per_second, "FPS", micros_per_frame, 'us')
def scale_frame(frame, wanted_frame_width: int):
    '''Shrink frame to wanted_frame_width, keeping its aspect ratio.

    Frames that are already at or below the wanted width are returned
    unchanged (the same object, not a copy); this function never upscales.

    Args:
        frame: Image array whose first two axes are (height, width). Both
            colour (3-D) and grayscale (2-D) arrays are accepted.
        wanted_frame_width: Target width in pixels.

    Returns:
        The original frame, or a resized copy produced by cv2.resize with
        INTER_AREA interpolation.
    '''
    # Only the first two axes are needed, so 2-D frames work as well.
    raw_frame_height, raw_frame_width = frame.shape[:2]
    if wanted_frame_width >= raw_frame_width:
        return frame
    # The output width is the requested width by definition; recomputing it
    # as int(raw_width * (wanted / raw_width)) can truncate one pixel short.
    # Integer arithmetic keeps the height exact, and the clamp stops very
    # wide, short frames from collapsing to a zero-height target.
    height = max(1, raw_frame_height * wanted_frame_width // raw_frame_width)
    return cv2.resize(
        frame, (wanted_frame_width, height), interpolation=cv2.INTER_AREA)
def cpu_bound():
    """Build a callable that runs cascade face detection on one frame.

    The classifier and its FPS ticker (reporting every 5 frames) are created
    once here and captured by the returned closure.

    Returns:
        A function taking one frame. It converts the frame from BGR to
        grayscale, runs ``detectMultiScale`` on it, ticks the FPS counter and
        returns None; the detections themselves are discarded.
    """
    classifier = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
    ticker = FPS(5, 'CPU Bound')

    def detect(frame):
        grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # Result intentionally unused: this only serves as a heavy workload.
        classifier.detectMultiScale(grayscale, 1.01, 5)
        ticker.tick()

    return detect
def camera_producer():
    """Yield frames from capture device 0 indefinitely.

    Failed reads are skipped without yielding. Every successful read ticks
    the 'Camera' FPS counter before the frame is yielded.

    Yields:
        The frame returned by ``cv2.VideoCapture.read``.
    """
    camera_ticker = FPS(log_const, 'Camera')
    cam = cv2.VideoCapture(0)
    try:
        while True:
            ok, frame = cam.read()
            if ok:
                camera_ticker.tick()
                yield frame
    finally:
        # Runs when the generator is closed or unwound by an exception, so
        # the capture device is always handed back.
        cam.release()
# Module-level so the frame count persists across display() calls.
display_ticker = FPS(log_const, 'Display')


def display(image_frame):
    """Show ``image_frame`` in the 'display' window, at most 200 px wide.

    Also ticks the 'Display' FPS counter once per call.
    """
    preview = scale_frame(image_frame, 200)
    cv2.imshow('display', preview)
    # waitKey gives the GUI event loop a chance to actually draw the window.
    cv2.waitKey(1)
    display_ticker.tick()
def display_thread(input_queue: mp.Queue, shared_memory, shared_memory_lock):
    """Show the shared frame each time a token arrives on ``input_queue``.

    Runs forever. The queue carries no image data; each item is only a
    signal that the shared buffer should be read and displayed.

    Args:
        input_queue: Queue on which wake-up tokens arrive.
        shared_memory: Shared byte buffer holding the current frame.
        shared_memory_lock: Lock guarding ``shared_memory``.
    """
    frame_store = SimpleSharedMemory(
        shared_memory, camera_dims, shared_memory_lock)
    while True:
        input_queue.get()  # blocks until signalled; the token is ignored
        latest_frame = frame_store.read()
        display(latest_frame)
def camera_thread(display_queue: mp.Queue, cpu_bound_queue: mp.Queue, shared_memory, shared_memory_lock):
    """Capture frames into shared memory and signal the two consumers.

    Runs for as long as the camera generator yields frames. Each frame is
    copied into the shared buffer, then a token is put on both queues.

    Args:
        display_queue: Signalled with a blocking put, so this loop waits
            whenever that queue is full.
        cpu_bound_queue: Signalled without blocking; if it is full the token
            is dropped and the CPU-bound consumer simply skips this frame.
        shared_memory: Shared byte buffer that receives each frame.
        shared_memory_lock: Lock guarding ``shared_memory``.
    """
    # Function-scope import: queue.Full is the documented exception raised by
    # multiprocessing queues. mp.queues.Full only resolves as a side effect of
    # the multiprocessing.queues submodule having been imported elsewhere.
    import queue

    camera = camera_producer()
    shared_memory_helper = SimpleSharedMemory(
        shared_memory, camera_dims, shared_memory_lock)
    for frame in camera:
        shared_memory_helper.write(frame)
        display_queue.put(1)
        try:
            cpu_bound_queue.put_nowait(1)
        except queue.Full:
            # Deliberate best-effort: the slow consumer is still busy.
            pass
def cpu_bound_thread(input_queue: mp.Queue, shared_memory, shared_memory_lock):
    """Run the CPU-bound operation on the shared frame whenever signalled.

    Runs forever. Each item taken from ``input_queue`` is only a wake-up
    token; the frame itself is read from the shared buffer.

    Args:
        input_queue: Queue on which wake-up tokens arrive.
        shared_memory: Shared byte buffer holding the current frame.
        shared_memory_lock: Lock guarding ``shared_memory``.
    """
    process_frame = cpu_bound()
    frame_store = SimpleSharedMemory(
        shared_memory, camera_dims, shared_memory_lock)
    while True:
        input_queue.get()  # blocks until signalled; the token is ignored
        snapshot = frame_store.read()
        process_frame(snapshot)
def main():
    """Start the camera and CPU-bound workers, then run the display loop.

    Two worker processes are started with the 'spawn' start method. The
    display loop runs in the calling process. Both signalling queues have a
    capacity of one item.
    """
    ctx = mp.get_context('spawn')
    display_input_queue = ctx.Queue(1)
    cpu_bound_input_queue = ctx.Queue(1)
    shared_memory_lock = ctx.Lock()
    processes = [
        ctx.Process(name="camera_thread", target=camera_thread, args=(
            display_input_queue, cpu_bound_input_queue, shared_frame,
            shared_memory_lock)),
        ctx.Process(name="cpu_bound_thread", target=cpu_bound_thread, args=(
            cpu_bound_input_queue, shared_frame, shared_memory_lock)),
    ]
    for process in processes:
        process.start()
    try:
        display_thread(display_input_queue, shared_frame, shared_memory_lock)
    finally:
        # The workers loop forever, so a plain join() would never return.
        # If the display loop fails, stop them explicitly; otherwise the
        # parent would hang waiting on its non-daemon children.
        for process in processes:
            process.terminate()
        for process in processes:
            process.join()


# Required with the 'spawn' start method: child processes re-import this
# module, and the guard keeps them from running main() again.
if __name__ == '__main__':
    main()
# (GitHub page footer: sign in on GitHub to comment on this gist.)