Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Script for using Matroid to obfuscate desired objects within an input video.
'''
Script for using Matroid to obfuscate desired objects within an input video.
Example usage:
To obfuscate all "male" and "female" faces with a score above 0.5 in
"my_original_video.mp4" using the Matroid gender detector (5a4d4eb907a4ed0013eaa6f6).
Obfuscated video "my_obfuscated_video.mp4" is written and displayed in real-time.
python obfuscation.py \
--input my_original_video.mp4 \
--output my_obfuscated_video.mp4 \
--access_token ae30cdbe4f80b56c652af2e5d8e69023 \
--detector_id 5a4d4eb907a4ed0013eaa6f6 \
--label male \
--threshold 0.5 \
--label female \
--threshold 0.5
Note: Hit ESC to stop script (with obfuscated video as active window).
'''
import cv2
import requests
import logging
import threading
import Queue
import time
import random
import argparse
import os
from collections import defaultdict, deque
import numpy as np
from scipy.spatial.distance import pdist, squareform
import string
class CONSTANTS:
    ''' Settings shared by the sampler, classifier and obfuscater threads. '''

    # Matroid API endpoint and the timeout passed to each HTTP request.
    MATROID_BASE_URL = 'https://www.matroid.com'
    API_TIMEOUT = 5

    # Number of random hex digits appended to each thread name.
    THREAD_ID_LEN = 8

    # Capacity of the sampled-frame queue and the timeout used for queue gets.
    SAMPLE_QUEUE_SIZE = 256
    QUEUE_TIMEOUT = 1

    # Values handed to time.sleep() by each kind of thread while it waits.
    SAMPLER_SLEEP = 1
    CLASSIFIER_SLEEP = 1
    OBFUSCATER_SLEEP = 1

    # Classifier pool size, starting/maximum frames per API call, and the
    # number of successes required before a reduced batch size grows again.
    N_CLASSIFIERS = 16
    CLASSIFIER_BATCH_SIZE = 4
    N_SUCCESSES_FOR_BATCH_INCREASE = 20
class Sampler(threading.Thread):
    ''' Producer thread: reads frames from the input video and queues them. '''

    def __init__(self, cap, sample_queue):
        ''' Stores the capture object @cap and the destination @sample_queue. '''
        threading.Thread.__init__(self)
        suffix = ''.join([random.choice(string.hexdigits)
                          for _ in range(CONSTANTS.THREAD_ID_LEN)])
        # Other threads locate the sampler by the 'Sampler-' name prefix.
        self.name = 'Sampler-{}'.format(suffix)
        self.cap = cap
        self.sample_queue = sample_queue
        self.stop_event = threading.Event()

    def run(self):
        ''' Queues (index, frame) tuples until the video ends or stop() is called. '''
        logging.info('{} started.'.format(self.name))
        frame_index = 0
        while not self.stop_event.is_set():
            if not self.sample_queue.full():
                grabbed, frame = self.cap.read()
                if not grabbed:
                    # No more frames can be read from the capture.
                    logging.info('{} done.'.format(self.name))
                    return
                self.sample_queue.put((frame_index, frame))
                frame_index += 1
            else:
                # Back off while the consumers drain the queue.
                time.sleep(CONSTANTS.SAMPLER_SLEEP)

    def stop(self):
        ''' Asks run() to exit at its next loop iteration. '''
        self.stop_event.set()
class Classifier(threading.Thread):
    ''' Classifies sampled frames via Matroid API.

    Frames are pulled in batches from the shared sample queue, sent to the
    detector, and pushed to the classified queue as (index, (frame, result)).
    Frames whose API call failed are kept in a private retry queue and are
    retried before new frames are taken.
    '''

    def __init__(self, sample_queue, classified_queue, lock, access_token, detector_id, batch_size):
        ''' Stores the queues, the shared @lock, the API credentials and the
        initial number of frames to send per API call (@batch_size). '''
        threading.Thread.__init__(self)
        unique_id = ''.join(random.choice(string.hexdigits) for _ in range(CONSTANTS.THREAD_ID_LEN))
        # Other threads locate classifiers by the 'Classifier-' name prefix.
        self.name = 'Classifier-{}'.format(unique_id)
        self.sample_queue = sample_queue
        self.temp_queue = Queue.Queue()  # frames waiting to be retried
        self.classified_queue = classified_queue
        self.lock = lock
        self.access_token = access_token
        self.detector_id = detector_id
        self.batch_size = batch_size
        self.stop_event = threading.Event()

    def classify(self, images):
        ''' Classifies a list of jpeg @images using the Matroid API.

        Returns the 'results' entry of the JSON response, or None when the
        request times out, returns an HTTP error status, or fails otherwise.
        '''
        try:
            files = [('file', (str(i), img)) for i, img in enumerate(images)]
            ret = requests.post(
                '{}/api/v1/detectors/{}/classify_image'.format(CONSTANTS.MATROID_BASE_URL, self.detector_id),
                headers={'Authorization': 'Bearer {}'.format(self.access_token)},
                files=files,
                timeout=CONSTANTS.API_TIMEOUT)
            # Surface HTTP errors explicitly instead of as a missing 'results' key.
            ret.raise_for_status()
            return ret.json()['results']
        except requests.exceptions.Timeout:
            logging.error('{}: API classification timeout.'.format(self.name))
            return None
        except Exception:
            logging.error('{}: API classification error.'.format(self.name), exc_info=True)
            return None

    def run(self):
        ''' Classifies batches of frames until the input is exhausted or stop() is called.

        The batch size is halved after a failed API call and doubled again
        (up to CONSTANTS.CLASSIFIER_BATCH_SIZE) after
        CONSTANTS.N_SUCCESSES_FOR_BATCH_INCREASE successful calls.
        '''
        logging.info('{} started.'.format(self.name))
        n_successes = 0
        while not self.stop_event.is_set():
            if self.sample_queue.empty() and self.temp_queue.empty():
                if not self.is_sampler_alive():
                    logging.info('{} done.'.format(self.name))
                    break
                time.sleep(random.random() * CONSTANTS.CLASSIFIER_SLEEP)
                continue

            batch_size = min(self.sample_queue.qsize() + self.temp_queue.qsize(), self.batch_size)
            frame_i, frames = [], []
            with self.lock:
                for _ in range(batch_size):
                    try:
                        # Retry previously failed frames before taking new ones.
                        if not self.temp_queue.empty():
                            i, frame = self.temp_queue.get(timeout=CONSTANTS.QUEUE_TIMEOUT)
                        else:
                            i, frame = self.sample_queue.get(timeout=CONSTANTS.QUEUE_TIMEOUT)
                    except Queue.Empty:
                        logging.info('{}: Timeout getting next frame for classifier.'.format(self.name))
                        continue
                    frame_i.append(i)
                    frames.append(frame)
            if not frames:
                continue

            # Encode outside the lock so other classifiers are not blocked
            # while this thread compresses its frames.
            jpgs = [cv2.imencode('.jpg', frame)[1] for frame in frames]
            results = self.classify(jpgs)

            # Every frame needs its own result; a short response would silently
            # drop frames and leave a gap in the frame indices downstream.
            if results is not None and (not isinstance(results, (list, tuple))
                                        or len(results) != len(frames)):
                logging.error('{}: API returned an unexpected number of results.'.format(self.name))
                results = None

            if results is None:
                for i, frame in zip(frame_i, frames):
                    self.temp_queue.put((i, frame))
                if self.batch_size != 1:
                    # Floor division keeps the batch size an integer.
                    self.batch_size = max(self.batch_size // 2, 1)
                    logging.info('{}: batch size decreased to {}.'.format(self.name, self.batch_size))
                n_successes = 0
                time.sleep(random.random() * CONSTANTS.CLASSIFIER_SLEEP)
            else:
                for i, f, r in zip(frame_i, frames, results):
                    self.classified_queue.put((i, (f, r)))
                n_successes += 1
                if n_successes >= CONSTANTS.N_SUCCESSES_FOR_BATCH_INCREASE and self.batch_size < CONSTANTS.CLASSIFIER_BATCH_SIZE:
                    self.batch_size = min(CONSTANTS.CLASSIFIER_BATCH_SIZE, self.batch_size * 2)
                    logging.info('{}: batch size increased to {}.'.format(self.name, self.batch_size))
                    n_successes = 0
                time.sleep(random.random() * CONSTANTS.CLASSIFIER_SLEEP)

    def is_sampler_alive(self):
        ''' Returns True if a live thread whose name starts with 'Sampler-' exists. '''
        return any(t.name.startswith('Sampler-') for t in threading.enumerate())

    def stop(self):
        ''' Asks run() to exit at its next loop iteration. '''
        self.stop_event.set()
class Obfuscater(threading.Thread):
    ''' Obfuscates objects in frames (based on classification results) and writes video.

    Frames are consumed in index order from the classified queue. Each frame
    is obfuscated using classification results smoothed over a sliding window
    of @window_size frames centred on it, then written to @output_video and
    pushed to @obfuscated_queue.
    '''

    def __init__(self, classified_queue, obfuscated_queue, labels_to_obscure, box_color, window_size, similarity_threshold, output_video, n_frames):
        ''' Stores the queues and settings.

        @labels_to_obscure maps each label to its score threshold, @box_color
        is the fill colour of the boxes, and @n_frames is only used for
        progress logging.
        '''
        threading.Thread.__init__(self)
        unique_id = ''.join(random.choice(string.hexdigits) for _ in range(CONSTANTS.THREAD_ID_LEN))
        self.name = 'Obfuscater-{}'.format(unique_id)
        self.classified_queue = classified_queue
        self.obfuscated_queue = obfuscated_queue
        self.labels_to_obscure = labels_to_obscure
        self.box_color = box_color
        self.window_size = window_size
        self.similarity_threshold = similarity_threshold
        self.output_video = output_video
        self.n_frames = n_frames
        self.stop_event = threading.Event()

    def run(self):
        ''' Processes classified frames in order until all are written or stop() is called. '''
        logging.info('{} started.'.format(self.name))
        # Floor division: the window index must be an integer.
        half_window = self.window_size // 2
        next_i = 0
        # Leading padding lets the very first frame sit at the window centre.
        window = deque([None] * half_window)
        has_padded_for_last_frames = False
        while not self.stop_event.is_set():
            if self.classified_queue.empty():
                if self.is_any_classifier_alive():
                    time.sleep(CONSTANTS.OBFUSCATER_SLEEP)
                elif not self.classified_queue.empty():
                    # A classifier queued its last results between the two
                    # checks above; process them before padding.
                    pass
                elif has_padded_for_last_frames:
                    logging.info('Processing complete.')
                    logging.info('{} done.'.format(self.name))
                    break
                else:
                    # Add padding to process frames in second half of window
                    for i in range(next_i, next_i + half_window):
                        self.classified_queue.put((i, (None, None)))
                    has_padded_for_last_frames = True
                continue

            i, (frame, results) = self.classified_queue.get(timeout=CONSTANTS.QUEUE_TIMEOUT)
            if i != next_i:
                # The next frame in sequence is not classified yet; put this
                # one back and wait.
                self.classified_queue.put((i, (frame, results)))
                time.sleep(CONSTANTS.OBFUSCATER_SLEEP)
                continue
            next_i += 1

            window.append((frame, results))
            if len(window) < self.window_size:
                time.sleep(CONSTANTS.OBFUSCATER_SLEEP)
                continue
            elif len(window) > self.window_size:
                window.popleft()

            # Padding entries (None or (None, None)) contribute a None result.
            window_results = [entry[1] if entry is not None else None for entry in window]
            smoothed_results = self.smooth(window_results, self.similarity_threshold)
            frame_ctr = window[half_window][0]
            self.obfuscate(frame_ctr, smoothed_results, self.labels_to_obscure, self.box_color)
            self.obfuscated_queue.put(frame_ctr)
            self.output_video.write(frame_ctr)

            if next_i % 10 == 0:
                if self.n_frames > 0:
                    percent_processed = next_i * 100.0 / self.n_frames
                    logging.info('Processed {} frames ({:.2f}%)'.format(next_i, percent_processed))
                else:
                    # Frame count unknown: avoid dividing by zero.
                    logging.info('Processed {} frames'.format(next_i))

    def smooth(self, results, similarity_threshold=0.9):
        ''' Returns smoothed classification results for middle element within list @results.

        @results holds one classification result (or None for padding) per
        frame in the window. Predictions whose top label is in
        self.labels_to_obscure are grouped per label into objects by bounding
        box similarity. Each object's scores are summed and divided by the
        number of non-padded frames; its bounding box is the mean of its
        predictions' boxes.
        '''
        assert len(results) % 2 == 1, 'results must have an odd number of elements. Has {}.'.format(len(results))
        non_padded_results = [r for r in results if r is not None]
        n_results = len(non_padded_results)

        # Group predictions by label
        predictions = [pred for r in non_padded_results for pred in r.get('predictions', [])]
        preds_by_label = defaultdict(list)
        for pred in predictions:
            label, score = max(pred['labels'].items(), key=lambda x: x[1])
            if label not in self.labels_to_obscure:
                continue
            preds_by_label[label].append(pred)

        # Group predictions by object using similarity metric
        preds_by_objects = []
        for label, preds in preds_by_label.items():
            X = np.array([[p['bbox']['top'], p['bbox']['left'],
                           p['bbox']['height'], p['bbox']['width']] for p in preds])
            normalized_dist = squareform(pdist(X, 'euclidean')) / 2.0
            similarity = 1 - normalized_dist
            n_rows = len(preds)
            groups, pairings = {}, {}
            for i in range(n_rows):
                if i not in pairings:
                    groups[i] = [preds[i]]
                root = pairings.get(i, i)
                for j in range(i + 1, n_rows):
                    # A prediction joins at most one group; adding it again
                    # would count its score several times.
                    if j in pairings:
                        continue
                    if similarity[i][j] > similarity_threshold:
                        groups[root].append(preds[j])
                        pairings[j] = root
            preds_by_objects.extend(groups.values())

        # Average score and bbox for each object
        smoothed_preds = []
        for preds in preds_by_objects:
            n_preds = len(preds)
            # A label missing from a prediction counts as a score of 0.
            smoothed_scores = {l: sum(p['labels'].get(l, 0.0) for p in preds) / n_results
                               for l in self.labels_to_obscure}
            smoothed_bbox = {b: sum(p['bbox'][b] for p in preds) / n_preds
                             for b in ['left', 'top', 'width', 'height']}
            smoothed_preds.append({'labels': smoothed_scores, 'bbox': smoothed_bbox})
        smoothed_results = {'predictions': smoothed_preds}
        return smoothed_results

    def obfuscate(self, frame, results, labels, color=(0,0,0)):
        ''' Draws a filled box on @frame (in place) over each qualifying prediction.

        A prediction qualifies when its highest score reaches the threshold
        stored for that label in @labels (1.0 when the label is absent).
        Bounding boxes are scaled by the frame width and height.
        '''
        height, width = frame.shape[:2]
        for pred in results.get('predictions', []):
            label, score = max(pred['labels'].items(), key=lambda x: x[1])
            if score < labels.get(label, 1.0):
                continue
            # Find absolute box coordinates
            bbox = pred['bbox']
            ymin = int(bbox['top'] * height)
            xmin = int(bbox['left'] * width)
            ymax = int((bbox['top'] + bbox['height']) * height)
            xmax = int((bbox['left'] + bbox['width']) * width)
            cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), color, thickness=cv2.FILLED)

    def is_any_classifier_alive(self):
        ''' Returns True if a live thread whose name starts with 'Classifier-' exists. '''
        return any(t.name.startswith('Classifier-') for t in threading.enumerate())

    def stop(self):
        ''' Asks run() to exit at its next loop iteration. '''
        self.stop_event.set()
def main():
    ''' Command-line entry point.

    Parses and validates the arguments, checks the Matroid credentials and
    detector, opens the input and output videos, then runs the sampler,
    classifier and obfuscater threads while displaying obfuscated frames
    until processing finishes or ESC is pressed.

    Raises IOError for missing/unopenable videos or an existing output file,
    and RuntimeError when the Matroid check returns a non-200 status.
    Invalid arguments exit through argparse's error handling.
    '''
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--input', help='input video to obscure', type=str, required=True)
    parser.add_argument('-o', '--output', help='output video', type=str, required=True)
    parser.add_argument('-c', '--fourcc_codec', help='output video codec. Refer to fourcc.org/codecs.php.', type=str, default='mp4v')
    parser.add_argument('-a', '--access_token', help='Matroid access token', type=str, required=True)
    parser.add_argument('-d', '--detector_id', help='Matroid detector id', type=str, required=True)
    parser.add_argument('-l', '--label', help='label for obfuscation', action='append', type=str)
    parser.add_argument('-t', '--threshold', help='probability threshold for obfuscation', action='append', type=float)
    parser.add_argument('-b', '--box_color', help='color of obfuscation boxes', nargs='+', type=int, default=[0, 0, 0])
    parser.add_argument('-w', '--window_size', type=int, default=11,
                        help='number of frames in window for smoothing. Must be odd. '
                             'Recommend lower values for videos with fast moving objects (e.g. 5) and higher values '
                             'for videos with slower moving objects (e.g. 11).')
    parser.add_argument('-s', '--similarity_threshold', type=float, default=0.95,
                        help='similarity threshold (0 to 1) for identifying '
                             'bboxes that belong to the same object.')
    args = parser.parse_args()
    logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%H:%M:%S', level=logging.INFO)

    # Validate arguments explicitly: asserts are stripped under -O and a
    # missing --label would otherwise fail with an unhelpful TypeError.
    labels = args.label or []
    thresholds = args.threshold or []
    if not labels:
        parser.error('Must supply at least one label.')
    if len(labels) != len(thresholds):
        parser.error('Must supply the same number of labels and thresholds.')
    if len(args.box_color) != 3:
        parser.error('Obfuscation box color must have three numbers (BGR-format). Got {}.'.format(args.box_color))
    if args.window_size < 1 or args.window_size % 2 != 1:
        parser.error('Window size must be odd. Got {}.'.format(args.window_size))
    if not 0 < args.similarity_threshold < 1:
        parser.error('Similarity threshold must be between 0 and 1. Got {}.'.format(args.similarity_threshold))
    labels_to_obscure = dict(zip(labels, thresholds))

    # Validate input / output videos
    if not os.path.isfile(args.input):
        raise IOError('Cannot find input video "{}".'.format(args.input))
    if os.path.isfile(args.output):
        raise IOError('Output video already exists "{}".'.format(args.output))

    # Matroid check (authentication and detector)
    try:
        ret = requests.get(
            '{}/api/v1/detectors/{}'.format(CONSTANTS.MATROID_BASE_URL, args.detector_id),
            headers={'Authorization': 'Bearer {}'.format(args.access_token)},
            timeout=CONSTANTS.API_TIMEOUT)
    except requests.exceptions.Timeout:
        logging.error('Matroid check timeout.')
        raise
    except Exception:
        logging.error('Matroid check error.')
        raise
    if ret.status_code != 200:
        # The access token is deliberately left out of the message so it does
        # not end up in logs or tracebacks.
        raise RuntimeError('Matroid check failed for detector id "{}". HTTP status code {}.'.format(
            args.detector_id, ret.status_code))

    # Setup input and output videos
    input_video = cv2.VideoCapture(args.input)
    if not input_video.isOpened():
        raise IOError('Cannot open input video "{}".'.format(args.input))
    fps = input_video.get(cv2.CAP_PROP_FPS)
    n_frames = int(input_video.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_width = int(input_video.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(input_video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    try:
        fourcc = cv2.VideoWriter_fourcc(*args.fourcc_codec)
        output_video = cv2.VideoWriter(args.output, fourcc, fps, (frame_width, frame_height))
        if not output_video.isOpened():
            raise IOError('Cannot open output video "{}".'.format(args.output))
    except Exception:
        logging.error('Failed to create output video.', exc_info=True)
        input_video.release()
        raise

    # Create threads
    sample_queue = Queue.PriorityQueue(maxsize=CONSTANTS.SAMPLE_QUEUE_SIZE)
    classified_queue = Queue.PriorityQueue()
    obfuscated_queue = Queue.Queue()
    classified_queue_lock = threading.Lock()
    sampler = Sampler(cap=input_video, sample_queue=sample_queue)
    classifiers = [Classifier(sample_queue=sample_queue,
                              classified_queue=classified_queue,
                              lock=classified_queue_lock,
                              access_token=args.access_token,
                              detector_id=args.detector_id,
                              batch_size=CONSTANTS.CLASSIFIER_BATCH_SIZE)
                   for _ in range(CONSTANTS.N_CLASSIFIERS)]
    obfuscater = Obfuscater(classified_queue=classified_queue,
                            obfuscated_queue=obfuscated_queue,
                            labels_to_obscure=labels_to_obscure,
                            box_color=tuple(args.box_color),
                            window_size=args.window_size,
                            similarity_threshold=args.similarity_threshold,
                            output_video=output_video,
                            n_frames=n_frames)

    logging.info('Starting threads.')
    sampler.start()
    for c in classifiers:
        c.start()
        # Stagger classifier start-up by a random fraction of a second.
        time.sleep(random.random())
    obfuscater.start()

    logging.info('Starting main loop.')
    try:
        # Per-frame display delay in milliseconds. Keep it at least 1 because
        # cv2.waitKey(0) waits indefinitely, and fall back to 1 when the
        # container reports no frame rate.
        display_sleep = max(1, int(1000.0 / fps)) if fps > 0 else 1
        while True:
            if obfuscated_queue.empty():
                if not sampler.is_alive() and not obfuscater.is_alive():
                    break
                # Float division so the sleep is not truncated to zero.
                time.sleep(display_sleep / 1000.0)
                continue
            frame = obfuscated_queue.get(timeout=CONSTANTS.QUEUE_TIMEOUT)
            cv2.imshow(args.input, frame)
            if cv2.waitKey(display_sleep) & 0xFF == 27:  # ESC
                break
    except Exception:
        logging.error('Main loop failed', exc_info=True)
    finally:
        logging.info('Exiting.')
        logging.info('Stopping threads.')
        sampler.stop()
        for c in classifiers:
            c.stop()
        obfuscater.stop()
        sampler.join()
        for c in classifiers:
            c.join()
        obfuscater.join()
        logging.info('Releasing videos and windows.')
        input_video.release()
        output_video.release()
        cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
numpy==1.16.4
opencv_python==4.1.0.25
scipy==1.2.2
requests==2.22.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment