Preventing Treadmill Injuries by Reducing Human Error
STEP 1
import os

# Extract frames from the source video with ffmpeg at 24 frames per second
input_file = '/Users/jackgallo/Desktop/content/videos_to_infer/new_stop.mov'
output_file = 'output%04d.png'
output_dir = '/Users/jackgallo/Desktop/content/inferred_videos/'
frame_rate = 24
os.system(f'ffmpeg -i {input_file} -vf "fps={frame_rate}" {output_dir}{output_file}')

# Load the trained Roboflow model that will run inference on each frame
from roboflow import Roboflow
from PIL import Image, ImageDraw
import glob

rf = Roboflow(api_key="YOUR API KEY")
project = rf.workspace().project("roboflow-project-treadmills")
dataset = project.version(3)
model = dataset.model
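Before running the full loop in STEP 3, it can help to sanity-check the model on a single extracted frame. A minimal sketch, assuming the first frame came out as output0001.png from the output%04d.png pattern above:

# Sanity check: run inference on one extracted frame and print the raw predictions
test_frame = output_dir + 'output0001.png'  # example name from the output%04d.png pattern
test_predictions = model.predict(test_frame).json()['predictions']
print(test_predictions)  # each prediction includes x, y, width, height, class, and confidence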
STEP 2
def draw_boxes(box, x0, y0, img, class_name):
    # Map each detected class to a bounding box color
    color_map = {
        "Right Shoe": "red",
        "Left Shoe": "blue",
        "Knee": "yellow"
    }
    # Draw the bounding box and class label on the image
    bbox = ImageDraw.Draw(img)
    bbox.rectangle(box, outline=color_map[class_name], width=5)
    bbox.text((x0, y0), class_name, fill='black', anchor='mm')
    return img

def save_with_bbox_renders(img):
    # Save the rendered frame back into the inferred_videos directory
    file_name = os.path.basename(img.filename)
    img.save('/Users/jackgallo/Desktop/content/inferred_videos/' + file_name)
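The drawing helpers can also be exercised on their own before wiring them into the loop. A small sketch, using an arbitrary placeholder box and class name on an example frame, and saving to a scratch path (chosen here, not part of the original script) so the extracted frame itself is untouched:

# Draw a placeholder "Knee" box on a copy of one frame to confirm the rendering helper works
test_img = Image.open(output_dir + 'output0001.png')  # example frame name
test_img = draw_boxes((100, 100, 300, 300), 100, 100, test_img, "Knee")  # placeholder coordinates
test_img.save('/Users/jackgallo/Desktop/content/draw_boxes_test.png')  # scratch path for inspection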
STEP 3
# glob config values
file_path = "/Users/jackgallo/Desktop/content/inferred_videos/"
extension = ".png"

# glob the extracted frames based on location and file format
globbed_files = sorted(glob.glob(file_path + '*' + extension))
print(globbed_files)

images_with_detections = []
frame_counter = 0
pred_counter = 0
counter_difference = 8  # 8 frames at 24 frames per second is a third of a second
consecutive_no_pred = 0
previous_frame_prediction = False
for image in globbed_files:
    # INFERENCE: run the model on the current frame
    predictions = model.predict(image).json()['predictions']
    newly_rendered_image = Image.open(image)
    frame_counter += 1
    # If there are detections, render the boxes and add the frame to the list
    if predictions:
        pred_counter += 1
        consecutive_no_pred = 0
        previous_frame_prediction = True
        print(predictions)
        for prediction in predictions:
            # Convert center-based coordinates to a corner-based bounding box
            x0 = prediction['x'] - prediction['width'] / 2
            x1 = prediction['x'] + prediction['width'] / 2
            y0 = prediction['y'] - prediction['height'] / 2
            y1 = prediction['y'] + prediction['height'] / 2
            box = (x0, y0, x1, y1)
            newly_rendered_image = draw_boxes(box, x0, y0, newly_rendered_image, prediction['class'])
        # Save the rendered frame and add it to the list of frames used in the video
        new_file_name = os.path.join(output_dir, os.path.basename(image))
        newly_rendered_image.save(new_file_name)
        images_with_detections.append(new_file_name)
    else:
        # If there was no prediction in the previous frame either,
        # increment the consecutive no-prediction counter
        if not previous_frame_prediction:
            consecutive_no_pred += 1
        # If the previous frame had a prediction and the counter is below the threshold,
        # this is the first missed frame: restart the counter and keep the frame
        elif previous_frame_prediction and consecutive_no_pred < counter_difference:
            consecutive_no_pred = 1
            images_with_detections.append(image)
        previous_frame_prediction = False
        # Enough consecutive frames without a detection: raise the alert and stop
        if consecutive_no_pred == counter_difference:
            print("STOP THE TREADMILL IMMEDIATELY")
            print(images_with_detections)

            # Twilio text message
            from twilio.rest import Client
            account_sid = 'ACc0367901a926947f1a5de9b30e171164'
            auth_token = 'YOUR AUTH TOKEN'
            client = Client(account_sid, auth_token)
            message = client.messages.create(
                from_='+18555650648',
                body='STOP THE TREADMILL',
                to='+14153219284'
            )

            # Write the alert to a CSV file with a timestamp
            import csv
            from datetime import datetime
            data = [
                ["STOP THE TREADMILL", datetime.now()]
            ]
            with open("/Users/jackgallo/Downloads/stop_the_treadmill.csv", "w", newline='') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerows(data)
            break
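Hard-coding the Twilio account SID and auth token is fine for a demo, but a safer variant reads them from environment variables. A minimal sketch, assuming the credentials are exported as TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN (names chosen here, not part of the original script):

# Load Twilio credentials from the environment instead of embedding them in the script
import os
from twilio.rest import Client
account_sid = os.environ['TWILIO_ACCOUNT_SID']  # assumed environment variable name
auth_token = os.environ['TWILIO_AUTH_TOKEN']    # assumed environment variable name
client = Client(account_sid, auth_token)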
STEP 4
detections = len(images_with_detections)
new_frames = detections + counter_difference

import cv2

# specify video codec
fourcc = cv2.VideoWriter_fourcc(*'mp4v')

# video writer object
output_file = 'final_output.mp4'
frame_size = (1080, 1920)  # frame size (width, height); adjust to match your images
out = cv2.VideoWriter(output_file, fourcc, frame_rate, frame_size)

# collect the rendered frames, keeping the closing frame ('last.png') for the end
png_files = [f for f in os.listdir(file_path) if f.endswith('.png') and f != 'last.png']
sorted_files = sorted(png_files)

for image_file in sorted_files[:new_frames]:
    # read the image
    img = cv2.imread(os.path.join(file_path, image_file))
    # make sure the image is not None
    if img is not None:
        # resize image if necessary
        img = cv2.resize(img, frame_size)
        # write the image to video
        out.write(img)

# append the closing frame, then release the video writer
last_frame = cv2.imread('/Users/jackgallo/Desktop/content/inferred_videos/last.png')
last_frame = cv2.resize(last_frame, frame_size)
out.write(last_frame)
out.release()
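As a quick check that the video was written correctly, OpenCV can reopen it and report the frame count and FPS; a short sketch:

# Verify the rendered video by reopening it and reporting its frame count and FPS
cap = cv2.VideoCapture(output_file)
print('frames:', int(cap.get(cv2.CAP_PROP_FRAME_COUNT)))
print('fps:', cap.get(cv2.CAP_PROP_FPS))
cap.release()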