Skip to content

Instantly share code, notes, and snippets.

@shrimo
Last active August 1, 2023 18:30
Show Gist options
  • Save shrimo/fbf9ccb1b940790939c8fd5696de0142 to your computer and use it in GitHub Desktop.
Save shrimo/fbf9ccb1b940790939c8fd5696de0142 to your computer and use it in GitHub Desktop.
Dividing the frame into tiles
#!/usr/bin/python3
# Dividing the frame into tiles
import multiprocessing
import numpy as np
import cv2
class OpenCV_Multiprocessing:
    """Detect and draw ORB keypoints on image tiles in worker processes.

    Work items are put on ``input_q`` and processed items are read back from
    ``output_q``. Each item is an indexable triple:

    * ``item[0]`` -- the image (tile) to process; it is drawn on in place,
    * ``item[1]`` -- an opaque tag that is rendered as text on the tile and
      returned unchanged so the caller can match results to requests,
    * ``item[2]`` -- the colour used for the keypoint markers.

    Results come back in completion order, not submission order.
    """

    def __init__(self, features):
        """Create the ORB detector, the bounded queues and the worker pool.

        Args:
            features: forwarded as the first argument of ``cv2.ORB_create``.
        """
        self.font = cv2.FONT_HERSHEY_SIMPLEX
        self.detect = cv2.ORB_create(features)
        self.number_cpu = cv2.getNumberOfCPUs()
        # Both queues are bounded to one slot per worker process.
        self.input_q = multiprocessing.Queue(maxsize=self.number_cpu)
        self.output_q = multiprocessing.Queue(maxsize=self.number_cpu)
        # ``worker`` is passed as the pool *initializer*: it never returns, so
        # every pool process spends its whole life in the queue loop instead
        # of serving regular pool tasks.
        # NOTE(review): this hands a bound method (and therefore ``self`` with
        # its ORB detector) to the child processes; presumably relies on the
        # fork start method -- confirm before running where processes spawn.
        self.pool = multiprocessing.Pool(
            self.number_cpu, self.worker, (self.input_q, self.output_q))

    def worker(self, input_q, output_q):
        """Forever take an item from ``input_q``, process it, put it on ``output_q``."""
        while True:
            output_q.put(self.processing_ORB(input_q.get()))

    def processing_ORB(self, data_frame):
        """Draw ORB keypoints and two text labels onto ``data_frame[0]``.

        Args:
            data_frame: ``(image, tag, color)`` triple as described on the class.

        Returns:
            The same ``data_frame`` object, with its image modified in place.
        """
        # Descriptors are computed by this call but not used here.
        keypoints, _ = self.detect.detectAndCompute(data_frame[0], None)
        points = cv2.KeyPoint_convert(keypoints)
        for xy in points:
            # Keypoint coordinates are floats; the marker position is passed as integers.
            cv2.drawMarker(data_frame[0], np.int32(xy), data_frame[2], 1, 10, 1, 8)
        cv2.putText(data_frame[0], 'Multiprocessing', (30, 30), self.font, 0.5, (255, 255, 255), 1)
        cv2.putText(data_frame[0], str(data_frame[1]), (30, 50), self.font, 0.5, (255, 255, 255), 1)
        return data_frame

    def __del__(self):
        """Terminate the worker pool when the object is finalized."""
        print('Multiprocessing stop')
        # __init__ may have raised before the pool was created; in that case
        # there is nothing to terminate and the finalizer must not raise.
        pool = getattr(self, 'pool', None)
        if pool is not None:
            pool.terminate()
if __name__ == '__main__':
    # Demo: split every video frame into a grid of tiles, run ORB detection on
    # the tiles in worker processes, paste the results back and display them.
    import sys

    # Uncomment to trace worker start-up and shutdown:
    # logger = multiprocessing.log_to_stderr()
    # logger.setLevel(multiprocessing.SUBDEBUG)
    number_cpu = cv2.getNumberOfCPUs()

    # An optional first command-line argument overrides the default clip.
    default_mov = '/home/mo/GitHub/Video/traffic_01.mp4'
    mov = sys.argv[1] if len(sys.argv) > 1 else default_mov
    cap = cv2.VideoCapture(mov)
    if not cap.isOpened():
        # Without this check the reported size is 0x0 and the tiling below
        # fails with an unrelated-looking error.
        raise SystemExit('Cannot open video: ' + mov)

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Split each axis into (number of CPUs / 2) parts; never fewer than one
    # part, and never a zero-sized tile (a zero step would break range()).
    splits = max(1, number_cpu // 2)
    W = max(1, width // splits)
    H = max(1, height // splits)
    print('width:', width, 'height:', height)
    print('Number CPU:', number_cpu)
    cv2.namedWindow('frame', cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)

    # Tile rectangles as (y, y1, x, x1), row by row. Tiles on the right and
    # bottom edge may extend past the frame; slicing clips them.
    grid = [(y, y + H, x, x + W)
            for y in range(0, height, H)
            for x in range(0, width, W)]
    ocvmp = OpenCV_Multiprocessing(1000)
    # One random marker colour per tile.
    rand_color = np.random.uniform(0, 255, size=(len(grid), 3))

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Copy each tile out of the frame, then draw its outline on the
            # frame, in that order and tile by tile.
            jobs = []
            for (y, y1, x, x1), color in zip(grid, rand_color):
                jobs.append((frame[y:y1, x:x1].copy(), (y, y1, x, x1), color))
                cv2.rectangle(frame, (x, y), (x1, y1), (0, 255, 0))

            # Keep at most one job per worker in flight. The queues are
            # bounded, so pushing every tile before reading any result can
            # block forever once the tile count exceeds their capacity.
            in_flight = 0
            for job in jobs:
                if in_flight >= ocvmp.number_cpu:
                    tile, (y, y1, x, x1), _ = ocvmp.output_q.get()
                    frame[y:y1, x:x1] = tile
                    in_flight -= 1
                ocvmp.input_q.put(job)
                in_flight += 1
            # Collect the remaining results so nothing is left in the queues.
            while in_flight:
                tile, (y, y1, x, x1), _ = ocvmp.output_q.get()
                frame[y:y1, x:x1] = tile
                in_flight -= 1

            cv2.imshow('frame', frame)

            key = cv2.waitKey(1)
            if key == ord('q') or key == 27:
                break
            elif key == ord('p'):
                # Pause until any key is pressed.
                cv2.waitKey(-1)
    finally:
        cap.release()
        cv2.destroyAllWindows()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment