#!/usr/bin/env python3
"""Ego4D video box blur."""
import gc
import json
from pathlib import Path
from argparse import ArgumentParser

# conda install av pillow tqdm -c conda-forge -c anaconda
import av  # used versions: av=8.0.3 and ffmpeg=4.3.1
from tqdm import tqdm  # used versions: tqdm=4.59.0
from PIL import Image, ImageFilter, ImageDraw  # used versions: pillow=8.1.2

__all__ = ['video_box_blur']


class VideoFrame:
    """Simple av.VideoFrame wrapper."""

    def __init__(self, frame):
        self.frame = frame

    def to_image(self):
        """Convert the frame to a PIL image."""
        return self.frame.to_image()

    def to_ndarray(self):
        """Convert the frame to a NumPy array."""
        return self.frame.to_ndarray()

    def from_image(self, image):
        """Replace the frame with a PIL image."""
        self.frame = av.VideoFrame.from_image(image)
        self.frame.pict_type = 'NONE'

    def from_ndarray(self, ndarray):
        """Replace the frame with a NumPy array."""
        self.frame = av.VideoFrame.from_ndarray(ndarray)
        self.frame.pict_type = 'NONE'


class VideoEditor:
    """Clone a video while enabling frame editing (only video and audio)."""

    # reference cycles in PyAV need to be handled
    _called_times = 0
    _gc_interval = 10  # run gc.collect() every this many exits

    def __init__(self, input_path, output_path, threading=True):
        input_path = Path(input_path)
        assert input_path.is_file(), 'incorrect video path'
        output_path = Path(output_path)
        if not output_path.suffix:  # change directory to file in directory
            output_path = output_path / input_path.name
        if not output_path.parent.exists():  # create parent directory
            output_path.parent.mkdir(parents=True, exist_ok=True)
        assert output_path.suffix == input_path.suffix, 'extension mismatch'
        self.input_path = input_path
        self.output_path = output_path
        self.threading = bool(threading)

    def __enter__(self):
        # pylint: disable=attribute-defined-outside-init
        self.input_video = av.open(str(self.input_path)).__enter__()
        self.output_video = av.open(str(self.output_path), 'w').__enter__()
        streams = self.input_video.streams
        streams = [streams.video[0], *streams.audio]
        # enable threading in the video stream
        if self.threading:
            streams[0].thread_type = 'AUTO'
        else:
            streams[0].thread_type = 'NONE'
        out_streams = [self.mold(self.output_video, s) for s in streams]

        def frame_iterator():
            for packet in self.input_video.demux(streams):
                # skip flushing packets
                if packet.dts is None:
                    continue
                # get the corresponding output stream
                out_stream = out_streams[streams.index(packet.stream)]
                # remux the audio as is
                if packet.stream.type == 'audio':
                    packet.stream = out_stream
                    self.output_video.mux(packet)
                else:
                    # decode the video packet into frames
                    for frame in packet.decode():
                        frame = VideoFrame(frame)
                        yield frame
                        for out_packet in out_stream.encode(frame.frame):
                            self.output_video.mux(out_packet)

        # num_frames = stream.duration * stream.time_base * stream.average_rate
        return frame_iterator()

    def __exit__(self, exc_type, exc_value, traceback):
        for stream in self.output_video.streams.video:
            for packet in stream.encode():  # flush
                self.output_video.mux(packet)
        self.input_video.__exit__(exc_type, exc_value, traceback)
        self.output_video.__exit__(exc_type, exc_value, traceback)
        del self.input_video, self.output_video
        # do garbage collection
        self._called_times += 1
        if self._called_times % self._gc_interval == self._gc_interval - 1:
            gc.collect()

    @staticmethod
    def mold(container, stream):
        """Add a stream to the container based on a template stream."""
        if stream.type == 'video':
            # https://github.com/PyAV-Org/PyAV/issues/730
            fps = stream.average_rate
            codec = stream.codec.name
            out_stream = container.add_stream(codec, rate=fps)
            out_stream.width = stream.width
            out_stream.height = stream.height
            out_stream.pix_fmt = stream.pix_fmt
            out_stream.options = stream.options
            out_stream.bit_rate = stream.bit_rate
            out_stream.thread_type = stream.thread_type
        else:
            out_stream = container.add_stream(template=stream)
        return out_stream
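

# A minimal usage sketch (the file names are hypothetical): `VideoEditor` is a
# context manager that yields decoded frames; replacing a frame's image before
# advancing the iterator writes the edited frame to the output container.
#
#     with VideoEditor('input.mp4', 'blurred/input.mp4') as frames:
#         for frame in frames:
#             image = frame.to_image()  # PIL image of the current frame
#             frame.from_image(image)   # write it back (possibly edited)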


def fix_annotations(
        annotations,  # Ego4D annotations
        exclude=(),  # in ('faces', 'license_plates')
        min_side=0,  # in pixels
        min_score=0,  # scores are in [0, 1]
        fix_scores=False,  # resolve some None scores
):
    """Fix Ego4D annotations."""
    frames = []
    scores = {}
    if exclude is None:
        exclude = ()
    exclude = set([exclude] if isinstance(exclude, str) else exclude)
    for frame in annotations['frames']:
        out_frame = {'index': frame['index']}
        for label, detections in frame.items():
            if label == 'index':
                continue
            out_frame[label] = []
            # exclude certain labels
            if label in exclude:
                detections = []
            for detection in detections:
                score = detection['score']
                identity = detection['identity']
                bounding_box = detection['bounding_box']
                # ignore small boxes
                if min_side is not None:
                    x_min, y_min, x_max, y_max = bounding_box
                    width = x_max - x_min + 1
                    height = y_max - y_min + 1
                    if min(width, height) < min_side:
                        continue
                # keep the scores of the same identity across frames
                key = (label, identity)
                if key not in scores:
                    scores[key] = []
                scores[key].append(score)
                out_frame[label].append(detection.copy())
        frames.append(out_frame)
    if fix_scores or min_score > 0:
        # compute average scores ignoring None's
        for key, values in scores.items():
            values = [s for s in values if s is not None]
            scores[key] = sum(values) / max(len(values), 1)
        # go over the frames again to handle the scores
        for frame in frames:
            for label, detections in frame.items():
                if label == 'index':
                    continue
                out_detections = []
                for detection in detections:
                    # get average score (not the original score)
                    score = scores[label, detection['identity']]
                    # ignore low confidence boxes
                    if score < min_score:
                        continue
                    # fix the score
                    if fix_scores:
                        detection['score'] = score
                    out_detections.append(detection)
                frame[label] = out_detections
    return {'frames': frames}
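

# A minimal sketch of the annotation layout `fix_annotations` expects (all
# values are hypothetical): a dict with a 'frames' list, where every frame has
# an 'index' and maps labels to detections with an 'identity', a 'score', and
# a pixel-coordinate 'bounding_box'.
#
#     annotations = {'frames': [{
#         'index': 1,
#         'faces': [{'identity': 0, 'score': 0.9,
#                    'bounding_box': [10, 20, 60, 80]}],
#         'license_plates': [],
#     }]}
#     fixed = fix_annotations(annotations, exclude='license_plates',
#                             min_side=28, min_score=0.5, fix_scores=True)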


def video_box_blur(json_path, video_path, output_path, radius=20, **kwargs):
    """Apply box blur on an Ego4D video."""
    with open(json_path, 'r') as json_file:
        annotations = json.load(json_file)
    annotations = fix_annotations(annotations, **kwargs)['frames']
    box_blur_filter = ImageFilter.BoxBlur(radius)
    with VideoEditor(video_path, output_path) as frames:
        for frame, annotation in zip(frames, tqdm(annotations)):
            for label, detections in annotation.items():
                if label == 'index' or not detections:
                    continue
                image = frame.to_image()
                for detection in detections:
                    box = detection['bounding_box']
                    crop = image.crop(box)
                    blurred = crop.filter(box_blur_filter)
                    if label == 'faces':
                        # convert the box to an oval
                        mask = Image.new('L', crop.size, 'white')
                        draw = ImageDraw.Draw(mask)
                        draw.ellipse([(0, 0), mask.size], fill='black')
                        blurred.paste(crop, mask=mask)
                    image.paste(blurred, box[:2])
                frame.from_image(image)
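

# A minimal usage sketch (the file names are hypothetical): blur the annotated
# boxes in one clip, dropping tiny boxes; keyword arguments other than
# `radius` are forwarded to `fix_annotations`.
#
#     video_box_blur('clip.json', 'clip.mp4', 'blurred/clip.mp4',
#                    radius=20, exclude='license_plates', min_side=28)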


def main():
    """Run video box blur on Ego4D videos."""
    parser = ArgumentParser(description='Ego4D Video Box Blur')
    parser.add_argument('-b', '--blur', action='store_true')
    parser.add_argument('-i', '--index', type=int, nargs='*', default=())
    parser.add_argument('-j', '--json-path', type=Path, default='./json')
    parser.add_argument('-v', '--video-path', type=Path, default='./video')
    parser.add_argument('-o', '--output-path', type=Path, default='./output')
    parser.add_argument('-r', '--radius', type=int, default=20)
    parser.add_argument('-e', '--exclude', nargs='*', default=())
    parser.add_argument('-m', '--min-side', type=int, default=0)
    parser.add_argument('-s', '--min-score', type=float, default=0)
    parser.add_argument('-f', '--fix-scores', action='store_true')
    args = vars(parser.parse_args())
    blur = args.pop('blur')
    indices = args.pop('index')
    json_root = args.pop('json_path')
    video_root = args.pop('video_path')
    output_root = args.pop('output_path')

    def glob(path, end):  # case-insensitive glob
        end = end.lower()
        for file_path in Path(path).rglob('*'):
            if file_path.name.lower().endswith(end):
                yield file_path

    # get all JSON files with corresponding video files
    if any([json_root.is_file(), video_root.is_file()]):
        # pair a single file with its counterpart using the same-stem convention
        if video_root.is_dir():
            video_root = video_root / (json_root.stem + '.mp4')
        elif json_root.is_dir():
            json_root = json_root / (video_root.stem + '.json')
        inputs = [(json_root, video_root)]
    else:
        inputs = []
        for video_path in glob(video_root, '.mp4'):
            sub_path = video_path.parent.relative_to(video_root)
            json_name = video_path.stem + '.json'
            for json_path in glob(json_root / sub_path, json_name):
                inputs.append((json_path, video_path))
    if not blur:
        if not indices:
            print('Use `--index I` argument to select videos if desired.')
        print('Add `--blur` flag to apply video blurring on the following:')
    inputs = sorted(inputs)
    for i in indices if indices else range(len(inputs)):
        json_path, video_path = inputs[i]  # `i` is from the `--index` argument
        output_path = output_root / video_path.relative_to(video_root)
        if output_path == output_root:
            title = output_path.name
        else:
            title = str(output_path.relative_to(output_root))
        if blur:
            print('#' * 10, title.center(50), '#' * 10)
            print('JSON:', json_path.absolute())
            print('Video:', video_path.absolute())
            print('Output:', output_path.absolute())
            video_box_blur(json_path, video_path, output_path, **args)
        else:
            print(f'{i:<4d}:', title)


if __name__ == '__main__':
    main()
#!/bin/bash --login
#SBATCH --job-name video_box_blur
#SBATCH --output slurm/%x.%3a.%A.out
#SBATCH --error slurm/%x.%3a.%A.err
#SBATCH --time 0-01:00:00
#SBATCH --ntasks 32
#SBATCH --mem 5G

# init conda and activate the env (conda should already be in PATH)
if [ ! -z "$CONDA_ENV" ]
then
    source "$(conda info --base)/etc/profile.d/conda.sh"
    conda activate "$CONDA_ENV"
fi

# if we are running as a slurm array task ($SLURM_ARRAY_TASK_ID is defined)
if [ ! -z "$SLURM_ARRAY_TASK_ID" ]
then
    task="--index $SLURM_ARRAY_TASK_ID"
else
    task=""
fi

# $task is left unquoted on purpose so "--index N" splits into two arguments
python3 -m video_box_blur $task "$@"
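
# A minimal usage sketch (the paths are hypothetical): this wrapper forwards
# its arguments to `python3 -m video_box_blur`, so it can be run directly or
# submitted as a SLURM array job where each task handles one `--index`.
#
#     bash video_box_blur.sh --json-path ./json --video-path ./video --blur
#     sbatch --array=0-9 video_box_blur.sh --json-path ./json --video-path ./video --blur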
#!/usr/bin/env python3
"""An interactive Ego4D annotation session."""
import json
import threading
import functools
from io import BytesIO
from pathlib import Path
from argparse import ArgumentParser
from collections import OrderedDict

from IPython import display
from PIL import Image, ImageDraw
import torchvision
from torch.utils.data import Dataset


def to_gif_bytes(frames, **kwargs):
    """Convert a list of PIL images to a GIF image as bytes.

    You can read it back as a PIL image:
        Image.open(io.BytesIO(gif_bytes))
    Or display it using IPython:
        IPython.display.display(IPython.display.Image(gif_bytes))
    Or display it by converting it to a base64 URL:
        gif_base64 = base64.b64encode(gif_bytes).decode('ascii')
        html_tag = f'<img src="data:image/gif;base64,{gif_base64}">'
        IPython.display.display(IPython.display.HTML(html_tag))
    """
    frames = tuple(frames)
    buffer = BytesIO()
    defaults = dict(format='GIF', save_all=True, loop=0)
    defaults.update(kwargs)
    frames[0].save(buffer, append_images=frames[1:], **defaults)
    return buffer.getvalue()


def display_gif_bytes(gif_bytes):
    """Display a GIF image given as bytes."""
    return display.display(display.Image(gif_bytes))
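

# A minimal sketch (the frames are hypothetical): build a two-frame GIF from
# PIL images and show it in a notebook cell; extra keyword arguments such as
# `duration` are forwarded to `PIL.Image.Image.save`.
#
#     red = Image.new('RGB', (64, 64), 'red')
#     blue = Image.new('RGB', (64, 64), 'blue')
#     display_gif_bytes(to_gif_bytes([red, blue], duration=500))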


class Ego4dAnnotation(Dataset):
    """Annotation dataset for Ego4D."""

    def __init__(self, json_path, video_path, scale=1, cache_size=None):
        self.scale = scale
        self.json_path = Path(json_path)
        self.video_path = Path(video_path)
        # load the annotations
        data = self.load_detections(self.json_path)
        self.detections, self.labels, self.num_frames = data
        self.skip = set()  # identities to skip when saving
        # get the number of frames per second in the video
        self.fps = 1  # placeholder so read_video can be called below
        self.fps = self.read_video(0, 0)[2]['video_fps']
        # memoize self.get_gif
        self.get_gif = functools.lru_cache(maxsize=cache_size)(self.get_gif)

    def __getitem__(self, index):
        if isinstance(index, tuple):
            label, identity = index
        else:
            label, identity = tuple(self.detections)[index]
        boxes = self.detections[label, identity]
        return label, identity, boxes

    def __len__(self):
        return len(self.detections)

    def read_video(self, start, end):
        """Read a video frame interval assuming a constant frame rate."""
        path = str(self.video_path)
        start, end = start / self.fps, end / self.fps
        video, audio, meta = torchvision.io.read_video(path, start, end, 'sec')
        return video, audio, meta

    def get_gif(self, label, identity):  # pylint: disable=method-hidden
        """Get a GIF animation of the frames where this identity appears."""

        def scaled(values, as_type=None):
            values = (x * self.scale for x in values)
            if as_type is not None:
                values = map(as_type, values)
            return tuple(values)

        def draw_box(inputs):
            tensor, box = inputs
            frame = Image.fromarray(tensor.numpy())
            if self.scale != 1:
                frame = frame.resize(scaled(frame.size, int))
            draw = ImageDraw.Draw(frame)
            draw.rectangle(scaled(box['bounding_box']), outline='red')
            return frame

        boxes = self.detections[label, identity]
        frames = self.read_video(min(boxes) - 1, max(boxes))[0]
        return to_gif_bytes(map(draw_box, zip(frames, boxes.values())))

    @staticmethod
    def load_detections(json_path):
        """Load Ego4d JSON file."""
        with open(json_path) as json_file:
            frames = json.load(json_file)['frames']
        frame = {}
        num_frames = 0
        outputs = OrderedDict()
        for frame in frames:
            index = frame['index']
            num_frames = max(num_frames, index)
            for label, detections in frame.items():
                if label == 'index':
                    continue
                for detection in detections:
                    identity = detection['identity']
                    key = (label, identity)
                    if key not in outputs:
                        outputs[key] = OrderedDict()
                    outputs[key][index] = {
                        'score': detection['score'],
                        'bounding_box': detection['bounding_box'],
                    }
        # the labels are taken from the keys of the last frame
        labels = tuple(label for label in frame if label != 'index')
        return outputs, labels, num_frames
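
    # A minimal sketch of the layout `load_detections` returns (the values are
    # hypothetical): detections are keyed by (label, identity) and then by
    # frame index.
    #
    #     detections, labels, num_frames = \
    #         Ego4dAnnotation.load_detections('clip.json')
    #     # detections[('faces', 3)] == OrderedDict([
    #     #     (7, {'score': 0.9, 'bounding_box': [10, 20, 50, 60]}),
    #     #     (8, {'score': 0.8, 'bounding_box': [11, 21, 51, 61]}),
    #     # ])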

    @staticmethod
    def save_detections(json_path, detections, labels, num_frames, skip=None):
        """Save Ego4d JSON file."""
        outputs = {}
        skip = set() if skip is None else set(skip)
        for (label, identity), boxes in detections.items():
            if (label, identity) in skip:
                continue
            for index, box in boxes.items():
                if index not in outputs:
                    outputs[index] = {}
                if label not in outputs[index]:
                    outputs[index][label] = []
                box = {
                    'bounding_box': box['bounding_box'],
                    'identity': identity,
                    'score': box['score'],
                }
                outputs[index][label].append(box)
        frames = []
        for index in range(1, num_frames + 1):
            frame = {'index': index}
            for label in labels:
                if index in outputs and label in outputs[index]:
                    frame[label] = outputs[index][label]
                else:
                    frame[label] = []
            frames.append(frame)
        with open(json_path, 'w') as json_file:
            json.dump({'frames': frames}, json_file)

    def save(self, path=None, skip=None, labels=None):
        """Save Ego4d JSON file."""
        if path is None:
            path = self.json_path
        if skip is None:
            skip = self.skip
        if labels is None:
            labels = self.labels
        data = self.detections, labels, self.num_frames
        return self.save_detections(path, *data, skip)

    def get_title(self, label, identity, boxes=None):
        """Get a human readable representation for a data item."""
        skip = 'REMOVE' if (label, identity) in self.skip else ' KEEP '
        title = f'{skip} {label}[{identity}]'
        boxes = [] if boxes is None else boxes.values()
        scores = [b['score'] for b in boxes if b['score'] is not None]
        score = 100 * sum(scores) / max(len(scores), 1)
        if score != 0:
            title += f' @ {score:.2f}%'
        return title

    def filter_annotations(self):
        """Interactively select detections to skip."""

        def get_title(index, label, identity, boxes):
            return f'{index + 1:3d}: {self.get_title(label, identity, boxes)}'

        def get_index():
            while True:
                token = input(f'select a detection in [1, {len(self)}]:')
                try:
                    index = int(token) - 1
                    if 0 <= index < len(self):
                        break
                    raise ValueError('index not in range')
                except ValueError as exception:
                    print(exception.args[0])
                    index = -1
            return index

        options = ['keep', 'remove', 'next', 'previous', 'choose', 'quit']
        flags = {o[0] for o in options}
        assert len(options) == len(flags), 'the first letter is not unique'
        query = ', '.join([f'({o[0]}){o[1:]}' for o in options]) + '?'
        i = 0
        while i < len(self):
            label, identity, boxes = self[i]
            key = (label, identity)
            print(get_title(i, label, identity, boxes))
            display_gif_bytes(self.get_gif(label, identity))
            token = input(query)
            if token not in flags:
                print(f'invalid input: `{token}` not in {flags}')
                continue
            if token == 'k':
                if key in self.skip:
                    self.skip.remove(key)
                token = 'n'
            elif token == 'r':
                self.skip.add(key)
                token = 'n'
            if token == 'n':
                if i < len(self) - 1:
                    i += 1
                else:
                    print('reached the end')
                    token = 'c'
            elif token == 'p':
                if i > 0:
                    i -= 1
                else:
                    print('reached the beginning')
                    token = 'c'
            if token in ('c', 'q'):
                for j, item in enumerate(self):
                    print(get_title(j, *item))
                print(f'currently, you are on {i + 1}')
                if token == 'c':
                    i = get_index()
                else:
                    break
        return self

    def interactive(self, output_path=None):
        """Interactively select detections to skip and save to file."""

        def load_gif():
            # pre-load the GIFs in the background to warm the cache
            for label, identity in self.detections:
                if done:
                    break
                self.get_gif(label, identity)

        thread = threading.Thread(target=load_gif)
        self.get_gif(*self[0][:2])
        done = False
        thread.start()
        self.filter_annotations()
        done = True
        thread.join()
        flags = {'y', 'n'}
        while output_path is not None:
            token = input('save? [y/n]')
            if token not in flags:
                print(f'invalid input: `{token}` not in {flags}')
                continue
            if token == 'y':
                self.save(output_path)
                print('saved to file')
            break
        return self
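
    # A minimal usage sketch for a notebook session (the paths are
    # hypothetical): mark everything as REMOVE first, interactively flip
    # entries back to KEEP, then save the reviewed annotations.
    #
    #     dataset = Ego4dAnnotation('clip.json', 'clip.mp4', scale=0.25)
    #     dataset.skip.update(dataset.detections)  # (label, identity) keys
    #     dataset.interactive('reviewed.json')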


def main():
    """Run interactive annotation session."""
    parser = ArgumentParser(description='Ego4D Annotation')
    parser.add_argument('-j', '--json-root', type=Path, default='./input')
    parser.add_argument('-v', '--video-root', type=Path, default='./input')
    parser.add_argument('-o', '--output-root', type=Path, default='./output')
    parser.add_argument('-s', '--scale', type=float, default=0.25)
    parser.add_argument('-c', '--cache-size', type=int, default=None)
    args = vars(parser.parse_args())
    scale = args['scale']
    cache_size = args['cache_size']
    video_root = args['video_root']
    json_root = args['json_root']
    output_root = args['output_root']

    def glob(path, end):  # case-insensitive glob
        end = end.lower()
        for file_path in Path(path).rglob('*'):
            if file_path.name.lower().endswith(end):
                yield file_path

    for video_path in glob(video_root, '.mp4'):
        sub_path = video_path.parent.relative_to(video_root)
        for json_path in glob(json_root / sub_path, video_path.stem + '.json'):
            output_path = output_root / json_path.relative_to(json_root)
            print(video_path, json_path, output_path)
            if output_path.exists():
                print('already processed')
                break
            dataset = Ego4dAnnotation(json_path, video_path, scale, cache_size)
            output_path.parent.mkdir(parents=True, exist_ok=True)
            for key in dataset.detections:  # remove by default
                dataset.skip.add(key)
            dataset.interactive(output_path)
            break
        else:
            print(f'did not find JSON file for {str(video_path)}')


if __name__ == '__main__':
    main()
#!/bin/bash
name=$(basename -s .sh "$0") # get the name of the script
CONDA_ENV=${CONDA_ENV:-"ego4d_blur"}
# set the slurm job arguments
slurm=(
    --time=0-01:00:00
    # --array=0,1,2 # use this or --index below
)
# select the videos you want to process
experiments=(
    # --index 0 1 2 # use this or --array above
)
# set up the root paths
paths=(
    --video-path "/ibex/scratch/xum/V_cmp_all"
    --json-path "/ibex/scratch/xum/V_fpr_Sample/sample"
    --output-path "/ibex/scratch/xum/V_fpr_Sample/output"
)
# specify the annotation options
options=(
    --radius 20 # box blur filter size in pixels
    --exclude license_plates # labels to exclude {faces, license_plates}
    --min-side 28 # ignore boxes whose smallest side is below this
    --min-score 0 # ignore boxes with scores less than this
)
job=(video_box_blur.sh "${experiments[@]}" "${paths[@]}" "${options[@]}" "$@")
echo "${job[@]}"
# run the script directly or submit it if SUBMIT is defined
if [ ! "$SUBMIT" ]
then
    source "${job[@]}"
else
    mkdir -p slurm "$name"
    submit="sbatch --job-name ${CONDA_ENV}_${name} --export=ALL"
    $submit "${slurm[@]}" "${job[@]}" --blur
fi
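
# A minimal usage sketch (the launcher name `run.sh` is hypothetical): preview
# the matched videos locally, then submit the blurring job through SLURM.
#
#     bash run.sh             # sources video_box_blur.sh without --blur (lists videos)
#     SUBMIT=1 bash run.sh    # sbatch video_box_blur.sh with --blur appended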