'''
Script for using Matroid to obfuscate desired objects within an input video.

Example usage:
  To obfuscate all "male" and "female" faces with a score above 0.5 in
  "my_original_video.mp4" using the Matroid gender detector (5a4d4eb907a4ed0013eaa6f6),
  writing the obfuscated video "my_obfuscated_video.mp4" and displaying it in real time:

  python obfuscation.py \
    --input my_original_video.mp4 \
    --output my_obfuscated_video.mp4 \
    --access_token ae30cdbe4f80b56c652af2e5d8e69023 \
    --detector_id 5a4d4eb907a4ed0013eaa6f6 \
    --label male \
    --threshold 0.5 \
    --label female \
    --threshold 0.5

Note: Hit ESC to stop the script (with the obfuscated video as the active window).
'''
import cv2
import requests
import logging
import threading
import Queue
import time
import random
import argparse
import os
from collections import defaultdict, deque
import numpy as np
from scipy.spatial.distance import pdist, squareform
import string
class CONSTANTS:
    MATROID_BASE_URL = 'https://www.matroid.com'
    THREAD_ID_LEN = 8
    SAMPLE_QUEUE_SIZE = 256
    API_TIMEOUT = 5  # seconds
    QUEUE_TIMEOUT = 1  # seconds
    SAMPLER_SLEEP = 1  # seconds
    CLASSIFIER_SLEEP = 1  # seconds
    OBFUSCATER_SLEEP = 1  # seconds
    N_CLASSIFIERS = 16
    CLASSIFIER_BATCH_SIZE = 4
    N_SUCCESSES_FOR_BATCH_INCREASE = 20
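
# Pipeline overview (as implemented below): a single Sampler thread reads
# frames into sample_queue; N_CLASSIFIERS Classifier threads batch frames,
# send them to the Matroid API, and push (index, (frame, results)) onto
# classified_queue; one Obfuscater thread reorders frames by index, smooths
# detections over a sliding window, blacks out matching boxes, and writes the
# output video while the main loop displays frames from obfuscated_queue.
# Note: the script targets Python 2 (Queue, xrange, dict.iteritems).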
class Sampler(threading.Thread):
    ''' Samples frames from input video. '''
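    # Each frame is tagged with a monotonically increasing index so the
    # Obfuscater can restore the original frame order downstream.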
    def __init__(self, cap, sample_queue):
        threading.Thread.__init__(self)
        unique_id = ''.join(random.choice(string.hexdigits) for _ in range(CONSTANTS.THREAD_ID_LEN))
        self.setName('Sampler-{}'.format(unique_id))
        self.cap = cap
        self.sample_queue = sample_queue
        self.stop_event = threading.Event()

    def run(self):
        logging.info('{} started.'.format(self.getName()))
        i = 0
        while not self.stop_event.isSet():
            if self.sample_queue.full():
                time.sleep(CONSTANTS.SAMPLER_SLEEP)
                continue
            ret, frame = self.cap.read()
            if not ret:
                logging.info('{} done.'.format(self.getName()))
                break
            self.sample_queue.put((i, frame))
            i += 1

    def stop(self):
        self.stop_event.set()
class Classifier(threading.Thread):
    ''' Classifies sampled frames via Matroid API. '''

    def __init__(self, sample_queue, classified_queue, lock, access_token, detector_id, batch_size):
        threading.Thread.__init__(self)
        unique_id = ''.join(random.choice(string.hexdigits) for _ in range(CONSTANTS.THREAD_ID_LEN))
        self.setName('Classifier-{}'.format(unique_id))
        self.sample_queue = sample_queue
        self.temp_queue = Queue.Queue()  # holds frames to retry after a failed API call
        self.classified_queue = classified_queue
        self.lock = lock
        self.access_token = access_token
        self.detector_id = detector_id
        self.batch_size = batch_size
        self.stop_event = threading.Event()
    def classify(self, images):
        ''' Classifies a list of JPEG-encoded @images using the Matroid API. '''
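        # Each element of the returned 'results' list is expected to contain a
        # 'predictions' list whose entries have 'labels' (label -> score) and a
        # normalized 'bbox' ('left', 'top', 'width', 'height' as fractions of
        # frame size); this shape is inferred from how results are consumed below.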
        try:
            # cv2.imencode returns a uint8 array; send its raw bytes as the file body
            files = [('file', (str(i), img.tobytes())) for i, img in enumerate(images)]
            ret = requests.post(
                '{}/api/v1/detectors/{}/classify_image'.format(CONSTANTS.MATROID_BASE_URL, self.detector_id),
                headers={'Authorization': 'Bearer {}'.format(self.access_token)},
                files=files,
                timeout=CONSTANTS.API_TIMEOUT)
            results = ret.json()['results']
            return results
        except requests.exceptions.Timeout:
            logging.error('{}: API classification timeout.'.format(self.getName()))
            return None
        except Exception:
            logging.error('{}: API classification error.'.format(self.getName()), exc_info=True)
            return None
    def run(self):
        logging.info('{} started.'.format(self.getName()))
        n_successes = 0
        while not self.stop_event.isSet():
            if self.sample_queue.empty() and self.temp_queue.empty():
                if not self.is_sampler_alive():
                    logging.info('{} done.'.format(self.getName()))
                    break
                else:
                    time.sleep(random.random() * CONSTANTS.CLASSIFIER_SLEEP)
                    continue
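            # Drain up to batch_size frames under the lock; frames queued for
            # retry in temp_queue take priority over fresh samples.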
            batch_size = min(self.sample_queue.qsize() + self.temp_queue.qsize(), self.batch_size)
            frame_i, frames, jpgs = [], [], []
            with self.lock:
                for _ in xrange(batch_size):
                    try:
                        if not self.temp_queue.empty():
                            i, frame = self.temp_queue.get(timeout=CONSTANTS.QUEUE_TIMEOUT)
                        else:
                            i, frame = self.sample_queue.get(timeout=CONSTANTS.QUEUE_TIMEOUT)
                    except Queue.Empty:
                        logging.info('{}: Timeout getting next frame for classifier.'.format(self.getName()))
                        continue
                    frame_i.append(i)
                    frames.append(frame)
                    _, jpg = cv2.imencode('.jpg', frame)
                    jpgs.append(jpg)
            if not jpgs:
                continue
            results = self.classify(jpgs)
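            # Adaptive batch sizing: on API failure, requeue the frames and halve
            # the batch; after N_SUCCESSES_FOR_BATCH_INCREASE consecutive
            # successes, double it again up to CLASSIFIER_BATCH_SIZE.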
            if results is None:
                for i, frame in zip(frame_i, frames):
                    self.temp_queue.put((i, frame))
                if self.batch_size != 1:
                    self.batch_size = max(self.batch_size / 2, 1)
                    logging.info('{}: batch size decreased to {}.'.format(self.getName(), self.batch_size))
                n_successes = 0
                time.sleep(random.random() * CONSTANTS.CLASSIFIER_SLEEP)
            else:
                for i, f, r in zip(frame_i, frames, results):
                    self.classified_queue.put((i, (f, r)))
                    n_successes += 1
                if n_successes >= CONSTANTS.N_SUCCESSES_FOR_BATCH_INCREASE and self.batch_size < CONSTANTS.CLASSIFIER_BATCH_SIZE:
                    self.batch_size = min(CONSTANTS.CLASSIFIER_BATCH_SIZE, self.batch_size * 2)
                    logging.info('{}: batch size increased to {}.'.format(self.getName(), self.batch_size))
                    n_successes = 0
                time.sleep(random.random() * CONSTANTS.CLASSIFIER_SLEEP)

    def is_sampler_alive(self):
        return any(t.getName().startswith('Sampler-') for t in threading.enumerate())

    def stop(self):
        self.stop_event.set()
class Obfuscater(threading.Thread):
    ''' Obfuscates objects in frames (based on classification results) and writes video. '''

    def __init__(self, classified_queue, obfuscated_queue, labels_to_obscure, box_color,
                 window_size, similarity_threshold, output_video, n_frames):
        threading.Thread.__init__(self)
        unique_id = ''.join(random.choice(string.hexdigits) for _ in range(CONSTANTS.THREAD_ID_LEN))
        self.setName('Obfuscater-{}'.format(unique_id))
        self.classified_queue = classified_queue
        self.obfuscated_queue = obfuscated_queue
        self.labels_to_obscure = labels_to_obscure
        self.box_color = box_color
        self.window_size = window_size
        self.similarity_threshold = similarity_threshold
        self.output_video = output_video
        self.n_frames = n_frames
        self.stop_event = threading.Event()
    def run(self):
        logging.info('{} started.'.format(self.getName()))
        next_i = 0
        window = deque([None] * (self.window_size / 2))
        i_ctr = self.window_size / 2
        has_padded_for_last_frames = False
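        # Sliding-window smoothing: frames are processed at the window center,
        # so the deque is pre-padded with window_size/2 None entries, and
        # synthetic (None, None) entries are appended once the classifiers
        # finish so every real frame eventually reaches the center slot.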
        while not self.stop_event.isSet():
            if self.classified_queue.empty():
                if not self.is_any_classifier_alive():
                    if has_padded_for_last_frames:
                        logging.info('Processing complete.')
                        logging.info('{} done.'.format(self.getName()))
                        break
                    else:
                        # Add padding to process frames in second half of window
                        for i in xrange(next_i, next_i + self.window_size / 2):
                            self.classified_queue.put((i, (None, None)))
                        has_padded_for_last_frames = True
                else:
                    time.sleep(CONSTANTS.OBFUSCATER_SLEEP)
                continue
            i, (frame, results) = self.classified_queue.get(timeout=CONSTANTS.QUEUE_TIMEOUT)
            if i != next_i:
                # Out-of-order frame; put it back and wait for frame next_i
                self.classified_queue.put((i, (frame, results)))
                time.sleep(CONSTANTS.OBFUSCATER_SLEEP)
                continue
            next_i += 1
            window.append((frame, results))
            if len(window) < self.window_size:
                time.sleep(CONSTANTS.OBFUSCATER_SLEEP)
                continue
            elif len(window) > self.window_size:
                window.popleft()
            window_results = map(lambda x: x[1] if x is not None else None, window)
            smoothed_results = self.smooth(window_results, self.similarity_threshold)
            frame_ctr = window[i_ctr][0]
            self.obfuscate(frame_ctr, smoothed_results, self.labels_to_obscure, self.box_color)
            self.obfuscated_queue.put(frame_ctr)
            self.output_video.write(frame_ctr)
            if next_i % 10 == 0:
                percent_processed = next_i * 100.0 / self.n_frames
                logging.info('Processed {} frames ({:.2f}%)'.format(next_i, percent_processed))
    def smooth(self, results, similarity_threshold=0.9):
        ''' Returns smoothed classification results for the middle element of list @results. '''
        assert len(results) % 2 == 1, 'results must have an odd number of elements. Has {}.'.format(len(results))
        non_padded_results = filter(lambda x: x is not None, results)
        n_results = len(non_padded_results)
        # Group predictions by label
        predictions = [pred for r in non_padded_results for pred in r.get('predictions', [])]
        preds_by_label = defaultdict(list)
        for pred in predictions:
            label, score = max(pred['labels'].items(), key=lambda x: x[1])
            if label not in self.labels_to_obscure:
                continue
            preds_by_label[label].append(pred)
        # Group predictions by object using similarity metric
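        # bbox fields are fractions of the frame, so each box is a point in
        # [0, 1]^4 and the largest possible pairwise Euclidean distance is
        # sqrt(4) = 2; dividing by 2 maps distances into [0, 1] before
        # converting them to a similarity score.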
        preds_by_objects = []
        for label, preds in preds_by_label.iteritems():
            X = np.array([[p['bbox']['top'], p['bbox']['left'],
                           p['bbox']['height'], p['bbox']['width']] for p in preds])
            normalized_dist = squareform(pdist(X, 'euclidean')) / 2.0
            similarity = 1 - normalized_dist
            n_rows = len(preds)
            groups, pairings = {}, {}
            for i in xrange(n_rows):
                if i not in pairings:
                    groups[i] = [preds[i]]
                for j in xrange(i + 1, n_rows):
                    # Skip boxes already assigned to a group so no prediction
                    # is counted twice
                    if j in pairings:
                        continue
                    if similarity[i][j] > similarity_threshold:
                        p = pairings.get(i, i)
                        groups[p].append(preds[j])
                        pairings[j] = p
            preds_by_objects.extend(groups.values())
        # Average score and bbox for each object
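        # Scores are averaged over the full window (n_results), not just the
        # frames where the object was detected, so transient detections are
        # discounted; bbox coordinates are averaged over the detections only.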
        smoothed_preds = []
        for preds in preds_by_objects:
            n_preds = len(preds)
            smoothed_scores = {l: sum(p['labels'][l] for p in preds) / n_results
                               for l in self.labels_to_obscure}
            smoothed_bbox = {b: sum(p['bbox'][b] for p in preds) / n_preds
                             for b in ['left', 'top', 'width', 'height']}
            smoothed_preds.append({'labels': smoothed_scores, 'bbox': smoothed_bbox})
        smoothed_results = {'predictions': smoothed_preds}
        return smoothed_results
    def obfuscate(self, frame, results, labels, color=(0, 0, 0)):
        ''' Draws filled boxes over predictions whose top label meets its threshold. '''
        height, width, _ = frame.shape
        for pred in results.get('predictions', []):
            label, score = max(pred['labels'].items(), key=lambda x: x[1])
            if score < labels.get(label, 1.0):
                continue
            # Convert normalized bbox fractions to absolute pixel coordinates
            bbox = pred['bbox']
            ymin = int(bbox['top'] * height)
            xmin = int(bbox['left'] * width)
            ymax = int((bbox['top'] + bbox['height']) * height)
            xmax = int((bbox['left'] + bbox['width']) * width)
            cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), color, thickness=cv2.FILLED)

    def is_any_classifier_alive(self):
        return any(t.getName().startswith('Classifier-') for t in threading.enumerate())

    def stop(self):
        self.stop_event.set()
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--input', help='input video to obscure', type=str, required=True)
    parser.add_argument('-o', '--output', help='output video', type=str, required=True)
    parser.add_argument('-c', '--fourcc_codec', help='output video codec. Refer to fourcc.org/codecs.php.', type=str, default='mp4v')
    parser.add_argument('-a', '--access_token', help='Matroid access token', type=str, required=True)
    parser.add_argument('-d', '--detector_id', help='Matroid detector id', type=str, required=True)
    parser.add_argument('-l', '--label', help='label for obfuscation', action='append', type=str)
    parser.add_argument('-t', '--threshold', help='probability threshold for obfuscation', action='append', type=float)
    parser.add_argument('-b', '--box_color', help='color of obfuscation boxes (BGR)', nargs='+', type=int, default=[0, 0, 0])
    parser.add_argument('-w', '--window_size', help='number of frames in window for smoothing. Must be odd. '
                        'Recommend lower values for videos with fast-moving objects (e.g. 5) and higher values '
                        'for videos with slower-moving objects (e.g. 11).', type=int, default=11)
    parser.add_argument('-s', '--similarity_threshold', help='similarity threshold (0 to 1) for identifying '
                        'bboxes that belong to the same object.', type=float, default=0.95)
    args = parser.parse_args()
    logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%H:%M:%S', level=logging.INFO)

    # Validate input / output videos
    if not os.path.isfile(args.input):
        raise IOError('Cannot find input video "{}".'.format(args.input))
    if os.path.isfile(args.output):
        raise IOError('Output video "{}" already exists.'.format(args.output))

    # Matroid check (authentication and detector)
    try:
        ret = requests.get(
            '{}/api/v1/detectors/{}'.format(CONSTANTS.MATROID_BASE_URL, args.detector_id),
            headers={'Authorization': 'Bearer {}'.format(args.access_token)},
            timeout=CONSTANTS.API_TIMEOUT)
    except requests.exceptions.Timeout:
        logging.error('Matroid check timeout.')
        raise
    except Exception:
        logging.error('Matroid check error.')
        raise
    if ret.status_code != 200:
        raise RuntimeError('Matroid check failed with access token "{}" and detector id "{}". '
                           'HTTP status code {}.'.format(args.access_token, args.detector_id, ret.status_code))
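    # --label and --threshold are paired positionally: the Nth threshold
    # applies to the Nth label.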
    assert args.label and args.threshold and len(args.label) == len(args.threshold), \
        'Must supply at least one label, with the same number of labels and thresholds.'
    labels_to_obscure = dict(zip(args.label, args.threshold))
    assert len(args.box_color) == 3, 'Obfuscation box color must have three numbers (BGR format). Got {}.'.format(args.box_color)
    assert args.window_size % 2 == 1, 'Window size must be odd. Got {}.'.format(args.window_size)
    assert 0 < args.similarity_threshold < 1, 'Similarity threshold must be between 0 and 1. Got {}.'.format(args.similarity_threshold)
    # Set up input and output videos
    input_video = cv2.VideoCapture(args.input)
    fps = input_video.get(cv2.CAP_PROP_FPS)
    n_frames = int(input_video.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_width = int(input_video.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(input_video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    try:
        fourcc = cv2.VideoWriter_fourcc(*args.fourcc_codec)
        output_video = cv2.VideoWriter(args.output, fourcc, fps, (frame_width, frame_height))
    except Exception:
        logging.error('Failed to create output video.', exc_info=True)
        raise

    # Create threads
    sample_queue = Queue.PriorityQueue(maxsize=CONSTANTS.SAMPLE_QUEUE_SIZE)
    classified_queue = Queue.PriorityQueue()
    obfuscated_queue = Queue.Queue()
    sample_queue_lock = threading.Lock()
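    # PriorityQueues are keyed by frame index, so consumers always see the
    # lowest-numbered pending frame even with many classifiers running; the
    # lock serializes each classifier's multi-frame drain of sample_queue.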
    sampler = Sampler(cap=input_video, sample_queue=sample_queue)
    classifiers = [Classifier(sample_queue=sample_queue,
                              classified_queue=classified_queue,
                              lock=sample_queue_lock,
                              access_token=args.access_token,
                              detector_id=args.detector_id,
                              batch_size=CONSTANTS.CLASSIFIER_BATCH_SIZE)
                   for _ in xrange(CONSTANTS.N_CLASSIFIERS)]
    obfuscater = Obfuscater(classified_queue=classified_queue,
                            obfuscated_queue=obfuscated_queue,
                            labels_to_obscure=labels_to_obscure,
                            box_color=args.box_color,
                            window_size=args.window_size,
                            similarity_threshold=args.similarity_threshold,
                            output_video=output_video,
                            n_frames=n_frames)
    logging.info('Starting threads.')
    sampler.start()
    for c in classifiers:
        c.start()
        time.sleep(random.random())  # stagger classifier start-up
    obfuscater.start()

    logging.info('Starting main loop.')
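    # Display obfuscated frames at roughly the source frame rate; ESC
    # (keycode 27) in the video window stops the script early.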
    try:
        display_sleep = int(1000.0 / fps)
        while True:
            if obfuscated_queue.empty():
                if not sampler.isAlive() and not obfuscater.isAlive():
                    break
                time.sleep(display_sleep / 1000.0)
                continue
            frame = obfuscated_queue.get(timeout=CONSTANTS.QUEUE_TIMEOUT)
            cv2.imshow(args.input, frame)
            if cv2.waitKey(display_sleep) == 27:  # ESC
                break
    except Exception:
        logging.error('Main loop failed.', exc_info=True)
    finally:
        logging.info('Exiting.')
        logging.info('Stopping threads.')
        sampler.stop()
        for c in classifiers:
            c.stop()
        obfuscater.stop()
        sampler.join()
        for c in classifiers:
            c.join()
        obfuscater.join()
        logging.info('Releasing videos and windows.')
        input_video.release()
        output_video.release()
        cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
# requirements.txt
numpy==1.16.4
opencv_python==4.1.0.25
scipy==1.2.2
requests==2.22.0