# Copyright 2020, Juan Altmayer Pizzorno. All Rights Reserved. MIT license applies.
import numpy as np
import cv2
import time
from pathlib import Path
# This is https://github.com/jaltmayerpizzorno/d2k
d2k_path = Path(__file__).parent / 'd2k'
import sys
sys.path.append(str(d2k_path))
import d2k.box
config = {
'camera_path': Path('/mnt/camera/samba/axis-00408CC5B6D8'),
'search_secs': 2.0, # search for detections every N seconds
'before_secs': 1.5, # extend clip this long before first detection
'after_secs': 1.5, # extend clip this long after last detection
'clip_width': 320, # clip width (height is automatic)
'clip_format': 'mp4',
'clip_codec': 'avc1',
'image_width': 640, # "representative" image width (height is automatic)
'email_subject': 'entrance camera',
'smtp_credentials': (Path(__file__).parent / 'smtp-credentials.json')
}
def parse_args():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('input_files', type=str, nargs='*', help='file(s) to process')
parser.add_argument('--send', help='send email', action='store_true')
parser.add_argument('--mark', help='mark detections', action='store_true')
parser.add_argument('--nodb', help='don\'t use axis\' index.db to find files', action='store_true')
parser.add_argument('--nordthread', help='don\'t use reading thread', action='store_true')
parser.add_argument('--tflite', help='use TF Lite (requires .tflite model)', action='store_true')
g = parser.add_mutually_exclusive_group()
g.add_argument('--yolo', help='use YOLOv3', action='store_true')
g.add_argument('--ssd', help='use MobileNetV2-SSD (implies --tflite)', action='store_true')
g.add_argument('--dummy', help='use dummy detector', action='store_true')
g = parser.add_argument_group('yolo', 'YOLO options')
g.add_argument('--gpu', help='use GPU', action='store_true')
g.add_argument('--float16', help='use float16', action='store_true')
g.add_argument('--yolo_cfg', help='YOLO configuration/weights to use', type=str,
default=str(d2k_path / 'darknet-files' / 'yolov3.cfg'))
g = parser.add_argument_group('tflite', 'TF Lite options')
g.add_argument('--tpu', help='use Coral Edge TPU', action='store_true')
args = parser.parse_args()
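    # --ssd implies --tflite; YOLO is the default detector unless --ssd is given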
args.tflite = args.tflite or args.ssd
args.yolo = args.yolo or not args.ssd
if args.tflite:
if args.gpu: parser.error('--gpu invalid with --tflite')
else:
if args.tpu: parser.error('--tpu only valid for --tflite')
if args.float16 and args.ssd: parser.error('--float16 only valid for --yolo')
return args
# FIXME load only when needed; refactor into class
coco_class_names = (d2k_path / 'darknet-files' / 'coco.names').read_text().splitlines()
# Limit detections to classes we are interested in... this cuts down on false positives
interesting_classes = {coco_class_names.index(c) for c in ['person', 'bicycle', 'car', 'motorbike',
'bus', 'truck', 'cat', 'dog']}
def is_interesting(box):
"""Returns whether the given YOLO box has an interesting detection."""
return any([(box.classes[i] > 0) for i in interesting_classes])
class DividingLine:
"""Implements a linear detection boundary, scaled to the given image dimensions (w,h)."""
def __init__(self, img_dim):
line_points = np.array([[0, 320], [1920, 820]]) # (x,y) measured on image
orig_img_size = np.array([1920, 1080]) # that image's (w, h)
line_points = line_points / orig_img_size
x = line_points[:,0]
y = line_points[:,1]
a = (y[1] - y[0]) / (x[1] - x[0])
b = y[0] - a * x[0]
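        # rescale the normalized line y = a*x + b to pixel coordinates:
        # y_px = a*(H/W)*x_px + b*H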
self.a = a * img_dim[1]/img_dim[0]
self.b = b * img_dim[1]
    def is_above(self, box):
        """Returns whether the box's bottom center lies below the line in image
        coordinates; since y grows downward, that is the "in" side of the boundary."""
        return (box.y + box.h/2) > (self.a * box.x + self.b)  # note y starts at top
def bgr2rgb(image):
"""Converts an image from BGR to RGB"""
return image[...,::-1] # there's also cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
def image_dim(image):
"""Given a HxWxC sized image array, returns (W,H) as an array"""
return np.array(image.shape[1::-1])
class TfEngine:
"""Engine wrapper for TensorFlow"""
def __init__(self, model):
self.model = model
def input_dim(self):
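        # Keras reports input_shape as [(None, H, W, C)]; [0][2:0:-1] picks out (W, H)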
return self.model.layers[0].input_shape[0][2:0:-1]
def predict(self, image):
output = self.model.predict(np.expand_dims(image, axis=0))
return [np.squeeze(o) for o in output]
class TfliteEngine:
"""Engine wrapper for TensorFlow Lite"""
def __init__(self, model_path, use_tpu=False):
if use_tpu:
import tflite_runtime.interpreter as tflite
import platform
EDGETPU_SHARED_LIB = {
'Linux': 'libedgetpu.so.1',
'Darwin': 'libedgetpu.1.dylib'
} [platform.system()]
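            # Edge TPU models are compiled separately and are conventionally
            # named <model>_edgetpu.tflite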
model_path = model_path.parent / (model_path.stem + '_edgetpu' + model_path.suffix)
if not model_path.exists():
raise FileNotFoundError('Model file ' + str(model_path) + ' doesn\'t exist')
self.interpreter = tflite.Interpreter(str(model_path),
experimental_delegates=[
tflite.load_delegate(EDGETPU_SHARED_LIB, {})])
else:
import tensorflow.lite as tflite
self.interpreter = tflite.Interpreter(str(model_path))
self.interpreter.allocate_tensors()
def input_dim(self):
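        # the input tensor shape is (1, H, W, C); [2:0:-1] yields (W, H)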
return self.interpreter.get_input_details()[0]['shape'][2:0:-1]
def _copy_input(self, image):
tensor = self.interpreter.tensor(self.interpreter.get_input_details()[0]['index'])()[0]
tensor.fill(0) # padding, in case image isn't as big as input
tensor[:image.shape[0], :image.shape[1]] = image
def predict(self, image):
self._copy_input(image)
self.interpreter.invoke()
return [np.squeeze(self.interpreter.tensor(o['index'])()) \
for o in self.interpreter.get_output_details()]
class YoloDetector:
"""Detects objects using YOLOv3"""
def __init__(self, engine):
self.engine = engine
self.net_dim = engine.input_dim()
def detect(self, frame):
"""Detects objects on a CV2 frame, returning a list of d2k compatible boxes."""
import d2k
image = bgr2rgb(frame).astype(np.float32) / 255.0
image = d2k.image.letterbox(image, *self.net_dim)
output = self.engine.predict(image)
return d2k.network.boxes_from_output(output, self.net_dim, image_dim(frame), thresh=.8)
@staticmethod
def model_file(path):
return path / 'yolov3.tflite'
class MobilenetSsdDetector:
"""Detects objects using MobileNetv2-SSD"""
def __init__(self, engine):
self.engine = engine
self.net_dim = engine.input_dim()
def detect(self, frame):
"""Detects objects on a CV2 frame, returning a list of d2k compatible boxes."""
img_dim = image_dim(frame)
letterbox_scale = min(self.net_dim / img_dim)
image = bgr2rgb(cv2.resize(frame, tuple((img_dim * letterbox_scale).astype(int))))
ssd_boxes, class_ids, scores, count = self.engine.predict(image)
scale_adjust = self.net_dim / letterbox_scale
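        # SSD box coordinates are normalized to the network input; scale_adjust
        # maps them back to original-image pixels, undoing the letterbox resize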
def make_box(i):
ymin, xmin, ymax, xmax = ssd_boxes[i]
w, h = (xmax - xmin, ymax - ymin)
x, y = (xmin + w/2, ymin + h/2)
x, y = (x, y) * scale_adjust
w, h = (w, h) * scale_adjust
return d2k.box.Box(x, y, w, h, scores[i],
[scores[i] if class_ids[i] == j else 0. for j in range(len(coco_class_names))])
return [make_box(i) for i in range(int(count)) if scores[i] >= .5]
@staticmethod
def model_file(path):
return path / 'ssd_mobilenet_v2_coco_quant_postprocess.tflite'
class DummyDetector:
"""Dummy detector to facilitate measuring non-detection overhead"""
def __init__(self):
pass
def detect(self, frame):
return []
class FrameDetections:
"""Holds a frame and its detections."""
def __init__(self, frame, boxes):
self.frame = frame
self.boxes = boxes
self.area = sum([b.w * b.h for b in boxes])
def classes(self):
"""Returns the set of classes in these detections."""
return {coco_class_names[i] for b in self.boxes for i, c in enumerate(b.classes) if c > 0.}
def __lt__(self, other):
"""Defines how we'd like our detections sorted"""
assert isinstance(other, FrameDetections)
return (len(self.boxes), self.area) < (len(other.boxes), other.area)
def write_jpg(self, filename, x_dim, mark=False):
"""Writes these detections' frame as a JPG, resizing and optionally marking the boxes"""
from PIL import Image
image = Image.fromarray(bgr2rgb(self.frame))
if mark:
d2k.box.draw_boxes(image, self.boxes, names=coco_class_names)
image = image.resize((x_dim, int(image.height/image.width*x_dim)))
image.save(filename, "jpeg")
class VideoReader:
"""Reads from a video file; The end of the video is indicated by a 'None' frame."""
def __init__(self, file):
self.cap = cv2.VideoCapture(str(file))
if not self.cap.isOpened():
raise FileNotFoundError
self.pos = 0
def __del__(self):
self.cap.release()
def read(self):
ret, frame = self.cap.read()
if not ret: return None
self.pos += 1
return frame
def position(self):
return self.pos
def get(self, propName):
"""Returns a property of the underlying video capture stream"""
return self.cap.get(propName)
class ThreadedVideoReader(VideoReader):
"""Reads from a video file by wrapping an OpenCV VideoCapture object and reading
from it in another thread. The end of the video is indicated by a 'None' frame."""
def __init__(self, file):
import queue
import threading
super().__init__(file)
self.q = queue.SimpleQueue()
threading.Thread(target=self._fill, daemon=True).start()
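        # the daemon thread decodes frames ahead of the consumer, overlapping
        # video decoding with detection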
def _fill(self):
"""Internal function to fill the queue"""
while True:
ret, frame = self.cap.read()
if not ret: break
self.q.put(frame)
self.q.put(None) # end marker
    def read(self):
        """Reads a frame; returns None at the end of the file"""
        if self.q is None:
            return None
        frame = self.q.get()
        if frame is None:
            self.q = None  # reader thread is done; keep returning None
            return None
        self.pos += 1
        return frame
def processFile(detector, file):
    """Processes a video file, returning a tuple (classes, clips): the set of
    detected class names and a list of (clip, image) file name pairs generated."""
print("processing", file)
start_time = time.time()
video = VideoReader(file) if args.nordthread else ThreadedVideoReader(file)
video_fps = video.get(cv2.CAP_PROP_FPS)
video_dims = (int(video.get(cv2.CAP_PROP_FRAME_WIDTH)), int(video.get(cv2.CAP_PROP_FRAME_HEIGHT)))
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
clip_dims = (config['clip_width'], int(video_dims[1]/video_dims[0]*config['clip_width']))
# search for detections every N frames
search_frames = int(config['search_secs'] * video_fps)
clips_list = []
clip_prefix = '_'.join(str(file.name).split('_')[:2])
all_detected_classes = set()
memory_secs = 20
assert memory_secs >= config['before_secs']
assert memory_secs >= config['after_secs']
MEM_SIZE = int(memory_secs*video_fps)
memory = [None] * MEM_SIZE
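    # 'memory' is a ring buffer of the most recent MEM_SIZE frames; the frame
    # at position p is kept at memory[(p-1) % MEM_SIZE]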
dividing_line = DividingLine(video_dims) # below the line it's "in", otherwise "out"
def detect(frame):
boxes = d2k.box.nms_boxes(detector.detect(frame), iou_thresh=.5)
for b in [b for b in boxes if not dividing_line.is_above(b)]:
print({coco_class_names[i] for i, c in enumerate(b.classes) if c > 0.}, 'ignored')
return FrameDetections(frame, [b for b in boxes if is_interesting(b) and dividing_line.is_above(b)])
while True:
frame = video.read()
if frame is None: break
if sys.stdout.isatty():
print(f"{video.position()/total_frames*100:.1f}%", end='\r')
memory[(video.position()-1) % MEM_SIZE] = frame
if (video.position() % search_frames) == 0:
detection = detect(frame)
if not detection.boxes: continue
detected_classes = detection.classes()
top_detection = detection
clip_name = Path(f"{clip_prefix}-{len(clips_list)+1}.{config['clip_format']}")
print(f"writing {clip_name}", detected_classes)
out = cv2.VideoWriter(str(clip_name), cv2.VideoWriter_fourcc(*config['clip_codec']),
video_fps, clip_dims)
for i in range(max(0, video.position() - int(config['before_secs'] * video_fps)), video.position()):
out.write(cv2.resize(memory[i % MEM_SIZE], clip_dims))
next_write = video.position() # next frame to write, one after last detection
while (video.position() - next_write) < MEM_SIZE:
frame = video.read()
if frame is None: break
if sys.stdout.isatty():
print(f"{video.position()/total_frames*100:.1f}%", end='\r')
memory[(video.position()-1) % MEM_SIZE] = frame
if (video.position() % search_frames) == 0:
detection = detect(frame)
if detection.boxes:
detected_classes |= detection.classes()
if (detection > top_detection):
top_detection = detection
for i in range(next_write, video.position()):
out.write(cv2.resize(memory[i % MEM_SIZE], clip_dims))
next_write = video.position()
for i in range(next_write, min(next_write + int(config['after_secs'] * video_fps), video.position())):
out.write(cv2.resize(memory[i % MEM_SIZE], clip_dims))
out.release()
new_clip_name = str(clip_name.parent / (clip_name.stem + '-' + '-'.join(sorted(detected_classes)) \
+ clip_name.suffix))
clip_name.rename(new_clip_name)
all_detected_classes |= detected_classes
image_name = clip_name.stem + ".jpg"
top_detection.write_jpg(image_name, config['image_width'], mark=args.mark)
clips_list.append((new_clip_name, image_name))
end_time = time.time()
print(f"elapsed: {end_time - start_time:.1f}s -- {total_frames/(end_time - start_time):.1f} FPS")
return (all_detected_classes, clips_list)
def sendEmail(detections, clips):
    """Sends (submits) an email notifying of the detections"""
    import smtplib
    import email.encoders
    import email.mime.base
    import email.mime.text
    import email.mime.multipart
    import json
mime_type = {'.mp4': ('video', 'mp4'),
'.avi': ('video', 'x-msvideo'), # or x-motion-jpeg
'.mkv': ('video', 'x-matroska'),
'.jpg': ('image', 'jpeg'),
'.png': ('image', 'png')
}
credentials = json.loads(config['smtp_credentials'].read_text())
msg_from = credentials['user'] # assumes 'user' is a valid email address!
msg = email.mime.multipart.MIMEMultipart()
msg['To'] = credentials['email_to']
msg['From'] = msg_from
msg['Subject'] = config['email_subject']
html = ['<!doctype html><html><body>']
html.append(', '.join(sorted(detections)) + ' detected')
html.append('<br/>')
for video, image in clips:
video_id = video + '.' + msg_from
image_id = image + '.' + msg_from
video_type = '/'.join(mime_type[Path(video).suffix])
html.append(f'<video controls poster="cid:{image_id}">')
html.append(f'<source src="cid:{video_id}" type="{video_type}">')
html.append(f'<img src="cid:{image_id}"/>')
html.append('</video>')
html.append('</body></html>')
msg.attach(email.mime.text.MIMEText(''.join(html), 'html'))
    for att in [Path(f) for pair in clips for f in pair]:
part = email.mime.base.MIMEBase(*mime_type[att.suffix])
part.add_header('Content-ID', f'<{att}.{msg_from}>')
part.add_header('Content-Disposition', 'inline', filename=att.name)
part.set_payload(att.read_bytes())
email.encoders.encode_base64(part)
msg.attach(part)
server = smtplib.SMTP_SSL(credentials['server'])
try:
# server.set_debuglevel(1)
server.login(credentials['user'], credentials['password'])
server.sendmail(msg_from, msg['To'], msg.as_string())
except Exception as e:
print(e)
finally:
server.quit()
class FileLister:
    """Finds files saved by an Axis Q6035-E camera that still need processing"""
def __init__(self, camera_path, nodb=False):
self._db = None
self._path = camera_path
self._nodb = nodb
    def files_since(self, timestamp):
        if self._nodb:
            files = sorted(self._path.rglob('*.mkv'))[:-1]  # the last one may not be ready yet
            files = [f for f in files if f.name[:len(timestamp)] > timestamp]
            if len(files) > 1:
                print(len(files), "from glob")
            return files
import sqlite3
if not self._db:
self._db = sqlite3.connect('file:' + str(self._path / 'index.db') + '?mode=ro', uri=True)
try:
cursor = self._db.cursor()
            cursor.execute(
                'select r.path, r.filename, b.path, b.filename from blocks as b' +
                ' join recordings as r on recording_id=r.id' +
                f' where substr(b.filename,1,{len(timestamp)}) > ?' +
                ' and b.stoptime not null',
                (timestamp,))
files = cursor.fetchall()
files = [self._path / ('/'.join(row) + '.mkv') for row in files]
if len(files) > 1: print(len(files), "from db")
return files
        except sqlite3.Error as e:
            print("sqlite3 error:", e)
            self._db.close()
            self._db = None
            return []
def wait_for_more(self):
if self._nodb:
time.sleep(60)
else:
time.sleep(3) # if I only had an update hook...
if __name__ == '__main__':
args = parse_args()
if args.dummy:
detector = DummyDetector()
elif args.tflite:
tfl_path = Path(__file__).parent / 'tflite'
if args.ssd:
detector = MobilenetSsdDetector(TfliteEngine(MobilenetSsdDetector.model_file(tfl_path),
use_tpu=args.tpu))
else:
detector = YoloDetector(TfliteEngine(YoloDetector.model_file(tfl_path), use_tpu=args.tpu))
else:
import tensorflow as tf
if not args.gpu:
tf.config.experimental.set_visible_devices([], 'GPU')
else:
gpu = tf.config.experimental.list_physical_devices('GPU')
if tf.version.VERSION[:4] == '2.2.':
tf.config.experimental.set_memory_growth(gpu[0], True) # works around bug in TF 2.2
if args.float16:
tf.keras.backend.set_floatx('float16')
import d2k
yolo_cfg = Path(args.yolo_cfg)
network = d2k.network.load(yolo_cfg.read_text())
network.read_darknet_weights((yolo_cfg.parent / (yolo_cfg.stem + '.weights')).read_bytes())
detector = YoloDetector(TfEngine(network.make_model()))
if len(args.input_files) > 0:
# process individual files, mostly for manual testing
for f in args.input_files:
detections, clips = processFile(detector, Path(f))
if args.send and len(detections) > 0:
sendEmail(detections, clips)
else:
# keep processing files as they become available
fl = FileLister(config['camera_path'], nodb=args.nodb)
checkpoint = Path('./checkpoint')
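        # the checkpoint file persists the timestamp prefix of the last file
        # processed, so processing resumes where it left off across restarts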
while True:
last_timestamp = checkpoint.read_text() if checkpoint.exists() else '20200101_000000'
for f in fl.files_since(last_timestamp):
detections, clips = processFile(detector, f)
if len(detections) > 0:
sendEmail(detections, clips)
checkpoint.write_text(f.name[:len(last_timestamp)])
if sys.stdout.isatty():
import datetime
print(datetime.datetime.now().strftime("%H:%M:%S..."), end='\r')
else:
sys.stdout.flush()
fl.wait_for_more()