Skip to content

Instantly share code, notes, and snippets.

@joshsucher
Created November 4, 2023 01:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joshsucher/192f18a2201a8144e8614331c9d2050e to your computer and use it in GitHub Desktop.
Save joshsucher/192f18a2201a8144e8614331c9d2050e to your computer and use it in GitHub Desktop.
import numpy as np
import supervision as sv
from supervision.draw.utils import draw_text
from supervision.draw.color import Color
import torch
#import cv2
from ultralytics import YOLO
import os
os.environ["PYTORCH_MPS_HIGH_WATERMARK_RATIO"] = "0.0"
model = YOLO("yolov8n.pt")
model = model.to("mps")
tracker = sv.ByteTrack(track_buffer=100)
box_annotator = sv.BoundingBoxAnnotator()
label_annotator = sv.LabelAnnotator()
trace_annotator = sv.TraceAnnotator()
#linezone_annotator = sv.LineZoneAnnotator()
# dict maping class_id to class_name
CLASS_NAMES_DICT = model.model.names
# class_ids of interest - person [0], bicycle [1], car, motorcycle, bus and truck
car_classes = [2, 3, 5, 7]
# # settings - eufy
# LINE_START_WEST = sv.Point(1266, 864)
# LINE_END_WEST = sv.Point(1167, 1167)
#
# LINE_START_SOUTH = sv.Point(700, 790)
# LINE_END_SOUTH = sv.Point(707, 990)
# scaling_factor = 4/9
# def scale_point(point):
# return sv.Point(int(point.x * scaling_factor), int(point.y * scaling_factor))
LINE_START_WEST = sv.Point(825, 518)
LINE_END_WEST = sv.Point(9, 538)
LINE_START_SOUTH = sv.Point(143, 398)
LINE_END_SOUTH = sv.Point(311, 818)
VIDEO = "monday-morning-trim.mov"
colors = sv.ColorPalette.default()
video_info = sv.VideoInfo.from_video_path(VIDEO)
# mon eve
# polygons = [
# np.array([
# [1152, 1001],[40, 1021],[24, 205],[1892, 409],[1892, 409],[1320, 1037],[1148, 997]
# ]), #full map
# np.array([
# [392, 626],[392, 462],[572, 554],[516, 670],[388, 622]
# ]), #westbound
# np.array([
# [26, 593],[294, 661],[298, 833],[30, 857],[22, 585]
# ]), #southbound
# np.array([
# [815, 629],[815, 509],[987, 509],[999, 649],[815, 629]
# ]), #turns_onto_westminster
# np.array([
# [827, 690],[1347, 698],[1511, 810],[1351, 954],[815, 750],[827, 690]
# ]) #turns onto dorchester
# ]
polygons = [
np.array([
[284, 920],[16, 916],[16, 520],[584, 652],[696, 652],[740, 560],[1348, 480],[1356, 684],[1200, 732],[1210, 860],[1452, 908],[1432, 992],[1116, 1044],[1092, 940],[1000, 892],[936, 828],[804, 824],[760, 836],[628, 724],[412, 672],[312, 684],[288, 916]
]), #full map
np.array([
[260, 684],[260, 456],[312, 516],[308, 604],[372, 624],[372, 676],[312, 684],[256, 684]
]), #westbound
np.array([
[34, 889],[18, 613],[158, 713],[170, 881],[34, 885]
]), #southbound
np.array([
[681, 511],[689, 695],[901, 719],[893, 507],[677, 507]
]), #turns_onto_westminster
np.array([
[705, 754],[1101, 754],[1101, 906],[709, 838],[701, 754]
]) #turns onto dorchester
]
# To store timestamps when line_counter increments
previous_in_count_west = 0
previous_in_count_south = 0
previous_in_count_south = 0
previous_in_count_lt = 0
previous_in_count_rt = 0
timestamps_west = []
timestamps_south = []
timestamps_lt = []
timestamps_rt = []
zone_sum_west = 0
unique_tracker_ids_west = set()
unique_tracker_ids_south = set()
unique_tracker_ids_lt = set()
unique_tracker_ids_rt = set()
# line_counter_west = sv.LineZone(start=LINE_START_WEST, end=LINE_END_WEST)
# line_counter_south = sv.LineZone(start=LINE_START_SOUTH, end=LINE_END_SOUTH)
zones = [
sv.PolygonZone(
polygon=polygon,
frame_resolution_wh=video_info.resolution_wh
)
for polygon
in polygons
]
zone_annotator_wb = sv.PolygonZoneAnnotator(zone=zones[1], color=sv.Color.white(), thickness=1, text_thickness=1, text_scale=0.5)
zone_annotator_sb = sv.PolygonZoneAnnotator(zone=zones[2], color=sv.Color.white(), thickness=1, text_thickness=1, text_scale=0.5)
zone_annotator_lt = sv.PolygonZoneAnnotator(zone=zones[3], color=sv.Color.white(), thickness=1, text_thickness=1, text_scale=0.5)
zone_annotator_rt = sv.PolygonZoneAnnotator(zone=zones[4], color=sv.Color.white(), thickness=1, text_thickness=1, text_scale=0.5)
#line_annotator = sv.LineZoneAnnotator(thickness=1, text_thickness=1, text_scale=1)
def callback(frame: np.ndarray, frame_num: int) -> np.ndarray:
results = model(frame, device="mps")[0]
detections = sv.Detections.from_ultralytics(results)
detections = detections[detections.confidence > 0.6]
detections = tracker.update_with_detections(detections)
global colors, previous_in_count_west, previous_in_count_south, previous_in_count_lt, previous_in_count_rt, timestamps_west, timestamps_south, timestamps_lt, timestamps_rt, zone_sum_west, unique_tracker_ids_west, unique_tracker_ids_south, unique_tracker_ids_lt, unique_tracker_ids_rt
mask = zones[0].trigger(detections=detections)
detections = detections[mask]
#annotated_frame = box_annotator.annotate(scene=frame, detections=detections, skip_label=True)
#annotated_frame = zone_annotator.annotate(scene=annotated_frame)
labels = [
f"#{tracker_id} {results.names[class_id]}"
for class_id, tracker_id
in zip(detections.class_id, detections.tracker_id)
]
annotated_frame = box_annotator.annotate(
frame.copy(), detections=detections)
annotated_frame = label_annotator.annotate(
annotated_frame, detections=detections, labels=labels)
#line_counter_west.trigger(detections=detections[np.isin(detections.class_id, car_classes)])
zones[1].trigger(detections=detections[np.isin(detections.class_id, car_classes)])
zones[2].trigger(detections=detections[np.isin(detections.class_id, car_classes)])
zones[3].trigger(detections=detections[np.isin(detections.class_id, car_classes)])
zones[4].trigger(detections=detections[np.isin(detections.class_id, car_classes)])
westbound_mask = zones[1].trigger(detections=detections)
westbound_detections = detections[westbound_mask]
southbound_mask = zones[2].trigger(detections=detections)
southbound_detections = detections[southbound_mask]
left_turn_mask = zones[3].trigger(detections=detections)
left_turn_detections = detections[left_turn_mask]
right_turn_mask = zones[4].trigger(detections=detections)
right_turn_detections = detections[right_turn_mask]
for class_id, tracker_id in zip(westbound_detections.class_id, westbound_detections.tracker_id):
if class_id in car_classes and tracker_id not in unique_tracker_ids_south: unique_tracker_ids_west.add(tracker_id)
for class_id, tracker_id in zip(left_turn_detections.class_id, left_turn_detections.tracker_id):
if class_id in car_classes and tracker_id in unique_tracker_ids_west and tracker_id not in unique_tracker_ids_rt: unique_tracker_ids_lt.add(tracker_id)
for class_id, tracker_id in zip(right_turn_detections.class_id, right_turn_detections.tracker_id):
if class_id in car_classes and tracker_id in unique_tracker_ids_south and tracker_id not in unique_tracker_ids_lt: unique_tracker_ids_rt.add(tracker_id)
for class_id, tracker_id in zip(southbound_detections.class_id, southbound_detections.tracker_id):
if class_id in car_classes and tracker_id not in (unique_tracker_ids_rt,unique_tracker_ids_west): unique_tracker_ids_south.add(tracker_id)
zone_sum_west = len(unique_tracker_ids_west)
zone_sum_south = len(unique_tracker_ids_south)
zone_sum_lt = len(unique_tracker_ids_lt)
zone_sum_rt = len(unique_tracker_ids_rt)
# Check for westbound cars
if len(unique_tracker_ids_west) > previous_in_count_west:
print(f"Westbound car detected at frame {frame_num}")
timestamps_west.append(frame_num / 30) # eufy 15fps
previous_in_count_west = len(unique_tracker_ids_west)
# Check for southbound cars
if len(unique_tracker_ids_south) > previous_in_count_south:
print(f"Southbound car detected at frame {frame_num}")
timestamps_south.append(frame_num / 30) # eufy 15fps
previous_in_count_south = len(unique_tracker_ids_south)
# Check for cars turning left
if len(unique_tracker_ids_lt) > previous_in_count_lt:
print(f"Car turning left detected at frame {frame_num}")
timestamps_lt.append(frame_num / 30) # eufy 15fps
previous_in_count_lt = len(unique_tracker_ids_lt)
# Check for cars turning right
if len(unique_tracker_ids_rt) > previous_in_count_rt:
print(f"Car turning right detected at frame {frame_num}")
timestamps_rt.append(frame_num / 30) # eufy 15fps
previous_in_count_rt = len(unique_tracker_ids_rt)
straight_major = max(0, zone_sum_west - zone_sum_lt)
straight_minor = max(0, zone_sum_south - zone_sum_rt)
annotated_frame = zone_annotator_wb.annotate(scene=annotated_frame, label=f"Major volume: {str(zone_sum_west)}")
annotated_frame = zone_annotator_sb.annotate(scene=annotated_frame, label=f"Minor volume: {str(zone_sum_south)}")
annotated_frame = zone_annotator_lt.annotate(scene=annotated_frame, label=f"Minor volume straight: {str(straight_minor)}, major volume left turn: {str(zone_sum_lt)}")
annotated_frame = zone_annotator_rt.annotate(scene=annotated_frame, label=f"Major volume straight: {str(straight_major)}, minor volume right turn: {str(zone_sum_rt)}")
gaps_west = [timestamps_west[i+1] - timestamps_west[i] for i in range(len(timestamps_west)-1)]
gaps_over_14_seconds_west = sum(1 for gap in gaps_west if gap > 14)
average_gap_west = sum(gaps_west) / len(gaps_west) if gaps_west else 0
text_anchor = sv.Point(199, 962)
annotated_frame = draw_text(scene=annotated_frame, text=f"Critical gaps (14s+), wb: {gaps_over_14_seconds_west}, avg gap: {int(average_gap_west)}s", text_anchor=text_anchor, background_color=Color(r=255, g=255, b=255))
#cv2.imshow('YOLO V8 Detection', annotated_frame)
return trace_annotator.annotate(
annotated_frame, detections=detections)
sv.process_video(
source_path=VIDEO,
target_path="result.mp4",
callback=callback
)
# Calculate gaps for west
gaps_west = [timestamps_west[i+1] - timestamps_west[i] for i in range(len(timestamps_west)-1)]
# Calculate gaps for south
gaps_south = [timestamps_south[i+1] - timestamps_south[i] for i in range(len(timestamps_south)-1)]
# Count the number of gaps greater than 15 seconds
gaps_over_14_seconds_west = sum(1 for gap in gaps_west if gap > 14)
gaps_over_14_seconds_south = sum(1 for gap in gaps_south if gap > 14)
print(f"Gaps over 14 seconds, westbound: {gaps_over_14_seconds_west}")
for i in range(len(timestamps_west)-1):
gap = timestamps_west[i+1] - timestamps_west[i]
print(f"Starting timestamp: {round(timestamps_west[i],2)}s, Gap: {int(gap)}s")
print(f"Gaps over 14 seconds, southbound: {gaps_over_14_seconds_south}")
for i in range(len(timestamps_south)-1):
gap = timestamps_south[i+1] - timestamps_south[i]
print(f"Starting timestamp: {round(timestamps_south[i],2)}s, Gap: {int(gap)}s")
#print(f"Westbound cars: {zone_sum_west}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment