#!/usr/bin/env python3
"""Ego4D video box blur."""
import gc
import json
from pathlib import Path
from argparse import ArgumentParser

# conda install av pillow tqdm -c conda-forge -c anaconda
import av  # used versions: av=8.0.3 and ffmpeg=4.3.1
from tqdm import tqdm  # used versions: tqdm=4.59.0
from PIL import Image, ImageFilter, ImageDraw  # used versions: pillow=8.1.2

__all__ = ['video_box_blur']


class VideoFrame:
    """Simple av.VideoFrame wrapper."""
    def __init__(self, frame):
        self.frame = frame

    def to_image(self):
        """Convert the frame to a PIL image."""
        return self.frame.to_image()

    def to_ndarray(self):
        """Convert the frame to a NumPy array."""
        return self.frame.to_ndarray()

    def from_image(self, image):
        """Replace the frame with a PIL image."""
        self.frame = av.VideoFrame.from_image(image)
        self.frame.pict_type = 'NONE'

    def from_ndarray(self, ndarray):
        """Replace the frame with a NumPy array."""
        self.frame = av.VideoFrame.from_ndarray(ndarray)
        self.frame.pict_type = 'NONE'


class VideoEditor:
    """Clone a video while enabling frame editing (only video and audio)."""
    # reference cycles in PyAV need to be handled
    _called_times = 0
    _gc_interval = 10  # run gc.collect() once every `_gc_interval` exits

    def __init__(self, input_path, output_path, threading=True):
        input_path = Path(input_path)
        assert input_path.is_file(), 'incorrect video path'
        output_path = Path(output_path)
        if not output_path.suffix:  # change directory to file in directory
            output_path = output_path / input_path.name
        if not output_path.parent.exists():  # create parent directory
            output_path.parent.mkdir(parents=True, exist_ok=True)
        assert output_path.suffix == input_path.suffix, 'extension mismatch'
        self.input_path = input_path
        self.output_path = output_path
        self.threading = bool(threading)

    def __enter__(self):
        # pylint: disable=attribute-defined-outside-init
        self.input_video = av.open(str(self.input_path)).__enter__()
        self.output_video = av.open(str(self.output_path), 'w').__enter__()
        streams = self.input_video.streams
        streams = [streams.video[0], *streams.audio]
        # enable threading in the video stream
        if self.threading:
            streams[0].thread_type = 'AUTO'
        else:
            streams[0].thread_type = 'NONE'
        out_streams = [self.mold(self.output_video, s) for s in streams]

        def frame_iterator():
            for packet in self.input_video.demux(streams):
                # skip flushing packets
                if packet.dts is None:
                    continue
                # get the corresponding output stream
                out_stream = out_streams[streams.index(packet.stream)]
                # remux the audio as is
                if packet.stream.type == 'audio':
                    packet.stream = out_stream
                    self.output_video.mux(packet)
                else:
                    # decode the video packet into frames
                    for frame in packet.decode():
                        frame = VideoFrame(frame)
                        yield frame
                        for out_packet in out_stream.encode(frame.frame):
                            self.output_video.mux(out_packet)

        # num_frames = stream.duration * stream.time_base * stream.average_rate
        return frame_iterator()

    def __exit__(self, exc_type, exc_value, traceback):
        for stream in self.output_video.streams.video:
            for packet in stream.encode():  # flush
                self.output_video.mux(packet)
        self.input_video.__exit__(exc_type, exc_value, traceback)
        self.output_video.__exit__(exc_type, exc_value, traceback)
        del self.input_video, self.output_video
        # do garbage collection
        self._called_times += 1
        if self._called_times % self._gc_interval == self._gc_interval - 1:
            gc.collect()

    @staticmethod
    def mold(container, stream):
        """Add a stream to the container based on a template stream."""
        if stream.type == 'video':
            # https://github.com/PyAV-Org/PyAV/issues/730
            fps = stream.average_rate
            codec = stream.codec.name
            out_stream = container.add_stream(codec, rate=fps)
            out_stream.width = stream.width
            out_stream.height = stream.height
            out_stream.pix_fmt = stream.pix_fmt
            out_stream.options = stream.options
            out_stream.bit_rate = stream.bit_rate
            out_stream.thread_type = stream.thread_type
        else:
            out_stream = container.add_stream(template=stream)
        return out_stream


def fix_annotations(
        annotations,  # Ego4D annotations
        exclude=(),  # in ('faces', 'license_plates')
        min_side=0,  # in pixels
        min_score=0,  # scores are in [0, 1]
        fix_scores=False,  # resolve some None scores
):
    """Fix Ego4D annotations."""
    frames = []
    scores = {}
    if exclude is None:
        exclude = ()
    exclude = set([exclude] if isinstance(exclude, str) else exclude)
    for frame in annotations['frames']:
        out_frame = {'index': frame['index']}
        for label, detections in frame.items():
            if label == 'index':
                continue
            out_frame[label] = []
            # exclude certain labels
            if label in exclude:
                detections = []
            for detection in detections:
                score = detection['score']
                identity = detection['identity']
                bounding_box = detection['bounding_box']
                # ignore small boxes
                if min_side is not None:
                    x_min, y_min, x_max, y_max = bounding_box
                    width = x_max - x_min + 1
                    height = y_max - y_min + 1
                    if min(width, height) < min_side:
                        continue
                # keep the scores of the same identity across frames
                key = (label, identity)
                if key not in scores:
                    scores[key] = []
                scores[key].append(score)
                out_frame[label].append(detection.copy())
        frames.append(out_frame)
    if fix_scores or min_score > 0:
        # compute average scores ignoring None's
        for key, values in scores.items():
            values = [s for s in values if s is not None]
            scores[key] = sum(values) / max(len(values), 1)
        # go over the frames again to handle the scores
        for frame in frames:
            for label, detections in frame.items():
                if label == 'index':
                    continue
                out_detections = []
                for detection in detections:
                    # get the average score (not the original score)
                    score = scores[label, detection['identity']]
                    # ignore low confidence boxes
                    if score < min_score:
                        continue
                    # fix the score
                    if fix_scores:
                        detection['score'] = score
                    out_detections.append(detection)
                frame[label] = out_detections
    return {'frames': frames}


def video_box_blur(json_path, video_path, output_path, radius=20, **kwargs):
    """Apply box blur on an Ego4D video."""
    with open(json_path, 'r') as json_file:
        annotations = json.load(json_file)
    annotations = fix_annotations(annotations, **kwargs)['frames']
    box_blur_filter = ImageFilter.BoxBlur(radius)
    with VideoEditor(video_path, output_path) as frames:
        for frame, annotation in zip(frames, tqdm(annotations)):
            for label, detections in annotation.items():
                if label == 'index' or not detections:
                    continue
                image = frame.to_image()
                for detection in detections:
                    box = detection['bounding_box']
                    crop = image.crop(box)
                    blurred = crop.filter(box_blur_filter)
                    if label == 'faces':
                        # convert the box to an oval
                        mask = Image.new('L', crop.size, 'white')
                        draw = ImageDraw.Draw(mask)
                        draw.ellipse([(0, 0), mask.size], fill='black')
                        blurred.paste(crop, mask=mask)
                    image.paste(blurred, box[:2])
                frame.from_image(image)


def main():
    """Run video box blur on Ego4D videos."""
    parser = ArgumentParser(description='Ego4D Video Box Blur')
    parser.add_argument('-b', '--blur', action='store_true')
    parser.add_argument('-i', '--index', type=int, nargs='*', default=())
    parser.add_argument('-j', '--json-path', type=Path, default='./json')
    parser.add_argument('-v', '--video-path', type=Path, default='./video')
    parser.add_argument('-o', '--output-path', type=Path, default='./output')
    parser.add_argument('-r', '--radius', type=int, default=20)
    parser.add_argument('-e', '--exclude', nargs='*', default=())
    parser.add_argument('-m', '--min-side', type=int, default=0)
    parser.add_argument('-s', '--min-score', type=float, default=0)
    parser.add_argument('-f', '--fix-scores', action='store_true')
    args = vars(parser.parse_args())
    blur = args.pop('blur')
    indices = args.pop('index')
    json_root = args.pop('json_path')
    video_root = args.pop('video_path')
    output_root = args.pop('output_path')

    def glob(path, end):  # case-insensitive glob
        end = end.lower()
        for file_path in Path(path).rglob('*'):
            if file_path.name.lower().endswith(end):
                yield file_path

    # get all json files with corresponding video files
    if any([json_root.is_file(), video_root.is_file()]):
        if video_root.is_dir():
            video_root = video_root / json_root.name
        elif json_root.is_dir():
            json_root = json_root / video_root.name
        inputs = [(json_root, video_root)]
    else:
        inputs = []
        for video_path in glob(video_root, '.mp4'):
            sub_path = video_path.parent.relative_to(video_root)
            json_name = video_path.stem + '.json'
            for json_path in glob(json_root / sub_path, json_name):
                inputs.append((json_path, video_path))
    if not blur:
        if not indices:
            print('Use `--index I` argument to select videos if desired.')
        print('Add `--blur` flag to apply video blurring on the following:')
    inputs = sorted(inputs)
    for i in indices if indices else range(len(inputs)):
        json_path, video_path = inputs[i]  # `i` is from the `--index` argument
        output_path = output_root / video_path.relative_to(video_root)
        if output_path == output_root:
            title = output_path.name
        else:
            title = str(output_path.relative_to(output_root))
        if blur:
            print('#' * 10, title.center(50), '#' * 10)
            print('JSON:', json_path.absolute())
            print('Video:', video_path.absolute())
            print('Output:', output_path.absolute())
            video_box_blur(json_path, video_path, output_path, **args)
        else:
            print(f'{i:<4d}:', title)


if __name__ == '__main__':
    main()
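Besides the command-line interface, `video_box_blur` can be called directly from Python; any extra keyword arguments are forwarded to `fix_annotations`. A minimal sketch, assuming the file is saved as `video_box_blur.py` (the module name used by the SLURM wrapper below) and using hypothetical paths:

from video_box_blur import video_box_blur

# blur faces only: drop license plates and boxes whose smallest side is under 28 px
video_box_blur(
    'clip.json',        # Ego4D annotation file (hypothetical path)
    'clip.mp4',         # input video (hypothetical path)
    'output/clip.mp4',  # blurred copy written here
    radius=20,          # BoxBlur radius in pixels
    exclude=('license_plates',),
    min_side=28,
    min_score=0,
)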
#!/bin/bash --login
#SBATCH --job-name video_box_blur
#SBATCH --output slurm/%x.%3a.%A.out
#SBATCH --error slurm/%x.%3a.%A.err
#SBATCH --time 0-01:00:00
#SBATCH --ntasks 32
#SBATCH --mem 5G

# init conda and activate the env (conda should already be in PATH)
if [ ! -z "$CONDA_ENV" ]
then
    source "$(conda info --base)/etc/profile.d/conda.sh"
    conda activate "$CONDA_ENV"
fi

# if we are running as a slurm array task ($SLURM_ARRAY_TASK_ID is defined)
if [ ! -z "$SLURM_ARRAY_TASK_ID" ]
then
    task="--index $SLURM_ARRAY_TASK_ID"
else
    task=""
fi

# $task is intentionally unquoted so it expands into two arguments
python3 -m video_box_blur $task "$@"
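A sketch of how this wrapper might be submitted as a SLURM array job, assuming it is saved as `video_box_blur.sh` next to `video_box_blur.py` (the array index is mapped to `--index`, so each task processes one video; when `$SLURM_ARRAY_TASK_ID` is unset the `--index` argument is simply omitted):

CONDA_ENV=ego4d_blur sbatch --array=0-2 video_box_blur.sh --blur \
    --json-path ./json --video-path ./video --output-path ./output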
#!/usr/bin/env python3
"""An interactive Ego4D annotation session."""
import json
import threading
import functools
from io import BytesIO
from pathlib import Path
from argparse import ArgumentParser
from collections import OrderedDict

from IPython import display
from PIL import Image, ImageDraw

import torchvision
from torch.utils.data import Dataset


def to_gif_bytes(frames, **kwargs):
    """Convert a list of PIL images to a GIF image as bytes.

    You can read it as a PIL image:
        Image.open(io.BytesIO(gif_bytes))
    Or display it using IPython.display:
        IPython.display.display(IPython.display.Image(gif_bytes))
    Or by converting it to a base64 URL:
        gif_base64 = base64.b64encode(gif_bytes).decode('ascii')
        html_tag = f'<img src="data:image/gif;base64,{gif_base64}">'
        IPython.display.display(IPython.display.HTML(html_tag))
    """
    frames = tuple(frames)
    buffer = BytesIO()
    defaults = dict(format='GIF', save_all=True, loop=0)
    defaults.update(kwargs)
    frames[0].save(buffer, append_images=frames[1:], **defaults)
    return buffer.getvalue()


def display_gif_bytes(gif_bytes):
    """Display a GIF image given as bytes."""
    return display.display(display.Image(gif_bytes))


class Ego4dAnnotation(Dataset):
    """Annotation dataset for Ego4D."""
    def __init__(self, json_path, video_path, scale=1, cache_size=None):
        self.scale = scale
        self.json_path = Path(json_path)
        self.video_path = Path(video_path)
        # load annotations
        data = self.load_detections(self.json_path)
        self.detections, self.labels, self.num_frames = data
        self.skip = set()  # identities to skip when saving
        # get the number of frames per second in the video
        self.fps = 1  # temporarily set fps = 1
        self.fps = self.read_video(0, 0)[2]['video_fps']
        # memoize self.get_gif
        self.get_gif = functools.lru_cache(maxsize=cache_size)(self.get_gif)

    def __getitem__(self, index):
        if isinstance(index, tuple):
            label, identity = index
        else:
            label, identity = tuple(self.detections)[index]
        boxes = self.detections[label, identity]
        return label, identity, boxes

    def __len__(self):
        return len(self.detections)

    def read_video(self, start, end):
        """Read a video frame interval assuming a constant frame rate."""
        path = str(self.video_path)
        start, end = start / self.fps, end / self.fps
        video, audio, meta = torchvision.io.read_video(path, start, end, 'sec')
        return video, audio, meta

    def get_gif(self, label, identity):  # pylint: disable=method-hidden
        """Get a GIF animation of the frames where this identity appears."""
        def scaled(values, as_type=None):
            values = (x * self.scale for x in values)
            if as_type is not None:
                values = map(as_type, values)
            return tuple(values)

        def draw_box(inputs):
            tensor, box = inputs
            frame = Image.fromarray(tensor.numpy())
            if self.scale != 1:
                frame = frame.resize(scaled(frame.size, int))
            draw = ImageDraw.Draw(frame)
            draw.rectangle(scaled(box['bounding_box']), outline='red')
            return frame

        boxes = self.detections[label, identity]
        frames = self.read_video(min(boxes) - 1, max(boxes))[0]
        return to_gif_bytes(map(draw_box, zip(frames, boxes.values())))

    @staticmethod
    def load_detections(json_path):
        """Load an Ego4D JSON file."""
        with open(json_path) as json_file:
            frames = json.load(json_file)['frames']
        frame = {}
        num_frames = 0
        outputs = OrderedDict()
        for frame in frames:
            index = frame['index']
            num_frames = max(num_frames, index)
            for label, detections in frame.items():
                if label == 'index':
                    continue
                for detection in detections:
                    identity = detection['identity']
                    key = (label, identity)
                    if key not in outputs:
                        outputs[key] = OrderedDict()
                    outputs[key][index] = {
                        'score': detection['score'],
                        'bounding_box': detection['bounding_box'],
                    }
        labels = tuple(label for label in frame if label != 'index')
        return outputs, labels, num_frames

    @staticmethod
    def save_detections(json_path, detections, labels, num_frames, skip=None):
        """Save an Ego4D JSON file."""
        outputs = {}
        skip = set() if skip is None else set(skip)
        for (label, identity), boxes in detections.items():
            if (label, identity) in skip:
                continue
            for index, box in boxes.items():
                if index not in outputs:
                    outputs[index] = {}
                if label not in outputs[index]:
                    outputs[index][label] = []
                box = {
                    'bounding_box': box['bounding_box'],
                    'identity': identity,
                    'score': box['score'],
                }
                outputs[index][label].append(box)
        frames = []
        for index in range(1, num_frames + 1):
            frame = {'index': index}
            for label in labels:
                if index in outputs and label in outputs[index]:
                    frame[label] = outputs[index][label]
                else:
                    frame[label] = []
            frames.append(frame)
        with open(json_path, 'w') as json_file:
            json.dump({'frames': frames}, json_file)

    def save(self, path=None, skip=None, labels=None):
        """Save an Ego4D JSON file."""
        if path is None:
            path = self.json_path
        if skip is None:
            skip = self.skip
        if labels is None:
            labels = self.labels
        data = self.detections, labels, self.num_frames
        return self.save_detections(path, *data, skip)

    def get_title(self, label, identity, boxes=None):
        """Get a human-readable representation for a data item."""
        skip = 'REMOVE' if (label, identity) in self.skip else ' KEEP '
        title = f'{skip} {label}[{identity}]'
        boxes = [] if boxes is None else boxes.values()
        scores = [b['score'] for b in boxes if b['score'] is not None]
        score = 100 * sum(scores) / max(len(scores), 1)
        if score != 0:
            title += f' @ {score:.2f}%'
        return title

    def filter_annotations(self):
        """Interactively select detections to skip."""
        def get_title(index, label, identity, boxes):
            return f'{index + 1:3d}: {self.get_title(label, identity, boxes)}'

        def get_index():
            while True:
                token = input(f'select a detection in [1, {len(self)}]:')
                try:
                    index = int(token) - 1
                    if 0 <= index < len(self):
                        break
                    raise ValueError('index not in range')
                except ValueError as exception:
                    print(exception.args[0])
                    index = -1
            return index

        options = ['keep', 'remove', 'next', 'previous', 'choose', 'quit']
        flags = {o[0] for o in options}
        assert len(options) == len(flags), 'the first letter is not unique'
        query = ', '.join([f'({o[0]}){o[1:]}' for o in options]) + '?'
        i = 0
        while i < len(self):
            label, identity, boxes = self[i]
            key = (label, identity)
            print(get_title(i, label, identity, boxes))
            display_gif_bytes(self.get_gif(label, identity))
            token = input(query)
            if token not in flags:
                print(f'invalid input: `{token}` not in {flags}')
                continue
            if token == 'k':
                if key in self.skip:
                    self.skip.remove(key)
                token = 'n'
            elif token == 'r':
                self.skip.add(key)
                token = 'n'
            if token == 'n':
                if i < len(self) - 1:
                    i += 1
                else:
                    print('reached the end')
                    token = 'c'
            elif token == 'p':
                if i > 0:
                    i -= 1
                else:
                    print('reached the beginning')
                    token = 'c'
            if token in ('c', 'q'):
                for j, item in enumerate(self):
                    print(get_title(j, *item))
                print(f'currently, you are on {i + 1}')
                if token == 'c':
                    i = get_index()
                else:
                    break
        return self

    def interactive(self, output_path=None):
        """Interactively select detections to skip and save to file."""
        def load_gif():
            for label, identity in self.detections:
                if done:
                    break
                self.get_gif(label, identity)

        thread = threading.Thread(target=load_gif)
        self.get_gif(*self[0][:2])
        done = False
        thread.start()
        self.filter_annotations()
        done = True
        thread.join()
        flags = {'y', 'n'}
        while output_path is not None:
            token = input('save? [y/n]')
            if token not in flags:
                print(f'invalid input: `{token}` not in {flags}')
                continue
            if token == 'y':
                self.save(output_path)
                print('saved to file')
            break
        return self


def main():
    """Run an interactive annotation session."""
    parser = ArgumentParser(description='Ego4D Annotation')
    parser.add_argument('-j', '--json-root', type=Path, default='./input')
    parser.add_argument('-v', '--video-root', type=Path, default='./input')
    parser.add_argument('-o', '--output-root', type=Path, default='./output')
    parser.add_argument('-s', '--scale', type=float, default=0.25)
    parser.add_argument('-c', '--cache-size', type=int, default=None)
    args = vars(parser.parse_args())
    scale = args['scale']
    cache_size = args['cache_size']
    video_root = args['video_root']
    json_root = args['json_root']
    output_root = args['output_root']

    def glob(path, end):  # case-insensitive glob
        end = end.lower()
        for file_path in Path(path).rglob('*'):
            if file_path.name.lower().endswith(end):
                yield file_path

    for video_path in glob(video_root, '.mp4'):
        sub_path = video_path.parent.relative_to(video_root)
        for json_path in glob(json_root / sub_path, video_path.stem + '.json'):
            output_path = output_root / json_path.relative_to(json_root)
            print(video_path, json_path, output_path)
            if output_path.exists():
                print('already processed')
                break
            dataset = Ego4dAnnotation(json_path, video_path, scale, cache_size)
            output_path.parent.mkdir(parents=True, exist_ok=True)
            for key in dataset.detections:  # remove by default
                dataset.skip.add(key)
            dataset.interactive(output_path)
            break
        else:
            print(f'did not find JSON file for {str(video_path)}')


if __name__ == '__main__':
    main()
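A minimal notebook sketch of an annotation session, assuming the file above is saved as `ego4d_annotation.py` (an assumed module name) and using hypothetical paths; `interactive()` pre-fetches the GIFs in a background thread, steps through the identities, and asks whether to save at the end:

from ego4d_annotation import Ego4dAnnotation

# run inside a Jupyter/IPython session so the GIF previews render
dataset = Ego4dAnnotation('clip.json', 'clip.mp4', scale=0.25)
for key in dataset.detections:  # mark every identity for removal by default, as main() does
    dataset.skip.add(key)
dataset.interactive('output/clip.json')  # review, then answer `y` to save the kept detections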
#!/bin/bash
name=$(basename -s .sh "$0")  # get the name of the script
CONDA_ENV=${CONDA_ENV:-"ego4d_blur"}

# set slurm job arguments
slurm=(
    --time=0-01:00:00
    # --array=0,1,2  # use this or --index below
)

# select the videos you want to process
experiments=(
    # --index 0 1 2  # use this or --array above
)

# set up the root paths
paths=(
    --video-path "/ibex/scratch/xum/V_cmp_all"
    --json-path "/ibex/scratch/xum/V_fpr_Sample/sample"
    --output-path "/ibex/scratch/xum/V_fpr_Sample/output"
)

# specify the annotation options
options=(
    --radius 20  # box blur filter size in pixels
    --exclude license_plates  # labels to exclude {faces, license_plates}
    --min-side 28  # ignore boxes whose smallest side is smaller than this
    --min-score 0  # ignore boxes with scores less than this
)

job=( video_box_blur.sh ${experiments[@]} ${paths[@]} ${options[@]} "$@" )
echo ${job[@]}

# run the script locally, or submit it to slurm if SUBMIT is defined
if [ -z "$SUBMIT" ]
then
    source ${job[@]}
else
    mkdir -p slurm $name
    submit="sbatch --job-name ${CONDA_ENV}_${name} --export=ALL"
    $submit ${slurm[@]} ${job[@]} --blur
fi
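Assuming this wrapper is saved as `run_blur.sh` (a hypothetical name) in the same directory as `video_box_blur.sh`, it can be used roughly as follows; without `SUBMIT` it sources the SLURM script locally, which only lists the matched videos unless `--blur` is passed, and with `SUBMIT` set it submits the job with `--blur` appended:

bash run_blur.sh            # dry run: prints the index and title of each matched video
SUBMIT=1 bash run_blur.sh   # submit the blurring job via sbatch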