Last active
June 9, 2020 15:34
-
-
Save bilalbayasut/3ec2e69ac88a5ac6b8bf00570b7f09e5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import av | |
import tqdm | |
import logging | |
import signal | |
import sys | |
import subprocess | |
import typing as T | |
import multiprocessing | |
def get_frame_from_video_url(url, start=0, width=None, height=None):
    """
    Grab a single frame from a video by running the `ffmpeg` executable.

    :param url: url to video
    :param start: offset in video in seconds; negative offsets are clamped to 0
    :param width: width of frame (optional)
    :param height: height of frame (optional)
    :return: the bytes ffmpeg wrote to stdout (one `image2` frame, presumably
        JPEG encoded -- confirm against the ffmpeg build in use)
    :raises subprocess.CalledProcessError: if ffmpeg exits with a non-zero status
    """
    # Clamp negative offsets to zero and keep at most 5 decimals, since ffmpeg
    # does not want many decimals in the seek position.
    seek_offset = round(max(0, float(start)), 5)

    scale_args = []
    if width or height:
        # A missing dimension is passed as -1 so ffmpeg's scale filter derives
        # it from the other one.
        scale_args = ["-vf", f"scale={width or -1}:{height or -1}"]

    ffmpeg_args = [
        "ffmpeg",
        "-ss",
        str(seek_offset),
        "-i",
        url,
        "-frames",
        "1",
        "-q:v",
        "2",
        *scale_args,
        "-f",
        "image2",
        "-",
    ]
    return subprocess.check_output(ffmpeg_args)
def read_video_frames(
    video_path: str, start: float = 0, end: T.Optional[float] = None
) -> T.Iterator[av.VideoFrame]:
    """
    Yields frames from the first video stream of a video.

    Frames whose timestamp is before `start` are skipped and iteration stops
    at the first frame whose timestamp is after `end`, so both bounds are
    inclusive. The container is always closed: when the stream is exhausted,
    when the caller stops iterating early, and when decoding raises.

    :param video_path: path or url of a video
    :param start: time to start from (in seconds)
    :param end: time to end (in seconds); None reads until the stream ends
    """
    container = av.open(video_path)
    try:
        if start:
            # The seek offset is expressed in av.time_base units (microseconds),
            # not milliseconds. The seek may land before `start` (presumably on
            # a keyframe -- confirm against PyAV docs), hence the skip below.
            container.seek(int(start * av.time_base))
        for frame in container.decode(video=0):
            if not frame:
                break
            if frame.time < start:
                continue
            if end is not None and frame.time > end:
                break
            yield frame
    finally:
        # Runs on normal exhaustion, on generator close and on errors alike.
        container.close()
def split_into_chunks(start: float, end: float, chunks: int):
    """
    Split `start` to `end` into `chunks` amount of equally sized chunks

    Every boundary is computed once and shared by the two chunks that touch
    it, and the final boundary is `end` itself, so the chunks are contiguous
    (no gaps or overlaps caused by floating point rounding).

    >>> split_into_chunks(10, 30, 4)
    [(10.0, 15.0), (15.0, 20.0), (20.0, 25.0), (25.0, 30.0)]

    :param start: beginning of the range
    :param end: end of the range
    :param chunks: number of chunks to produce, must be at least 1
    :return: list of `(segment_start, segment_end)` tuples of floats
    :raises ValueError: if `chunks` is smaller than 1
    """
    if chunks < 1:
        raise ValueError(f"chunks must be a positive integer, got {chunks!r}")
    start, end = float(start), float(end)
    segment_size = (end - start) / chunks
    boundaries = [start + i * segment_size for i in range(chunks)]
    # Pin the last boundary to `end` instead of re-deriving it from a sum.
    boundaries.append(end)
    return list(zip(boundaries, boundaries[1:]))
class VideoProcessor(object):
    """
    Splits a time range of a video into equally sized sections and processes
    every section in its own `VideoProcessorWorker` process.

    Results that the workers put on the shared queue are consumed by one more
    process, which feeds them to `result_function` and/or a progress bar.
    """

    # Marker put on the queue to stop the consumer. `None` cannot be used for
    # this because `process_function` may legitimately return None.
    _QUEUE_DONE = "__video_processor_queue_done__"

    # Seconds to wait for a worker's results before re-checking it is alive.
    _RESULT_POLL_INTERVAL_S = 0.1

    def __init__(
        self,
        video_path: str,
        start_time: float,
        end_time: T.Optional[float],
        process_function: T.Callable[[av.VideoFrame], None],
        result_function: T.Optional[T.Callable] = None,
        show_progress: bool = False,
        return_result: bool = False,
        n_workers: int = multiprocessing.cpu_count(),
        logger=None,
    ):
        """
        :param video_path: path to video
        :param start_time: time (in seconds) at which to start the processing;
            negative values are replaced by 0
        :param end_time: time (in seconds) at which to stop the processing;
            None or a value past the container duration means "to the end"
        :param process_function: function the workers run on each frame
        :param result_function: optional function called with every result
            taken from the queue; it runs in the consumer process
        :param show_progress: show a tqdm progress bar while consuming results
        :param return_result: also collect each worker's results through a
            pipe so that `start()` can return them
        :param n_workers: number of worker processes / sections
        :param logger: logger to use, defaults to this module's logger
        """
        super().__init__()
        self.video_path = video_path
        self.process_function = process_function
        self.result_function = result_function
        self.start_time = start_time
        self.end_time = end_time
        self.queue = multiprocessing.Queue()
        self.show_progress = show_progress
        self.return_result = return_result
        self.n_workers = n_workers
        self.running = False
        self.stopping = False
        self.logger = logger or logging.getLogger(__name__)
        self.status_dict = multiprocessing.Manager().dict()

        # Register the signal to the handler. This is best effort: registering
        # fails when not called from the main thread.
        try:
            signal.signal(signal.SIGINT, self.handle_sigint)
            signal.signal(signal.SIGTERM, self.handle_sigterm)
        except Exception as error:
            self.logger.warning("error %s", error)

        # The container is only needed for its metadata; close it right away
        # instead of leaking the handle.
        container = av.open(video_path)
        try:
            container_duration_s = container.duration / av.time_base
            average_rate = container.streams.video[0].average_rate
        finally:
            container.close()

        if self.start_time < 0:
            self.start_time = 0
        if self.end_time is None or self.end_time > container_duration_s:
            # One extra second so the last frames are not cut off.
            self.end_time = container_duration_s + 1
        duration_s = self.end_time - self.start_time
        self.estimated_frames = int(average_rate * duration_s)

        self.section_offsets = split_into_chunks(
            self.start_time, self.end_time, self.n_workers
        )
        self.logger.debug(f"number of workers: {n_workers}")

        self.workers = []
        # One entry per worker: the receiving pipe end, or None when
        # `return_result` is False.
        self.pipe_list = []
        for i, (start, end) in enumerate(self.section_offsets):
            recv_end, send_end = None, None
            if return_result:
                recv_end, send_end = multiprocessing.Pipe(False)
            self.workers.append(
                self._make_worker(start, end, send_end, self.status_dict)
            )
            self.pipe_list.append(recv_end)
            self.logger.debug(f"created worker {i} to work ({start} - {end})")

    def handle_sigint(self, sig, frame):
        """Signal handler for SIGINT; see `_handle_shutdown_signal`."""
        self._handle_shutdown_signal("SIGINT (control-c)")

    def handle_sigterm(self, sig, frame):
        """Signal handler for SIGTERM; see `_handle_shutdown_signal`."""
        self._handle_shutdown_signal("SIGTERM")

    def _handle_shutdown_signal(self, label):
        """Request a stop on the first signal, exit with status 1 on the second."""
        if self.stopping:
            print(f"Got 2nd {label}, shutting down forcefully")
            sys.exit(1)
        print(f"Got 1st {label}, shutting down gracefully")
        self.stop()

    def stop(self):
        """
        Flag the processor as stopping.

        NOTE(review): nothing in this class reads `running`/`stopping` apart
        from the signal handlers, so this does not interrupt running workers.
        """
        self.running = False
        self.stopping = True

    def _make_worker(self, start, end, send_end, status_dict):
        """Build (but do not start) the worker for the section `start`-`end`."""
        worker = VideoProcessorWorker(
            id=len(self.workers) + 1,
            video_path=self.video_path,
            start_time=start,
            end_time=end,
            function=self.process_function,
            out_queue=self.queue,
            send_end=send_end,
            logger=self.logger,
            status_dict=status_dict,
        )
        return worker

    def _consume(self, queue):
        """Drain `queue` until the done marker arrives (runs in its own process)."""
        progress = None
        if self.show_progress:
            progress = tqdm.tqdm(total=self.estimated_frames)
        try:
            while True:
                item = queue.get()
                if isinstance(item, str) and item == self._QUEUE_DONE:
                    break
                if self.result_function:
                    self.result_function(item)
                if progress is not None:
                    progress.update()
        except BrokenPipeError:
            for worker in self.workers:
                worker.terminate()
        finally:
            if progress is not None:
                progress.close()

    def _receive_results(self, worker, pipe):
        """
        Return what `worker` sent on `pipe`, or an empty list if the worker
        exited without sending anything.
        """
        while not pipe.poll(self._RESULT_POLL_INTERVAL_S):
            # Check the pipe once more after the worker is gone so data sent
            # right before it exited is not missed.
            if not worker.is_alive() and not pipe.poll():
                return []
        try:
            return pipe.recv()
        except EOFError:
            return []

    def _collect_results(self):
        """
        Receive the results of every worker that has a pipe.

        This happens before the workers are joined: a worker may block while
        sending a large payload until the parent reads it, so joining first
        could deadlock.
        """
        return [
            self._receive_results(worker, pipe)
            for worker, pipe in zip(self.workers, self.pipe_list)
            if pipe is not None
        ]

    def start(self):
        """
        Run all workers and the consumer process and wait for them to finish.

        :return: tuple `(succeeded, results)`. `succeeded` is False when a
            worker recorded an error in the shared status dict. `results`
            holds one list per worker when `return_result` is True and is
            empty otherwise.
        """
        self.running = True
        for worker in self.workers:
            worker.daemon = True
            worker.start()

        consumer_process = multiprocessing.Process(
            target=self._consume, args=(self.queue,)
        )
        consumer_process.daemon = True
        consumer_process.start()

        results = self._collect_results()
        for worker in self.workers:
            worker.join()
            self.logger.debug("joined worker")

        # All producers are done, so the marker is the last item on the queue.
        self.queue.put(self._QUEUE_DONE)
        consumer_process.join()
        self.running = False

        # Workers report failures through the dict they were given.
        succeeded = self.status_dict.get("status", "") != "error"
        return succeeded, results
class VideoProcessorWorker(multiprocessing.Process):
    """ A process that runs a function on every frame of one video section """

    def __init__(
        self,
        id: int,
        video_path: str,
        function: T.Callable[[av.VideoFrame], None],
        start_time: float,
        end_time: float,
        out_queue: multiprocessing.Queue,
        send_end: multiprocessing.Pipe,
        logger=None,
        status_dict=None,
    ):
        """
        :param id: identifier of this worker (used in logger name and logs)
        :param video_path: path to video
        :param function: function to run on each frame
        :param start_time: time at which to start the processing
        :param end_time: time at which to stop the processing
        :param out_queue: queue where to put the results
        :param send_end: sending end of a pipe on which the list of all
            results is sent once at the end, or None to skip that
        :param logger: logger to use, defaults to a `worker-<id>` logger
        :param status_dict: optional shared mapping; its "status" key is set
            to "error" when processing fails
        """
        super().__init__()
        self.id = id
        self.video_path = video_path
        self.function = function
        self.start_time = start_time
        self.end_time = end_time
        self.out_queue = out_queue
        self.send_end = send_end
        self.logger = logger or logging.getLogger(f"worker-{self.id}")
        self.status_dict = status_dict

    def run(self):
        """
        Process every frame of the section.

        Each result is put on `out_queue`; when `send_end` is set the results
        are also gathered and sent as one list at the end. Failures are
        logged and recorded in `status_dict` instead of being raised.
        """
        n_frames = 0
        results_list = []
        results_sent = False
        try:
            span = (
                f"processing video start={self.start_time:.5f}"
                f" end={self.end_time:.5f}"
            )
            self.logger.debug(span)
            print(span)
            for frame in read_video_frames(
                self.video_path, self.start_time, self.end_time
            ):
                n_frames += 1
                result = self.function(frame)
                self.out_queue.put(result)
                if self.send_end:
                    results_list.append(result)
            if self.send_end:
                self.send_end.send(results_list)
                results_sent = True
            self.logger.debug(
                f"completed frames={n_frames}"
                f" return {len(results_list)} apriltag detections"
            )
            print(f"completed frames={n_frames}")
        except Exception:
            self.logger.exception(
                f"worker {self.id} failed after {n_frames} frames"
            )
            # status_dict is optional, so guard against the default None.
            if self.status_dict is not None:
                self.status_dict["status"] = "error"
            if self.send_end and not results_sent:
                # Send an empty list so that a parent blocked on recv() for
                # this worker is released instead of waiting forever.
                try:
                    self.send_end.send([])
                except Exception:
                    self.logger.exception(
                        f"worker {self.id} could not report its failure"
                    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment