import av
import tqdm
import logging
import signal
import sys
import subprocess
import typing as T
import multiprocessing


def get_frame_from_video_url(url, start=0, width=None, height=None):
    """
    Uses ffmpeg to get a jpeg frame from a video
    :param url: url to video
    :param start: offset in video in seconds
    :param width: width of frame
    :param height: height of frame
    """
    start_time = float(start)
    start_time = max(0, start_time)  # start at 0 if negative
    start_time = round(start_time, 5)  # ffmpeg doesn't want many decimals
    command = [
        "ffmpeg",
        "-ss",
        str(start_time),
        # "-vsync",
        # "1",
        # "-noaccurate_seek",
        "-i",
        url,
        "-frames",
        "1",
        "-q:v",
        "2",
    ]
    if width or height:
        width = width or -1
        height = height or -1
        command.extend(["-vf", f"scale={width}:{height}"])
    command.extend(["-f", "image2", "-"])
    out = subprocess.check_output(command)
    return out
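
# Usage sketch (assumption: "video.mp4" is a placeholder path, not from the
# original gist): grab a JPEG thumbnail one second into the video, scaled to
# 320 px wide, and write the bytes that ffmpeg emits on stdout to disk.
#
#   jpeg_bytes = get_frame_from_video_url("video.mp4", start=1, width=320)
#   with open("thumbnail.jpg", "wb") as f:
#       f.write(jpeg_bytes)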


def read_video_frames(
    video_path: str, start: float = 0, end: float = None
) -> T.Iterator[av.VideoFrame]:
    """
    Yields frames from a video
    :param video_path: path or url of a video
    :param start: time to start from (in seconds)
    :param end: time to end (in seconds)
    """
    container = av.open(video_path)
    try:
        if start:
            # container.seek expects an offset in av.time_base (microsecond) units
            container.seek(int(start * av.time_base))
        frames = container.decode(video=0)
        for frame in frames:
            if frame.time < start:
                continue
            if end is not None and frame.time > end:
                break
            if not frame:
                break
            yield frame
    finally:
        container.close()
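
# Usage sketch (assumption: "video.mp4" is a placeholder path): count the
# decoded frames between seconds 5 and 10 without holding them all in memory.
#
#   n = sum(1 for _ in read_video_frames("video.mp4", start=5, end=10))
#   print(f"{n} frames between 5s and 10s")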


def split_into_chunks(start: float, end: float, chunks: int):
    """
    Split `start` to `end` into `chunks` equally sized chunks
    >>> split_into_chunks(10, 30, 4)
    [(10.0, 15.0), (15.0, 20.0), (20.0, 25.0), (25.0, 30.0)]
    """
    total = end - start
    segment_size = total / chunks
    result = []
    for i in range(chunks):
        segment_start = start + i * segment_size
        segment_end = segment_start + segment_size
        result.append((segment_start, segment_end))
    return result


class VideoProcessor(object):
    def __init__(
        self,
        video_path: str,
        start_time: float,
        end_time: float,
        process_function: T.Callable[[av.VideoFrame], None],
        result_function: T.Callable = None,
        show_progress: bool = False,
        return_result: bool = False,
        n_workers: int = multiprocessing.cpu_count(),
        logger=None,
    ):
        """
        :param video_path: path to video
        :param start_time: time at which to start the processing
        :param end_time: time at which to end the processing
        :param process_function: function to run on each frame
        :param result_function: optional function to call with each result
        :param show_progress: show a tqdm progress bar while processing
        :param return_result: collect per-frame results and return them from start()
        :param n_workers: number of worker processes
        """
        super().__init__()
        self.video_path = video_path
        self.process_function = process_function
        self.result_function = result_function
        self.start_time = start_time
        self.end_time = end_time
        self.queue = multiprocessing.Queue()
        self.show_progress = show_progress
        self.return_result = return_result
        self.n_workers = n_workers
        self.running = False
        self.stopping = False
        self.logger = logger or logging.getLogger(__name__)
        self.status_dict = multiprocessing.Manager().dict()
        # Register the signal handlers
        try:
            signal.signal(signal.SIGINT, self.handle_sigint)
            signal.signal(signal.SIGTERM, self.handle_sigterm)
        except Exception as error:
            print(f"error {error}")
        container = av.open(video_path)
        container_duration_s = container.duration / av.time_base
        if self.start_time < 0:
            self.start_time = 0
        if self.end_time is None or self.end_time > container_duration_s:
            # run to (just past) the end of the container
            self.end_time = container_duration_s + 1
        duration_s = self.end_time - self.start_time
        self.estimated_frames = int(
            container.streams.video[0].average_rate * duration_s
        )
        container.close()
        self.section_offsets = split_into_chunks(
            self.start_time, self.end_time, self.n_workers
        )
        self.logger.debug(f"number of workers: {n_workers}")
        self.workers = []
        self.pipe_list = []
        for i, (start, end) in enumerate(self.section_offsets):
            recv_end, send_end = None, None
            if return_result:
                recv_end, send_end = multiprocessing.Pipe(False)
            self.workers.append(
                self._make_worker(start, end, send_end, self.status_dict)
            )
            self.pipe_list.append(recv_end)
            self.logger.debug(f"created worker {i} to work ({start} - {end})")

    def handle_sigint(self, sig, frame):
        if self.stopping:
            print("Got 2nd SIGINT (control-c), shutting down forcefully")
            sys.exit(1)
        print("Got 1st SIGINT (control-c), shutting down gracefully")
        self.stop()

    def handle_sigterm(self, sig, frame):
        if self.stopping:
            print("Got 2nd SIGTERM, shutting down forcefully")
            sys.exit(1)
        print("Got 1st SIGTERM, shutting down gracefully")
        self.stop()

    def stop(self):
        self.running = False
        self.stopping = True

    def _make_worker(self, start, end, send_end, status_dict):
        worker = VideoProcessorWorker(
            id=len(self.workers) + 1,
            video_path=self.video_path,
            start_time=start,
            end_time=end,
            function=self.process_function,
            out_queue=self.queue,
            send_end=send_end,
            logger=self.logger,
            status_dict=status_dict,
        )
        return worker

    def start(self):
        try:
            self.running = True

            def consumer(queue):
                if self.show_progress:
                    progress = tqdm.tqdm(total=self.estimated_frames)
                try:
                    for item in iter(queue.get, None):
                        if self.result_function:
                            self.result_function(item)
                        if self.show_progress:
                            progress.update()
                except BrokenPipeError:
                    for worker in self.workers:
                        worker.terminate()

            for worker in self.workers:
                worker.daemon = True
                worker.start()
            consumer_process = multiprocessing.Process(
                target=consumer, args=(self.queue,)
            )
            consumer_process.daemon = True
            consumer_process.start()
            for worker in self.workers:
                worker.join()
                self.logger.debug("joined worker")
            self.queue.put(None)
            consumer_process.join()
            return (
                self.status_dict.get("status", "") != "error",
                [x.recv() for x in self.pipe_list if x is not None],
            )
        except Exception:
            self.logger.exception("video processing failed")
            raise


class VideoProcessorWorker(multiprocessing.Process):
    """A worker process that runs a function on every frame of one video section"""

    def __init__(
        self,
        id: int,
        video_path: str,
        function: T.Callable[[av.VideoFrame], None],
        start_time: float,
        end_time: float,
        out_queue: multiprocessing.Queue,
        send_end=None,
        logger=None,
        status_dict=None,
    ):
        """
        :param video_path: path to video
        :param function: function to run on each frame
        :param start_time: time at which to start the processing
        :param end_time: time at which to end the processing
        :param out_queue: queue where to put the results
        :param send_end: optional pipe end used to send back collected results
        """
        super().__init__()
        self.id = id
        self.video_path = video_path
        self.function = function
        self.start_time = start_time
        self.end_time = end_time
        self.out_queue = out_queue
        self.send_end = send_end
        self.logger = logger or logging.getLogger(f"worker-{self.id}")
        self.status_dict = status_dict

    def run(self):
        try:
            self.logger.debug(
                f"processing video start={self.start_time:.5f} end={self.end_time:.5f}"
            )
            print(
                f"processing video start={self.start_time:.5f} end={self.end_time:.5f}"
            )
            n_frames = 0
            first_frame = last_frame = None
            results_list = []
            for frame in read_video_frames(
                self.video_path, self.start_time, self.end_time
            ):
                if first_frame is None:
                    first_frame = frame
                n_frames += 1
                result = self.function(frame)
                self.out_queue.put(result)
                if self.send_end:
                    results_list.append(result)
                last_frame = frame
            if self.send_end:
                self.send_end.send(results_list)
            self.logger.debug(
                f"completed frames={n_frames}"
                f" return {len(results_list)} apriltag detections"
            )
            print(f"completed frames={n_frames}")
        except Exception:
            self.logger.exception(f"worker {self.id} failed")
            self.status_dict["status"] = "error"
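

def frame_timestamp(frame: av.VideoFrame) -> float:
    # Illustrative per-frame function (an assumption, not part of the original
    # gist, which passes in its own detector). Defined at module level so it
    # can be pickled under spawn-based multiprocessing.
    return frame.time


# Minimal end-to-end sketch, assuming "video.mp4" is a placeholder path: each
# worker decodes one chunk of the video and pushes one result per frame onto
# the shared queue; with return_result=True the per-chunk result lists come
# back through the pipes when start() returns.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    processor = VideoProcessor(
        video_path="video.mp4",
        start_time=0,
        end_time=None,
        process_function=frame_timestamp,
        show_progress=True,
        return_result=True,
        n_workers=4,
    )
    ok, chunk_results = processor.start()
    print(f"success={ok}, chunks returned={len(chunk_results)}")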