Last active
March 1, 2024 05:37
-
-
Save bilalbayasut/6fe0745e4f879aa474d494a90e1b9c34 to your computer and use it in GitHub Desktop.
pyav multiprocess to get sum of pixel values
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import click | |
import time | |
import multiprocessing | |
import concurrent.futures | |
import cv2 | |
import logging | |
import sys | |
import av | |
import numpy as np | |
def get_video_info(video_path): | |
if video_path is None: | |
return None | |
container = av.open(video_path) | |
stream = container.streams.video[0] | |
frames = stream.frames | |
duration = int(stream.duration * stream.time_base) | |
start_time = stream.start_time | |
result = { | |
'frames': frames, | |
'start_time': start_time, | |
'duration': duration | |
} | |
container.close() | |
return result | |
def ms_to_s(duration): | |
return duration / 1000 | |
def s_to_ms(duration): | |
return duration * 1000 | |
def s_to_pts(second, stream): | |
return int(second / stream.time_base) | |
def pts_to_s(pts, stream): | |
return pts * stream.time_base | |
def elapsed_time(end, start): | |
return round(end - start, 1) | |
def get_video_division(video_path, start=0, end=0, workers=1): | |
if workers == 1: | |
workers = 4 | |
video_info = get_video_info(video_path) | |
start_time = video_info['start_time'] | |
duration = video_info['duration'] | |
if start < start_time: | |
start = start_time | |
if end > duration: | |
end = duration | |
if end <= 0 or end <= start: | |
end = duration | |
diff_duration = end - start | |
part = diff_duration / workers | |
results = [(i * part + start, (i + 1) * part + start) for i in range(workers)] | |
return results | |
def get_pixel_values(video_path, start, end): | |
worker_pid = multiprocessing.current_process().pid | |
logging.info(f"starting pid: {worker_pid} | {start} - {end}") | |
container = av.open(video_path) | |
stream = container.streams.video[0] | |
if start > 0.0: | |
offset = s_to_pts(start, stream) | |
container.seek(offset=int(offset), stream=stream) | |
else: | |
start = stream.start_time | |
frame_index = start | |
sum_pixel_values = 0 | |
counted_frame = 0 | |
red = 0 | |
for frame in container.decode(video=0): | |
if frame.time >= end: | |
break | |
img = frame.to_rgb().to_ndarray() | |
red = img.item(10,10,2) | |
sum_pixel_values += red | |
counted_frame += 1 | |
frame_index += 1 | |
# logging.info( | |
# f"pid: {worker_pid} | {start} - {end} | frame_index: {frame.index} | frame_time:{frame.time} | red:{red}" | |
# ) | |
container.close() | |
logging.info( | |
f"pid: {worker_pid} | {start} - {end} | counted_frame: {counted_frame} | pixel: {sum_pixel_values}" | |
) | |
return sum_pixel_values | |
@click.command() | |
@click.argument("video_path", type=str, nargs=-1, required=True) | |
@click.option( | |
"--workers", type=int, default=1, help=f"number of cpu(s) used", | |
) | |
@click.option( | |
"--start", type=int, default=1, help=f"start time in second(s)", | |
) | |
@click.option( | |
"--end", type=int, default=1, help=f"end time in second(s)", | |
) | |
def calculate_pixel_values(video_path, workers, start, end): | |
video_path = video_path[0] | |
if video_path is None: | |
logging.error("no video found, make sure path is correct") | |
sys.exit(0) | |
logging.debug(f"running on {workers} worker(s)") | |
video_info = get_video_info(video_path) | |
logging.info(video_info) | |
parts = get_video_division(video_path, start, end, workers) | |
logging.info(f"parts: {parts}") | |
total_pixel_value = 0 | |
if workers == 1: | |
t1_start = time.time() | |
for part in parts: | |
total_pixel_value += get_pixel_values(video_path, part[0], part[1]) | |
t1_stop = time.time() | |
else: | |
with concurrent.futures.ProcessPoolExecutor() as executor: | |
t1_start = time.time() | |
futures = [ | |
executor.submit(get_pixel_values, video_path, part[0], part[1]) | |
for part in parts | |
] | |
for i, f in enumerate(concurrent.futures.as_completed(futures), start=1): | |
total_pixel_value += f.result() | |
# logging.info(f"part {i} elapsed: {elapsed_time(time.time(), t1_start)}") | |
t1_stop = time.time() | |
logging.info( | |
f"total_pixel_value: {total_pixel_value} Elapsed time in seconds: {elapsed_time(t1_stop, t1_start)}" | |
) | |
if __name__ == "__main__": | |
logging.basicConfig( | |
# filename='./logs/read_video.log', filemode='a', | |
format="%(levelname)s: %(asctime)s | %(message)s", | |
datefmt="%I:%M:%S %p", | |
level=logging.DEBUG, | |
) | |
calculate_pixel_values() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment