Skip to content

Instantly share code, notes, and snippets.

@jaltmayerpizzorno
Created September 10, 2020 18:55
Show Gist options
  • Save jaltmayerpizzorno/935b021acdb3b562031f374b7deddfea to your computer and use it in GitHub Desktop.
Save jaltmayerpizzorno/935b021acdb3b562031f374b7deddfea to your computer and use it in GitHub Desktop.
# Copyright 2020, Juan Altmayer Pizzorno. All Rights Reserved. MIT license applies.
import os
import argparse
import tensorflow as tf
import tensorflow.keras as keras
import numpy as np
import cv2
import time
import sys
from pathlib import Path
# This is https://github.com/jaltmayerpizzorno/d2k
d2k_path = Path(__file__).parent / 'd2k'
sys.path.append(str(d2k_path))
import d2k
# Script-wide settings; edit these to match the local camera and mail setup.
config = {
    # directory of the camera's recordings (contains index.db and the .mkv files)
    'camera_path': Path('/mnt/camera/samba/axis-00408CC5B6D8'),

    # detection sampling and clip padding, all in seconds
    'search_secs': 2.0,   # search for detections every N seconds
    'before_secs': 1.5,   # extend clip this long before first detection
    'after_secs': 1.5,    # extend clip this long after last detection

    # generated clip
    'clip_width': 320,    # clip width (height is automatic)
    'clip_format': 'mp4',
    'clip_codec': 'avc1',

    # clip "representative" image width (height is automatic)
    'image_width': 640,

    # notification e-mail
    'email_subject': 'entrance camera',
    'smtp_credentials': Path(__file__).parent / 'smtp-credentials.json',
}
# Command-line interface. With no input files the script watches the camera directory.
argparser = argparse.ArgumentParser()
argparser.add_argument('input_files', type=str, nargs='*', help='file(s) to process')
argparser.add_argument('--send', help='send email', action='store_true')
argparser.add_argument('--mark', help='mark detections', action='store_true')
argparser.add_argument('--nodb', help="don't use axis' index.db to find files", action='store_true')
argparser.add_argument('--gpu', help='use GPU', action='store_true')
argparser.add_argument('--float16', help='use float16', action='store_true')
args = argparser.parse_args()

if not args.gpu:
    # Hide every GPU from TensorFlow so that inference runs on the CPU.
    tf.config.experimental.set_visible_devices([], 'GPU')
else:
    gpu = tf.config.experimental.list_physical_devices('GPU')
    if tf.version.VERSION.startswith('2.2.'):
        # Works around a bug in TF 2.2.  The setting is applied to every GPU found:
        # indexing gpu[0] fails when no GPU is present, and TensorFlow refuses to
        # initialize when memory growth differs between visible GPUs.
        for device in gpu:
            tf.config.experimental.set_memory_growth(device, True)
def make_model():
    """Builds the YOLOv3 Keras model from the darknet cfg and weights files shipped with d2k.

    When --float16 was given, Keras' default float type is switched to float16 before
    the network is built.
    """
    if args.float16:
        keras.backend.set_floatx('float16')

    darknet_files = d2k_path / 'darknet-files'
    cfg_text = (darknet_files / 'yolov3.cfg').read_text()
    weights = (darknet_files / 'yolov3.weights').read_bytes()

    network = d2k.network.load(cfg_text)
    network.read_darknet_weights(weights)
    return network.make_model()
# Class names, one per line of coco.names; a class' index is its line number.
coco_class_names = (d2k_path / 'darknet-files' / 'coco.names').read_text().splitlines()

# Limit detections to classes we are interested in... this cuts down on false positives
interesting_classes = set(map(coco_class_names.index, (
    'person', 'bicycle', 'car', 'motorbike',
    'bus', 'truck', 'cat', 'dog',
)))
def is_interesting(box):
    """Tells whether any of the interesting classes has a positive score in the given YOLO box."""
    return any(box.classes[class_index] > 0 for class_index in interesting_classes)
def dividing_line(img_dim):
    """Computes the detection boundary as a line y = a*x + b in the pixel coordinates of an image.

    The line was measured as two points on a 1920x1080 reference image; it is first
    normalized to the unit square and then scaled to img_dim, given as (w, h).
    Returns the tuple (a, b).
    """
    reference_size = np.array([1920, 1080])         # reference image's (w, h)
    measured = np.array([[0, 270], [1920, 740]])    # (x, y) of two points on the line
    (x0, y0), (x1, y1) = measured / reference_size  # same points, normalized to [0, 1]

    slope = (y1 - y0) / (x1 - x0)
    intercept = y0 - slope * x0

    width, height = img_dim[0], img_dim[1]
    return (slope * height / width, intercept * height)
def is_below(line, box):
    """Tells whether the bottom center of a box lies (strictly) below the line y = line[0]*x + line[1]."""
    slope, intercept = line[0], line[1]
    bottom_y = box.y + box.h/2              # box.y plus half the height: bottom edge of the box
    boundary_y = slope * box.x + intercept  # the line's y at the box's x
    # y starts at the top of the image, so "below" means a larger y
    return bottom_y > boundary_y
def pipeline_detect(model, frames):
    """Runs the model over the given frames and returns, for each frame, its list of detection boxes.

    Only boxes that are interesting (see is_interesting) and whose bottom center lies below
    the detection boundary are kept; boxes not below the boundary are reported on stdout.
    """
    frame_size = frames[0].shape[1::-1]
    boundary = dividing_line(frame_size)
    input_shape = model.layers[0].input_shape[0]
    net_size = input_shape[2:0:-1]

    def to_boxes(raw_output):
        # decode the raw network output of one frame, then drop overlapping boxes
        candidates = d2k.network.boxes_from_output(raw_output, net_size, frame_size, thresh=.8)
        candidates = d2k.box.nms_boxes(candidates, iou_thresh=.5)

        kept = []
        for box in candidates:
            if not is_below(boundary, box):
                print({coco_class_names[i] for i, c in enumerate(box.classes) if c > 0.}, 'ignored')
            elif is_interesting(box):
                kept.append(box)
        return kept

    def preprocessed():
        # reverse the channel order, scale to [0, 1], letterbox to the network's
        # input size and prepend a batch axis of length 1
        for frame in frames:
            scaled = frame[..., ::-1].astype('float32') / 255.0
            boxed = d2k.image.letterbox(scaled, *net_size)
            yield np.reshape(boxed, (1,) + tuple(boxed.shape))

    dataset = tf.data.Dataset.from_generator(preprocessed, keras.backend.floatx(),
                                             tf.TensorShape(input_shape))
    dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
    outputs = model.predict(dataset)

    # model.predict returns one array per network output; regroup them frame by frame
    frame_count = len(outputs[0])
    return [to_boxes([layer_out[k] for layer_out in outputs]) for k in range(frame_count)]
def boxes_names(boxes):
    """Collects into a set the names of all classes with a positive score in any of the given boxes."""
    names = set()
    for box in boxes:
        for class_index, score in enumerate(box.classes):
            if score > 0.:
                names.add(coco_class_names[class_index])
    return names
def boxes_area(boxes):
    """Adds up width * height over the given boxes (0 if there are none)."""
    total = 0
    for box in boxes:
        total += box.w * box.h
    return total
def save_jpg(top_detection, image_name, x_dim):
    """Writes a frame to image_name as a JPEG, x_dim pixels wide, and returns image_name.

    top_detection is a (frame, boxes) pair; the boxes are drawn onto the image only when
    --mark was given.  The height is derived from x_dim so as to keep the aspect ratio.
    """
    frame, boxes = top_detection

    # reverse the channel order and scale to [0, 1] before converting
    pixels = frame[..., ::-1].astype(np.float32) / 255.0
    picture = keras.preprocessing.image.array_to_img(pixels, data_format='channels_last',
                                                     dtype='float32')
    if args.mark:
        d2k.box.draw_boxes(picture, boxes, names=coco_class_names)

    y_dim = int(picture.height/picture.width*x_dim)
    picture = picture.resize((x_dim, y_dim))
    picture.save(image_name, "jpeg")
    return image_name
def true_seq(iterable):
    """Iterator returning (start, end) ranges for sequences of True values in a boolean valued iterable.

    Each range is half-open: `start` is the index of the first true value of a run and
    `end` is one past its last.  A run that reaches the end of the input is closed at
    the number of items seen, so any iterable works, not just sized ones.
    """
    start = None    # index at which the current run began; None while outside a run
    count = 0       # number of items consumed so far
    for i, value in enumerate(iterable):
        count = i + 1
        if start is None:
            # compare against None rather than testing truthiness: a run may begin at index 0
            if value:
                start = i
        elif not value:
            yield (start, i)
            start = None
    if start is not None:
        yield (start, count)
def processFile(model, file):
    """Processes a video file, returning a tuple (names, clips) of a set of the detected class names
    and a list of the generated clips/images.

    All frames of the video are read into memory; detection is run on every
    `search_frames`-th frame.  For each run of consecutive sampled frames with detections,
    a resized clip (padded by config['before_secs'] / config['after_secs']) and a
    "representative" JPEG are written to the current directory, and their file names are
    appended to the result as a (clip_name, image_name) pair.

    Returns (set(), []) if the file cannot be opened or yields no frames.
    """
    print("processing", file)
    start_time = time.time()

    cap = cv2.VideoCapture(str(file))
    if not cap.isOpened():
        return set(), list()

    memory = []
    try:
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        video_dims = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            memory.append(frame)
    finally:
        cap.release()   # release the capture even if reading fails

    if not memory:
        # nothing could be decoded; detection would fail on an empty frame list
        print(f"no frames read from {file}")
        return set(), list()

    # Count the frames actually read rather than trusting the container's frame count,
    # so that clip ranges can never index past the end of `memory`.
    total_frames = len(memory)

    clip_dims = (config['clip_width'], int(video_dims[1]/video_dims[0]*config['clip_width']))

    # search for detections every N frames (at least 1, since 0 is not a valid slice step)
    search_frames = max(1, int(config['search_secs'] * video_fps))
    before_frames = int(config['before_secs'] * video_fps)
    after_frames = int(config['after_secs'] * video_fps)

    clips_list = []
    clip_prefix = '_'.join(str(file.name).split('_')[:2])
    all_detections = set()

    # boxes_lst[i] holds the detections for memory[i * search_frames]
    boxes_lst = pipeline_detect(model, memory[::search_frames])

    for start, end in true_seq([len(boxes) > 0 for boxes in boxes_lst]):
        detections = set().union(*(boxes_names(boxes_lst[i]) for i in range(start, end)))
        all_detections |= detections

        # Write clip with detection(s)
        clip_name = f"{clip_prefix}-{len(clips_list)+1}-" + '-'.join(sorted(detections)) \
                    + '.' + config['clip_format']
        print(f"writing {clip_name}")
        out = cv2.VideoWriter(clip_name, cv2.VideoWriter_fourcc(*config['clip_codec']),
                              video_fps, clip_dims)
        try:
            clip_start = max(0, start * search_frames - before_frames)
            clip_end = min(total_frames, (end-1) * search_frames + after_frames)
            for i in range(clip_start, clip_end):
                out.write(cv2.resize(memory[i], clip_dims))
        finally:
            out.release()

        # Write "representative" image for the clip, for quick viewing:
        # the sampled frame whose boxes cover the largest total area
        det_areas = [boxes_area(boxes_lst[i]) for i in range(start, end)]
        top_detection = start + det_areas.index(max(det_areas))
        image_name = f"{clip_prefix}-{len(clips_list)+1}.jpg"
        save_jpg((memory[top_detection * search_frames], boxes_lst[top_detection]),
                 image_name, config['image_width'])

        clips_list.append((clip_name, image_name))

    end_time = time.time()
    print(f"elapsed: {end_time - start_time:.1f}s -- {total_frames/(end_time - start_time):.1f} FPS")
    return (all_detections, clips_list)
def sendEmail(detections, clips):
    """Sends an HTML e-mail listing the detected class names, with the clips and images inlined.

    detections: iterable of class names; they are sorted and listed in the message body.
    clips: list of (video_file, image_file) name pairs; every file is attached inline and
        referenced from the HTML through its Content-ID.

    Server, user, password and recipient are read from the JSON file at
    config['smtp_credentials'].  Delivery is best-effort: errors while connecting,
    logging in or sending are printed, not raised.
    """
    import smtplib
    import email
    import email.encoders   # used below; import it explicitly rather than relying on other imports
    import email.mime.base
    import email.mime.text
    import email.mime.multipart
    import json

    mime_type = {'.mp4': ('video', 'mp4'),
                 '.avi': ('video', 'x-msvideo'), # or x-motion-jpeg
                 '.mkv': ('video', 'x-matroska'),
                 '.jpg': ('image', 'jpeg'),
                 '.png': ('image', 'png')
                }

    credentials = json.loads(config['smtp_credentials'].read_text())
    msg_from = credentials['user'] # assumes 'user' is a valid email address!

    msg = email.mime.multipart.MIMEMultipart()
    msg['To'] = credentials['email_to']
    msg['From'] = msg_from
    msg['Subject'] = config['email_subject']

    # HTML body: the list of detections, then one <video> per clip, using the clip's
    # image as poster and as fallback content
    html = ['<!doctype html><html><body>']
    html.append(', '.join(sorted(detections)) + ' detected')
    html.append('<br/>')
    for video, image in clips:
        video_id = video + '.' + msg_from
        image_id = image + '.' + msg_from
        video_type = '/'.join(mime_type[Path(video).suffix])
        html.append(f'<video controls poster="cid:{image_id}">')
        html.append(f'<source src="cid:{video_id}" type="{video_type}">')
        html.append(f'<img src="cid:{image_id}"/>')
        html.append('</video>')
    html.append('</body></html>')
    msg.attach(email.mime.text.MIMEText(''.join(html), 'html'))

    # Attach every clip and image; the Content-ID is built the same way as the cid: references above
    for att in [Path(file) for iter_ in clips for file in iter_]:
        part = email.mime.base.MIMEBase(*mime_type[att.suffix])
        part.add_header('Content-ID', f'<{att}.{msg_from}>')
        part.add_header('Content-Disposition', 'inline', filename=att.name)
        part.set_payload(att.read_bytes())
        email.encoders.encode_base64(part)
        msg.attach(part)

    try:
        # Connecting is inside the try so that an unreachable server is reported like any
        # other delivery error; the context manager closes the connection on exit.
        with smtplib.SMTP_SSL(credentials['server']) as server:
            # server.set_debuglevel(1)
            server.login(credentials['user'], credentials['password'])
            server.sendmail(msg_from, msg['To'], msg.as_string())
    except Exception as e:
        # deliberately broad: a failed notification must not stop video processing
        print(e)
class FileLister:
    """Finds the camera's recorded .mkv files, either by globbing or through its index.db."""

    def __init__(self, camera_path):
        """camera_path: Path of the directory holding the recordings (and index.db)."""
        self._db = None             # sqlite3 connection, opened on first use
        self._path = camera_path

    def files_since(self, timestamp):
        """Returns the list of recordings whose file name starts with a timestamp later than
        the given one, ordered by file name.

        timestamp: string that is compared against the file name prefix of the same length.

        With --nodb the directory tree is searched for '*.mkv' files; otherwise index.db is
        queried for blocks that have a stop time.  Returns [] on sqlite3 errors, which are
        printed; the connection is then reopened on the next call.
        """
        if args.nodb:
            # rglob yields files in no particular order, so sort by name first: the
            # last one (by name) may not be ready yet
            files = sorted(self._path.rglob('*.mkv'), key=lambda f: f.name)[:-1]
            files = [f for f in files if f.name[:len(timestamp)] > timestamp]
            if len(files) > 1:
                print(len(files), "from glob")
            return files

        import sqlite3
        try:
            if self._db is None:
                self._db = sqlite3.connect('file:' + str(self._path / 'index.db') + '?mode=ro', uri=True)

            cursor = self._db.cursor()
            # Values are bound as parameters, not formatted into the SQL text.
            cursor.execute(
                'select r.path, r.filename, b.path, b.filename from blocks as b'
                ' join recordings as r on recording_id=r.id'
                ' where substr(b.filename,1,?) > ?'
                ' and b.stoptime not null'
                ' order by b.filename',
                (len(timestamp), timestamp))
            files = cursor.fetchall()
            files = [self._path / ('/'.join(row) + '.mkv') for row in files]
            if len(files) > 1: print(len(files), "from db")
            return files

        except sqlite3.Error as e:
            print("sqlite3 error:", e)
            if self._db is not None:
                self._db.close()
                self._db = None
            return []

    def wait_for_more(self):
        """Sleeps before the next search: 60 seconds with --nodb, 3 seconds otherwise."""
        if args.nodb:
            time.sleep(60)
        else:
            time.sleep(3) # if I only had an update hook...
model = make_model()

if args.input_files:
    # Explicit files were given: process just those (mostly for manual testing);
    # e-mail is only sent when --send was given.
    for f in args.input_files:
        detections, clips = processFile(model, Path(f))
        if args.send and detections:
            sendEmail(detections, clips)
else:
    # No files given: keep processing files as they become available.
    fl = FileLister(config['camera_path'])
    checkpoint = Path('./checkpoint')

    while True:
        # The checkpoint file holds the timestamp prefix of the last file processed.
        if checkpoint.exists():
            last_timestamp = checkpoint.read_text()
        else:
            last_timestamp = '20200101_000000'

        for f in fl.files_since(last_timestamp):
            detections, clips = processFile(model, f)
            if detections:
                sendEmail(detections, clips)
            # record progress after each file, whether or not anything was detected
            checkpoint.write_text(f.name[:len(last_timestamp)])

        fl.wait_for_more()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment