Draw Google Video Intelligence logo-recognition output ("annotate a local video") onto a local video
"""
Annotate a video in Cloud Storage provides you an option to generate the output as json.
This script can be used you annotate a local video (https://cloud.google.com/video-intelligence/docs/logo-recognition#annotate_a_local_video).
This script will help to generate the bounding box per frame and also draw it on the video.
"""
import io
import json

import cv2
from google.cloud import videointelligence


def detect_logos(video_input, print_output=False):
    """Run logo recognition on a local video and return the annotations."""
    client = videointelligence.VideoIntelligenceServiceClient()
    with io.open(video_input, "rb") as f:
        input_content = f.read()
    features = [videointelligence.Feature.LOGO_RECOGNITION]
    operation = client.annotate_video(
        request={"features": features, "input_content": input_content}
    )
    print("Waiting for operation to complete...")
    response = operation.result()
    # Get the first response, since we sent only one video.
    annotation_result = response.annotation_results[0]
    if print_output:
        for annotation in annotation_result.logo_recognition_annotations:
            description = annotation.entity.description
            for track in annotation.tracks:
                confidence = track.confidence
                print(
                    f" {description},"
                    f" confidence: {confidence:.0%},"
                    f" frames: {len(track.timestamped_objects)} ".center(80, "-")
                )
                for timestamped_object in track.timestamped_objects:
                    t = timestamped_object.time_offset.total_seconds()
                    box = timestamped_object.normalized_bounding_box
                    print(
                        f"{t:>7.3f}",
                        f"({box.left:.5f}, {box.top:.5f})",
                        f"({box.right:.5f}, {box.bottom:.5f})",
                        sep=" | ",
                    )
    return annotation_result.logo_recognition_annotations
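
# For reference, with print_output=True the loop above prints blocks shaped
# like the following (values are illustrative, not from a real run):
#
# ------------------------- google, confidence: 92%, frames: 3 ------------------------
#   0.000 | (0.12345, 0.23456) | (0.34567, 0.45678)
#   0.033 | (0.12401, 0.23390) | (0.34620, 0.45710)
#   0.067 | (0.12455, 0.23321) | (0.34688, 0.45744)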


def frame_level_data_dump_to_json(logo_recognition_annotation_result, json_file, video_fps):
    """Expand the API's sparse timestamped boxes into per-frame entries and dump them as JSON.

    Between two consecutive timestamps of a track, the earlier box is repeated
    once per video frame, so every covered frame gets an entry.
    """
    time_diff = 1 / video_fps
    video_info = {"processed_info": {}}

    def add_entry(frame_n, label, box):
        entry = (label, box.left, box.top, box.right, box.bottom)
        video_info["processed_info"].setdefault(f"{frame_n}", []).append(entry)

    for annotation in logo_recognition_annotation_result:
        label = annotation.entity.description
        for track in annotation.tracks:
            objects = track.timestamped_objects
            for idx in range(len(objects) - 1):
                first_entry = objects[idx].time_offset.total_seconds()
                second_entry = objects[idx + 1].time_offset.total_seconds()
                # Number of frames covered between these two timestamps.
                entries = int(round((second_entry - first_entry) / time_diff))
                box = objects[idx].normalized_bounding_box
                for entry_pad in range(entries):
                    frame_n = int(round((first_entry + entry_pad * time_diff) * video_fps))
                    add_entry(frame_n, label, box)
            # Emit the final timestamped object of the track as well. Indexing
            # with [-1] also avoids the NameError the original for/else hit on
            # tracks holding a single object, where idx was never bound.
            if objects:
                last = objects[-1]
                frame_n = int(round(last.time_offset.total_seconds() * video_fps))
                add_entry(frame_n, label, last.normalized_bounding_box)

    # Sort keys numerically; a plain string sort would put "10" before "2".
    video_info["processed_info"] = {
        k: video_info["processed_info"][k]
        for k in sorted(video_info["processed_info"], key=int)
    }
    with open(json_file, "w") as fp:
        json.dump(video_info, fp)
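
# The dumped JSON has roughly this shape (frame numbers and box values below
# are illustrative; the tuples serialize as JSON arrays):
#
# {
#   "processed_info": {
#     "0": [["google", 0.12, 0.08, 0.34, 0.21]],
#     "1": [["google", 0.12, 0.08, 0.34, 0.21]]
#   }
# }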


def display_json_data_on_video_file(input_video, output_video, json_file):
    """Draw the per-frame boxes from the JSON dump onto the video."""
    with open(json_file, "r") as f:
        video_info_read = json.load(f)

    # Open the video file for reading.
    cap = cv2.VideoCapture(input_video)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Keep the fps as a float; truncating e.g. 29.97 to 29 would slow the output down.
    fps = cap.get(cv2.CAP_PROP_FPS)
    out = cv2.VideoWriter(output_video, cv2.VideoWriter_fourcc(*"mp4v"), fps, (frame_width, frame_height))

    # Loop through the frames of the video.
    frame_no = 0
    while True:
        # Read a single frame from the video.
        ret, frame = cap.read()
        # Check if we've reached the end of the video.
        if not ret:
            break
        # JSON object keys are strings, so look the frame up by str(frame_no).
        data_ = video_info_read["processed_info"].get(str(frame_no))
        if data_:
            for label, left, top, right, bottom in data_:
                # Convert the normalized coordinates to pixel values.
                xmin = int(left * frame_width)
                ymin = int(top * frame_height)
                xmax = int(right * frame_width)
                ymax = int(bottom * frame_height)
                cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
                cv2.putText(frame, label, (xmin, ymin - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        frame_no += 1
        out.write(frame)

    # Release the reader and the writer.
    cap.release()
    out.release()
if __name__ == "__main__":
input_video = "combined_video.mp4"
json_file_name = "combined_video.json"
output_video = "output_combined_video.mp4"
annotation_output = detect_logos(input_video)
cap = cv2.VideoCapture(input_video)
fps = int(cap.get(cv2.CAP_PROP_FPS))
frame_level_data_dump_to_json(annotation_output, json_file_name, fps)
display_json_data_on_video_file(input_video, output_video, json_file_name)
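
# Usage sketch: with credentials configured and combined_video.mp4 in the
# working directory, running this file writes combined_video.json (the
# per-frame boxes) and output_combined_video.mp4 (the video with boxes drawn).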