Created
May 16, 2023 07:27
-
-
Save git-hamza/b509a2e9eb4289a5460d7abd8d42b4a7 to your computer and use it in GitHub Desktop.
Draw the Google Video Intelligence logo-recognition ("annotate a local video") output onto the video
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Annotate a video in Cloud Storage provides you an option to generate the output as json. | |
This script can be used you annotate a local video (https://cloud.google.com/video-intelligence/docs/logo-recognition#annotate_a_local_video). | |
This script will help to generate the bounding box per frame and also draw it on the video. | |
""" | |
import io | |
import json | |
from google.cloud import videointelligence | |
from google.protobuf.json_format import MessageToJson | |
import cv2 | |
def detect_logos(video_input, print_output=False):
    """Run Google Video Intelligence logo recognition on a local video file.

    Args:
        video_input: Path to the local video file to annotate.
        print_output: When True, print each detected logo track (description,
            confidence, frame count) and its per-frame bounding boxes.

    Returns:
        The ``logo_recognition_annotations`` sequence of the first (and only)
        annotation result.
    """
    client = videointelligence.VideoIntelligenceServiceClient()
    # The API accepts the raw video bytes directly when no GCS URI is given.
    # (built-in open() replaces the legacy io.open alias)
    with open(video_input, "rb") as f:
        input_content = f.read()
    features = [videointelligence.Feature.LOGO_RECOGNITION]
    operation = client.annotate_video(
        request={"features": features, "input_content": input_content}
    )
    print("Waiting for operation to complete...")
    response = operation.result()
    # Only one video was submitted, so only the first result matters.
    annotation_result = response.annotation_results[0]
    if print_output:
        for annotation in annotation_result.logo_recognition_annotations:
            description = annotation.entity.description
            for track in annotation.tracks:
                confidence = track.confidence
                print(
                    f" {description},"
                    f" confidence: {confidence:.0%},"
                    f" frames: {len(track.timestamped_objects)} ".center(80, "-")
                )
                for timestamped_object in track.timestamped_objects:
                    t = timestamped_object.time_offset.total_seconds()
                    box = timestamped_object.normalized_bounding_box
                    print(
                        f"{t:>7.3f}",
                        f"({box.left:.5f}, {box.top:.5f})",
                        f"({box.right:.5f}, {box.bottom:.5f})",
                        sep=" | ",
                    )
    return annotation_result.logo_recognition_annotations
def frame_level_data_dump_to_json(logo_recognition_annotation_result, json_file, video_fps):
    """Expand sparse logo annotations into per-frame boxes and dump them to JSON.

    The API returns timestamped keyframes; this pads the gap between each
    consecutive pair of keyframes (repeating the earlier box once per video
    frame) so every frame number has an entry, then writes
    ``{"processed_info": {frame_no: [(label, left, top, right, bottom), ...]}}``
    to *json_file*.

    Args:
        logo_recognition_annotation_result: Sequence of logo annotations as
            returned by ``detect_logos``.
        json_file: Path of the JSON file to write.
        video_fps: Frame rate of the source video, used to map timestamps
            to frame numbers.

    Fixes vs. the original: a track with fewer than two timestamped objects
    previously hit the ``for``-``else`` clause with ``idx`` unbound and raised
    ``NameError``; frame keys are now sorted numerically (string sort put
    "10" before "2").
    """
    time_diff = 1 / video_fps
    video_info = {"processed_info": {}}
    frames = video_info["processed_info"]

    def _add(frame_n, label, box):
        # Record one (label, box) tuple under the string frame-number key.
        frames.setdefault(f"{frame_n}", []).append(
            (label, box.left, box.top, box.right, box.bottom)
        )

    for annotation in logo_recognition_annotation_result:
        label = annotation.entity.description
        for track in annotation.tracks:
            objs = track.timestamped_objects
            if not objs:
                continue  # nothing to expand for an empty track
            for idx in range(len(objs) - 1):
                first_t = objs[idx].time_offset.total_seconds()
                second_t = objs[idx + 1].time_offset.total_seconds()
                # Number of video frames spanned by this keyframe pair.
                entries = int(round(second_t - first_t, 3) / time_diff)
                box = objs[idx].normalized_bounding_box
                t = first_t
                for entry_pad in range(entries):
                    if entry_pad > 0:
                        t += time_diff
                    _add(int(t * video_fps), label, box)
            # Always record the final keyframe (this is what the original
            # for-else did, but it crashed when len(objs) < 2).
            last = objs[-1]
            _add(int(last.time_offset.total_seconds() * video_fps), label,
                 last.normalized_bounding_box)

    # Sort keys by their numeric value, not lexicographically.
    video_info["processed_info"] = {
        key: frames[key] for key in sorted(frames, key=int)
    }
    with open(json_file, "w") as fp:
        json.dump(video_info, fp)
def display_json_data_on_video_file(input_video, output_video, json_file):
    """Overlay the per-frame logo boxes stored in *json_file* onto a video.

    Reads each frame of *input_video*, draws the labelled green rectangles
    recorded for that frame number (if any), and writes the annotated frames
    to *output_video* as an mp4v-encoded file.
    """
    with open(json_file, 'r') as f:
        per_frame = json.load(f)["processed_info"]

    # Open the source video and mirror its geometry/fps in the writer.
    reader = cv2.VideoCapture(input_video)
    width = int(reader.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(reader.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(reader.get(cv2.CAP_PROP_FPS))
    writer = cv2.VideoWriter(
        output_video, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height)
    )

    frame_no = 0
    while True:
        ok, frame = reader.read()
        if not ok:
            break  # end of video reached
        for label, left, top, right, bottom in per_frame.get(str(frame_no), []):
            # Scale the normalized [0, 1] coordinates up to pixel positions.
            x_min, y_min = int(left * width), int(top * height)
            x_max, y_max = int(right * width), int(bottom * height)
            cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)
            cv2.putText(frame, label, (x_min, y_min - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        frame_no += 1
        writer.write(frame)

    # Release both video handles and any OpenCV windows.
    reader.release()
    writer.release()
    cv2.destroyAllWindows()
if __name__ == "__main__": | |
input_video = "combined_video.mp4" | |
json_file_name = "combined_video.json" | |
output_video = "output_combined_video.mp4" | |
annotation_output = detect_logos(input_video) | |
cap = cv2.VideoCapture(input_video) | |
fps = int(cap.get(cv2.CAP_PROP_FPS)) | |
frame_level_data_dump_to_json(annotation_output, json_file_name, fps) | |
display_json_data_on_video_file(input_video, output_video, json_file_name) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment