Skip to content

Instantly share code, notes, and snippets.

@bilalbayasut
Created April 27, 2020 11:40
Show Gist options
  • Save bilalbayasut/7aaa7e1167065fc1ced1466926ebb892 to your computer and use it in GitHub Desktop.
Save bilalbayasut/7aaa7e1167065fc1ced1466926ebb892 to your computer and use it in GitHub Desktop.
calculate_pixel_values: sum video pixel values using either a single process or multiple processes
import time
import multiprocessing
import concurrent.futures
import cv2
import logging
def elapsed_time(end, start, ndigits=1):
    """Return the time elapsed between two timestamps, rounded.

    Args:
        end: Later timestamp (for example a value from ``time.time()``).
        start: Earlier timestamp, in the same unit as ``end``.
        ndigits: Number of decimal places to keep. Defaults to 1, which is
            the precision this helper has always used.

    Returns:
        ``end - start`` rounded to ``ndigits`` decimal places.
    """
    return round(end - start, ndigits)
def get_frames_parts(total_frames=0, max_workers=1):
    """Split the frame range ``[0, total_frames)`` into one slice per worker.

    The slices are contiguous, non-overlapping ``(start, end)`` pairs with an
    exclusive ``end``, and together they cover every frame. When the frame
    count is not divisible by ``max_workers`` the leftover frames are handed
    out one each to the first slices, so no trailing frames are dropped.

    Args:
        total_frames: Number of frames to distribute. Values below zero are
            treated as zero.
        max_workers: Number of slices to produce; must be at least 1.

    Returns:
        A list of exactly ``max_workers`` ``(start, end)`` tuples. Slices may
        be empty (``start == end``) when there are fewer frames than workers.

    Raises:
        ValueError: If ``max_workers`` is smaller than 1.
    """
    if max_workers < 1:
        raise ValueError(f"max_workers must be >= 1, got {max_workers}")

    # Clamp so a bogus negative frame count cannot produce negative ranges.
    frame_count = max(int(total_frames), 0)
    base, extra = divmod(frame_count, max_workers)

    results = []
    start = 0
    for index in range(max_workers):
        # The first `extra` slices absorb the remainder, one frame each.
        end = start + base + (1 if index < extra else 0)
        results.append((start, end))
        start = end

    logging.info(f"frame parts: {results}")
    return results
def get_pixel_values(video_path=None, start_frame=0, end_frame=0):
    """Sum one pixel's channel value over a range of frames of a video.

    Opens ``video_path`` with OpenCV, seeks to ``start_frame`` and reads
    frames until ``end_frame`` (exclusive) or until a read fails. For every
    frame read, the value at array index ``(10, 10, 2)`` is added to the
    running total (the original code names this value ``red``; index 2 is
    the red channel in OpenCV's default BGR layout).

    Args:
        video_path: Path of the video file to read.
        start_frame: Index of the first frame to process.
        end_frame: Index one past the last frame to process.

    Returns:
        The sum of the sampled values over all frames that were read. This is
        0 when no frame could be read.
    """
    worker_pid = multiprocessing.current_process().pid
    logging.info(f"starting pid: {worker_pid} | {start_frame} - {end_frame}")

    capture = cv2.VideoCapture(video_path)
    counter = 0
    sum_pixel_values = 0
    try:
        capture.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        frame_index = start_frame
        while frame_index < end_frame:
            success, captured_frame = capture.read()
            # Stop on either failure signal; touching a missing frame below
            # would raise instead of ending the loop cleanly.
            if not success or captured_frame is None:
                break
            sum_pixel_values += captured_frame.item((10, 10, 2))
            time.sleep(0.01)  # stand-in for real per-frame processing cost
            counter += 1
            frame_index += 1
    finally:
        # Release the capture even if reading or indexing a frame raised.
        capture.release()

    logging.info(
        f"pid: {worker_pid} | {start_frame} - {end_frame} | counter: {counter} | pixel: {sum_pixel_values}"
    )
    return sum_pixel_values
def calculate_pixel_values(video_path=None, mp=False):
    """Sum sampled pixel values over a whole video and log the elapsed time.

    The video's frame count is read with OpenCV and split into one frame
    range per CPU (``multiprocessing.cpu_count()``) via ``get_frames_parts``.
    Each range is processed by ``get_pixel_values``, either sequentially in
    this process or concurrently in a ``ProcessPoolExecutor``. The same
    split is used in both modes so the timings are comparable.

    Args:
        video_path: Path of the video file to process.
        mp: When true, process the frame ranges in a process pool; otherwise
            process them one after another in the current process.

    Returns:
        None. The total and the elapsed time are reported through ``logging``.
    """
    worker_pid = multiprocessing.current_process().pid
    cpu_count = multiprocessing.cpu_count()
    if mp:
        logging.debug(f"running on multi process {cpu_count}")
    else:
        logging.debug(f"PID: {worker_pid} running on single process")

    # Open the video only long enough to read its frame count; the workers
    # open their own captures.
    capture = cv2.VideoCapture(video_path)
    try:
        total_frame = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        capture.release()
    logging.info(f"total frame from capture: {total_frame}")

    parts = get_frames_parts(total_frame, max_workers=cpu_count)

    if mp:
        with concurrent.futures.ProcessPoolExecutor() as executor:
            t1_start = time.time()
            futures = [
                executor.submit(get_pixel_values, video_path, start, end)
                for start, end in parts
            ]
            # Addition is order-independent, so consume results as they finish.
            total_counter = sum(
                future.result()
                for future in concurrent.futures.as_completed(futures)
            )
            t1_stop = time.time()
    else:
        t1_start = time.time()
        total_counter = sum(
            get_pixel_values(video_path, start, end) for start, end in parts
        )
        t1_stop = time.time()

    logging.info(
        f"total_counter: {total_counter} Elapsed time in seconds: {elapsed_time(t1_stop, t1_start)}"
    )
if __name__ == "__main__":
    # Script entry point: configure logging, then time the same video in
    # single-process mode followed by multi-process mode.
    # To log to a file instead of the console, add
    # filename='../logs/new_attempt_main.log' and filemode='w' below.
    log_settings = {
        "format": "%(levelname)s: %(asctime)s | %(message)s",
        "datefmt": "%I:%M:%S %p",
        "level": logging.DEBUG,
    }
    logging.basicConfig(**log_settings)

    # Alternative sample input: "../videos/test_video.mp4"
    video_path = "../videos/test_270_mb.mp4"
    for use_multiprocess in (False, True):
        calculate_pixel_values(video_path, mp=use_multiprocess)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment