
@arnonzamir
Created April 25, 2023 19:06
import argparse
import time
from pathlib import Path
import cv2
import torch
import torch.backends.cudnn as cudnn
from numpy import random
import numpy as np
import json
import threading
import datetime
from models.experimental import attempt_load
from utils.datasets import LoadStreams, LoadImages
from utils.general import check_img_size, check_requirements, \
check_imshow, non_max_suppression, apply_classifier, \
scale_coords, xyxy2xywh, strip_optimizer, set_logging, \
increment_path
from utils.plots import plot_one_box
from utils.torch_utils import select_device, load_classifier, time_synchronized, TracedModel
from torchvision.models import resnet101, ResNet101_Weights
import math
from math import atan2, degrees, hypot
# For SORT tracking
import skimage
from sort import *
from PyQt5.QtWidgets import QApplication, QVBoxLayout, QWidget, QListWidget, QListWidgetItem, QTableWidget, QTableWidgetItem, QLabel, QPushButton
from PyQt5.QtCore import QThread, pyqtSignal, Qt
from PyQt5.QtGui import QColor
from PyQt5.QtGui import QPixmap, QIcon
import os
import sys
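# Vehicle counting and tracking: runs YOLOv7 detection on a video/stream source,
# hands the detections to a (modified) SORT tracker, works out each finished track's
# direction and zone-to-zone movement, appends it to a per-video CSV, saves a crop of
# the vehicle, and shows the live track table in a PyQt5 window.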
# ............................... Tracker Functions ............................
""" Random created palette"""
palette = (2 ** 11 - 1, 2 ** 15 - 1, 2 ** 20 - 1)
area1_pointA = (100, 200)
area1_pointB = (500, 200)
area1_pointC = (30, 360)
area1_pointD = (500, 360)
# vehicles total counting variables
array_ids = []
counting = 0
modulo_counting = 0
"""" Calculates the relative bounding box from absolute pixel values. """
def bbox_rel(*xyxy):
    bbox_left = min([xyxy[0].item(), xyxy[2].item()])
    bbox_top = min([xyxy[1].item(), xyxy[3].item()])
    bbox_w = abs(xyxy[0].item() - xyxy[2].item())
    bbox_h = abs(xyxy[1].item() - xyxy[3].item())
    x_c = (bbox_left + bbox_w / 2)
    y_c = (bbox_top + bbox_h / 2)
    w = bbox_w
    h = bbox_h
    return x_c, y_c, w, h
"""Simple function that adds fixed color depending on the class"""
def compute_color_for_labels(label):
# color = ([int((p * (label ** 2 - label + 1)) % 255) for p in palette])
return (30, 250-label*20, 100-label*30)
return tuple(color)
"""Function to Draw Bounding boxes"""
def loadZones():
# load the zones
with open('zones.json', 'r') as handle:
# json deserialize the zones
s = handle.read()
if s == "":
return []
zones = json.loads(s)
# loop through the zones and convert the points to numpy arrays
for zone in zones:
zone["points"] = np.array(zone["points"])
return zones
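# A minimal zones.json sketch, assuming the structure loadZones() and drawZones()
# expect: a list of named rectangles whose "points" are four corners, of which the
# first and third are used as opposite corners of the zone:
# [
#   {"name": "North", "points": [[50, 100], [400, 100], [400, 300], [50, 300]]},
#   {"name": "South", "points": [[600, 100], [950, 100], [950, 300], [600, 300]]}
# ]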
def drawZones(frame, zones):
    # draw the zones on the frame
    for zone in zones:
        p1 = zone["points"][0]
        p2 = zone["points"][2]
        cv2.rectangle(frame, (p1[0], p1[1]), (p2[0], p2[1]), (0, 255, 0), 2)
        # place the zone name in the top left corner of the zone
        # calculate the text size
        (text_width, text_height) = cv2.getTextSize(
            zone["name"], cv2.FONT_HERSHEY_SIMPLEX, 2, 2)[0]
        # get the coordinates based on the text size, with a small padding of two pixels
        text_offset_x = p1[0] + 2
        text_offset_y = p1[1] + text_height + 2
        cv2.putText(frame, zone["name"], (text_offset_x, text_offset_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 3)
def get_angle(x1, y1, x2, y2, img):  # the two endpoints can be passed as four scalar coordinates
    point_1 = [int(x1), int(y1)]
    point_2 = [int(x2), int(y2)]
    # draw the triangle
    # cv2.line(img, point_1, point_2, (0, 255, 0), 2)
    # Calculate the angle
    angle = atan2(x2 - x1, y2 - y1)
    # draw the angle
    cv2.putText(img, str(int(degrees(angle))),
                (point_1[0], point_1[1]), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
    # Optional
    angle = int(degrees(angle))
    distance = int(
        math.sqrt((point_1[0] - point_2[0])**2 + (point_1[1] - point_2[1])**2))
    # angle = radians(angle)
    return angle
def isInTrackingZone(x1, y1, x2, y2, imc, detclass, ts):
    if (detclass in [1, 3, 17]):  # bicycle / motorcycle / horse (COCO class ids)
        return True
    is_night = (ts.hour >= 19 or ts.hour <= 6)
    # larger window for trucks
    minheight = 0.15 if detclass == 7 else 0.45 if not is_night else 0.2
    height, width, channels = imc.shape
    imgbox = [int(width * 0.01), int(height * minheight),
              int(width * 0.99), int(height * 1)]
    cv2.rectangle(imc, (imgbox[0], imgbox[1]),
                  (imgbox[2], imgbox[3]), (200, 200, 200), 1)
    if (x1 < imgbox[0] or x2 > imgbox[2] or y1 < imgbox[1] or y2 > imgbox[3]):
        return False
    # return True
    # CX = (x2+x1)//2
    # CY = (y2+y2)//2
    # return (CY > imgbox[1] and CY < imgbox[3])
    w = x2 - x1
    h = y2 - y1
    # keep only boxes whose height or width is at least 7% of the frame
    return ((h >= (height * 0.07)) or (w >= (width * 0.07)))
def detect(track_info_signal=None, save_img=False):
source, weights, view_img, save_txt, imgsz, trace = opt.source, opt.weights, opt.view_img, opt.save_txt, opt.img_size, not opt.no_trace
vid_stride = opt.vid_stride
save_img = not opt.nosave and not source.endswith(
'.txt') # save inference images
webcam = source.isnumeric() or source.endswith('.txt') or source.lower().startswith(
('rtsp://', 'rtmp://', 'http://', 'https://'))
# .... Initialize SORT ....
# .........................
sort_max_age = 5
sort_min_hits = 10 # 25 * vid_stride
sort_iou_thresh = 0.2
sort_tracker = Sort(max_age=sort_max_age,
min_hits=sort_min_hits,
iou_threshold=sort_iou_thresh)
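    # SORT settings: max_age is how many frames a track may go unmatched before it is
    # dropped, min_hits is how many matches are needed before a track counts, and
    # iou_threshold is the minimum box overlap used for matching. These are the usual
    # SORT meanings; the bundled sort.py is a modified copy, so details may differ.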
# .........................
# Directories
save_dir = Path(increment_path(Path(opt.project) / opt.name,
exist_ok=opt.exist_ok)) # increment run
(save_dir / 'labels' if save_txt else save_dir).mkdir(parents=True,
exist_ok=True) # make dir
    (save_dir / 'csv').mkdir(parents=True, exist_ok=True)    # per-video CSV logs (written unconditionally below)
    (save_dir / 'crops').mkdir(parents=True, exist_ok=True)  # cropped track images
# Initialize
set_logging()
device = select_device(opt.device)
half = device.type != 'cpu' # half precision only supported on CUDA
# half = False
# Load model
model = attempt_load(weights, map_location=device) # load FP32 model
stride = int(model.stride.max()) # model stride
imgsz = check_img_size(imgsz, s=stride) # check img_size
# counting and categorizing
last_active_tracks = 0
if trace:
model = TracedModel(model, device, opt.img_size)
if half:
model.half() # to FP16
# Second-stage classifier
classify = False
if classify:
modelc = resnet101(weights=ResNet101_Weights.DEFAULT)
modelc.load_state_dict(torch.load('resnet101.pt'))
# modelc = load_classifier(name='resnet101', n=2) # initialize
# modelc.load_state_dict(torch.load('weights/resnet101.pt', map_location=device)['model']).to(device).eval()
# Set Dataloader
vid_path, vid_writer = None, None
if webcam:
view_img = check_imshow()
cudnn.benchmark = True # set True to speed up constant image size inference
dataset = LoadStreams(source, img_size=imgsz, stride=stride)
else:
dataset = LoadImages(source, img_size=imgsz,
stride=stride, vid_stride=vid_stride)
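    # vid_stride makes LoadImages hand back only every Nth decoded frame (10 by default),
    # which speeds up long recordings; the commented-out "25 * vid_stride" next to
    # sort_min_hits above suggests the tracker settings assume this frame skipping.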
# Get names and colors
names = model.module.names if hasattr(model, 'module') else model.names
colors = [[random.randint(0, 255) for _ in range(3)] for _ in range(100)]
# Run inference
if device.type != 'cpu':
model(torch.zeros(1, 3, imgsz, imgsz).to(device).type_as(
next(model.parameters()))) # run once
old_img_w = old_img_h = imgsz
old_img_b = 1
count_vehicle = 0
zones = loadZones()
t0 = time.time()
for path, img, im0s, vid_cap in dataset:
img = torch.from_numpy(img).to(device)
img = img.half() if half else img.float() # uint8 to fp16/32
img /= 255.0 # 0 - 255 to 0.0 - 1.0
if img.ndimension() == 3:
img = img.unsqueeze(0)
# Warmup
if device.type != 'cpu' and (old_img_b != img.shape[0] or old_img_h != img.shape[2] or old_img_w != img.shape[3]):
old_img_b = img.shape[0]
old_img_h = img.shape[2]
old_img_w = img.shape[3]
for i in range(3):
model(img, augment=opt.augment)[0]
# Inference
t1 = time_synchronized()
pred = model(img, augment=opt.augment)[0]
t2 = time_synchronized()
drawZones(im0s, zones)
# Apply NMS
pred = non_max_suppression(
pred, opt.conf_thres, opt.iou_thres, classes=opt.classes, agnostic=opt.agnostic_nms)
t3 = time_synchronized()
# Apply Classifier
if classify:
pred = apply_classifier(pred, modelc, img, im0s)
# python detect_count_and_track.py --weights yolov7.pt --save-txt --classes 1 2 3 5 7 17 --view-img --img-size 640 --conf 0.5 --vid-stride 8 --source "C:\Users\Admin\Videos\Traffic Counter\Tamra\Camera2\2022\10\22\"
def isPointInRect(x, y, points):
p1 = points[0]
p2 = points[2]
cv2.rectangle(im0s, (p1[0], p1[1]),
(p2[0], p2[1]), (255, 255, 0), 2)
x1 = min(p1[0], p2[0])
y1 = min(p1[1], p2[1])
x2 = max(p1[0], p2[0])
y2 = max(p1[1], p2[1])
return x >= x1 and x <= x2 and y >= y1 and y <= y2
def pop_callback(data, names): # called when a track is deleted
object_data = data[0]
trk = data[1]
ts = getattr(dataset, 'current_timestamp')
x1 = int(trk.centroidarr[0][0])
y1 = int(trk.centroidarr[0][1])
x2 = int(trk.centroidarr[-1][0])
y2 = int(trk.centroidarr[-1][1])
for zone in zones:
if isPointInRect(x2, y2, zone['points']):
trk.to_zone = zone['name']
break
# wait for key press
angle = round(get_angle(x1, y1, x2, y2, img=im0), 2)
frames = trk.lastFrame - trk.firstFrame
dist = round(hypot(x2 - x1, y2 - y1))
direction = "down" if (
angle > -180 and angle < -130) else "up" if (angle > -10 and angle < 40) else "side"
# if (is_night(ts.hour)) else trk.cropped_img.shape[0]/2
minDist = 100
isLine = (angle != 0) and (frames > 5) and (dist >= minDist)
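            # A track is only logged if it genuinely moved: a non-zero heading, more than
            # 5 frames of life, and at least minDist (100) pixels between its first and
            # last centroid.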
f = ''
# print(f'{ts} angle:{angle} frames:{frames} {dist} {isLine}') ##printing the angle and distance
trk.poped = True
trk.popedAt = time.time()
trk.classid = int(object_data[0, 4])
trk.classname = names[trk.classid]
update_one_track(trk)
if (isLine):
if (trk.cropped_img is not None and (not trk.ignore)):
s = (
f'{str(ts).replace(" ",",")},{int(object_data[0,8])},{names[int(object_data[0,4])]},{dist},{direction},{angle},{f}, ({x1},{y1}),({x2},{y2}), {trk.from_zone}, {trk.to_zone}') # printing log string
with open(f'{csv_path}', 'a') as f:
f.write(s + '\n')
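                # Each appended CSV row describes one finished track, in order:
                # timestamp, what appears to be the track id (column 8 of the SORT data),
                # class name, pixel distance, direction (up/down/side), angle in degrees,
                # an unused field, start point, end point, from_zone, to_zone.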
# print(from_zone + '->' + to_zone + ',' + str(angle))
# cv2.waitKey(0)
# print(s)
else:
if (trk.cropped_img is not None):
del trk.cropped_img
# Process detections
for i, det in enumerate(pred): # detections per image
p, s, im0, frame = path, '', im0s, getattr(dataset, 'frame', 0)
            # TODO: fast-scan the video in one-second jumps, detect objects, then go back and scan only those regions
if (i > 0):
continue
p = Path(p) # to Path
save_path = str(save_dir / p.name) # img.jpg
txt_path = str(save_dir / 'labels' / p.stem) + \
('' if dataset.mode == 'image' else f'_{frame}') # img.txt
csv_path = str(save_dir / 'csv' / p.parent.stem) + ".csv"
# normalization gain whwh
gn = torch.tensor(im0.shape)[[1, 0, 1, 0]]
if len(det):
# Rescale boxes from img_size to im0 size
det[:, :4] = scale_coords(
img.shape[2:], det[:, :4], im0.shape).round()
# Print results
for c in det[:, -1].unique():
n = (det[:, -1] == c).sum() # detections per class
# add to string
s += f"{n} {names[int(c)]}{'s' * (n > 1)}, "
# print(det)
# ..................USE TRACK FUNCTION....................
# pass an empty array to sort
dets_to_sort = np.empty((0, 6))
count = 0
# NOTE: We send in detected object class too
for x1, y1, x2, y2, conf, detclass in det.cpu().detach().numpy():
count += 1
# print (isInTrackingZone(x1,y1,x2,y2,im0))
color = (0, 200, 0)
if (isInTrackingZone(x1, y1, x2, y2, im0, detclass, dataset.current_timestamp)):
dets_to_sort = np.vstack(
(dets_to_sort, np.array([x1, y1, x2, y2, conf, detclass])))
else:
color = (0, 0, 200)
cv2.rectangle(im0, (int(x1), int(y1)),
(int(x2), int(y2)), color, 1)
# cv2.putText(img, str(det[:,8]), (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, [255, 255, 255], 1)
# Run SORT
tracked_dets = sort_tracker.update(
dets_to_sort, names=names, func=pop_callback, vid_frame=getattr(dataset, 'frame', 0), img=im0, zones=zones)
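                # This sort.py is a modified SORT: update() also receives the class names,
                # a callback fired when a track is dropped (pop_callback above), the current
                # frame index/image and the zones, while getTrackers() returns the live track
                # objects whose centroidarr history, zones and cropped_img are used below.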
tracks = sort_tracker.getTrackers()
activetracks = [t for t in tracks if not t.ignore]
dataset.hasDetection = len(activetracks) > 0
# print('Tracked Detections : ' + str(len(activetracks)))
# Replace this with the actual data from your program
# loop over tracks
j = 0
for track in tracks:
j += 1
# compute_color_for_labels(track.id)
if (len(track.centroidarr) == 1): # first centroid
cv2.circle(im0, (int(
track.centroidarr[-1][0]), int(track.centroidarr[-1][1])), 4, (0, 0, 255), 4)
cv2.imshow("main", im0)
for zone in zones:
if (isPointInRect(track.centroidarr[-1][0], track.centroidarr[-1][1], zone["points"])):
track.from_zone = zone["name"]
break
color = colors[(track.id+100) % 100]
if (len(track.centroidarr) > 1):
dist = round(hypot(int(track.centroidarr[0][0] - track.centroidarr[-1][0]), int(
track.centroidarr[0][1] - track.centroidarr[-1][1])))
cv2.putText(im0, str(int(track.id)), (int(track.centroidarr[-1][0]),
int(track.centroidarr[-1][1])), cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.5 if track.poped else 1,
color=[255, 70, 70], thickness=1 if track.poped else 2)
else:
dets_to_sort = np.empty((0, 6))
tracked_dets = sort_tracker.update(dets_to_sort, names=names, func=pop_callback, vid_frame=getattr(
dataset, 'frame', 0), img=im0, nodet=True)
# if (last_active_tracks > 0):
# print (last_active_tracks)
last_active_tracks = 0
dataset.hasDetection = False
for t in tracks:
t.classname = names[int(t.detclass)]
if (t.cropped_img is not None):
f = f'{save_dir}/crops/{t.id}.jpg';
#print(f)
t.img_path = f
if (not os.path.exists(f)):
cv2.imwrite(f, t.cropped_img)
track_info = tracks
if (track_info_signal is not None):
track_info_signal.emit(track_info)
# Stream results
if vid_path != save_path: # new video
cv2.destroyAllWindows()
if view_img:
cv2.imshow("main", im0)
cv2.waitKey(1) # 1 millisecond
# Save results (image with detections)
if save_img:
if dataset.mode == 'image':
cv2.imwrite(save_path, im0)
print(
f" The image with the result is saved in: {save_path}")
else: # 'video' or 'stream'
if vid_path != save_path: # new video
vid_path = save_path
# if isinstance(vid_writer, cv2.VideoWriter):
# vid_writer.release() # release previous video writer
# if vid_cap: # video
# fps = vid_cap.get(cv2.CAP_PROP_FPS)
# w = int(vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
# h = int(vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# else: # stream
# fps, w, h = 30, im0.shape[1], im0.shape[0]
# save_path += '.mp4'
# vid_writer = cv2.VideoWriter(
# save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
# #vid_writer.write(im0)
if save_txt or save_img:
s = f"\n{len(list(save_dir.glob('labels/*.txt')))} labels saved to {save_dir / 'labels'}" if save_txt else ''
# print(f"Results saved to {save_dir}{s}")
print(f'Done. ({time.time() - t0:.3f}s)')
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--weights', nargs='+', type=str,
default='yolov7.pt', help='model.pt path(s)')
# file/folder, 0 for webcam
parser.add_argument('--source', type=str,
default='inference/images', help='source')
parser.add_argument('--img-size', type=int, default=640,
help='inference size (pixels)')
parser.add_argument('--conf-thres', type=float,
default=0.65, help='object confidence threshold')
parser.add_argument('--iou-thres', type=float,
default=0.45, help='IOU threshold for NMS')
parser.add_argument('--device', default='',
help='cuda device, i.e. 0 or 0,1,2,3 or cpu')
parser.add_argument('--view-img', action='store_true',
help='display results')
parser.add_argument('--save-txt', action='store_true',
help='save results to *.txt')
parser.add_argument('--save-conf', action='store_true',
help='save confidences in --save-txt labels')
parser.add_argument('--nosave', action='store_true',
help='do not save images/videos')
parser.add_argument('--classes', nargs='+', type=int,
help='filter by class: --class 0, or --class 0 2 3')
parser.add_argument('--agnostic-nms', action='store_true',
help='class-agnostic NMS')
parser.add_argument('--augment', action='store_true',
help='augmented inference')
parser.add_argument('--update', action='store_true',
help='update all models')
parser.add_argument('--project', default='runs/detect',
help='save results to project/name')
parser.add_argument('--name', default='object_tracking',
help='save results to project/name')
parser.add_argument('--exist-ok', action='store_true',
help='existing project/name ok, do not increment')
parser.add_argument('--no-trace', action='store_true',
help='don`t trace model')
parser.add_argument('--vid-stride', type=int, default=10,
help='video frame-rate stride')
opt = parser.parse_args()
print(opt)
# check_requirements(exclude=('pycocotools', 'thop'))
class CarTrackingThread(QThread):
track_info_signal = pyqtSignal(list)
def run(self):
with torch.no_grad():
detect(track_info_signal=self.track_info_signal)
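    # The detection loop runs in this worker thread and pushes the current tracker list
    # to the GUI through track_info_signal, which update_list_widget (connected below)
    # consumes on the main thread.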
car_tracking_thread = CarTrackingThread()
app = QApplication(sys.argv)
window = QWidget()
window.resize(800, 600)
layout = QVBoxLayout(window)
table_widget = QTableWidget()
    table_widget.setColumnCount(10)
table_widget.setHorizontalHeaderLabels(
["ID", "From", "To", "Distance", "Poped", "Timeout", "S_update", "class", "conf", "img"])
# After creating the QTableWidget
image_label = QLabel()
layout.addWidget(image_label)
layout.addWidget(table_widget)
layout.addWidget(QPushButton("Start", clicked=car_tracking_thread.start))
UItracklist = []
def createWidgetItem(t):
timeout = time.time() - t.popedAt if t.poped else 0
item = [t.id, t.from_zone, t.to_zone,
int(t.distanceTravelled), t.poped, int(timeout), int(t.time_since_update), t.classname, t.confidence, t.img_path]
# item = QListWidgetItem(
# f"ID: {t.id}, from:{t.from_zone}, to:{t.to_zone}, dist:{int(t.distanceTravelled), t.poped}")
return item
def set_row_items(table_widget, row, items):
for col, item in enumerate(items):
# if (col == 9 and item != None):
# set_cell_image(table_widget, row, col, str(item))
# else:
table_widget.setItem(row, col, QTableWidgetItem(str(item)))
def set_row_background_color(table_widget, row, color):
for col in range(table_widget.columnCount()):
if table_widget.item(row, col):
table_widget.item(row, col).setBackground(color)
def set_cell_image(table_widget, row, col, image_path):
pixmap = QPixmap(image_path)
scaled_pixmap = pixmap.scaled(
50, 50, Qt.KeepAspectRatio, Qt.FastTransformation)
icon = QIcon(scaled_pixmap)
table_widget.setItem(row, col, QTableWidgetItem())
table_widget.item(row, col).setIcon(icon)
table_widget.setRowHeight(row, 50)
def refreshList():
table_widget.setRowCount(len(UItracklist))
i = 0
# sort by id
UItracklist.sort(key=lambda x: x.id, reverse=False)
for t in UItracklist:
if (t.age < 10):
continue
set_row_items(table_widget, i, createWidgetItem(t))
if (t.poped):
set_row_background_color(table_widget, i, QColor("cyan"))
i += 1
    def update_one_track(t):
        if t in UItracklist:
            UItracklist.remove(t)
        UItracklist.append(t)
        refreshList()
def update_list_widget(track_info_list):
# add to UItracklist and update list_widget. only add if not already in list
for t in track_info_list:
if t not in UItracklist:
UItracklist.append(t)
for t in UItracklist:
# remove if poped and 10 seconds passed, remove quickly if age < 5 (junk)
if (t.poped == True and (time.time() - t.popedAt > 10) or t.age < 5):
#if image file exists, delete it
if t.img_path is not None and os.path.exists(t.img_path):
os.remove(t.img_path)
UItracklist.remove(t)
refreshList()
    # Show the track's saved crop image in the preview label when its table row is clicked
def on_item_clicked(item):
row = item.row()
pixmap = QPixmap(table_widget.item(row, 9).text())
image_label.setPixmap(pixmap)
table_widget.itemClicked.connect(on_item_clicked)
car_tracking_thread.track_info_signal.connect(update_list_widget)
window.show()
car_tracking_thread.start()
# if opt.update: # update all models (to fix SourceChangeWarning)
# for opt.weights in ['yolov7.pt']:
# detect()
# strip_optimizer(opt.weights)
# else:
# detect()
sys.exit(app.exec_())