Skip to content

Instantly share code, notes, and snippets.

@ZackAkil
Created November 11, 2020 13:50
Show Gist options
  • Save ZackAkil/559a7ec1498e04801b0752126e1da13c to your computer and use it in GitHub Desktop.
Save ZackAkil/559a7ec1498e04801b0752126e1da13c to your computer and use it in GitHub Desktop.
Google Cloud Function to Generate a tracking gif of a body part using Video Intelligence API and Storage Triggers.
import io
import numpy as np
import imageio
import cv2
from google.cloud import storage
from google.cloud import videointelligence_v1p3beta1 as videointelligence
# Clients created once at import time and reused across every invocation
# of the Cloud Function in this instance.
storage_client = storage.Client()
client = videointelligence.VideoIntelligenceServiceClient()

# Destination bucket for the generated gifs (bucket name only, no scheme).
OUTPUT_BUCKET_NAME = 'YOUR BUCKET NAME' # NO gs://, e.g 'my-output-bucket'
# Pose landmark whose motion is cropped out and turned into a gif.
LANDMARK_TO_TRACK = 'right_wrist'

# configure person detection
config = videointelligence.types.PersonDetectionConfig(
    include_bounding_boxes=True,  # include bounding box around whole body
    include_attributes=False,  # skip clothing attribute info
    include_pose_landmarks=True,  # include body joint (pose) landmark info
)
video_context = videointelligence.types.VideoContext(person_detection_config=config)
def hello_gcs(event, context):
    """Cloud Function triggered by a Cloud Storage object-finalize event.

    Downloads the uploaded video, runs Video Intelligence person detection
    on it, crops a fixed-size square around LANDMARK_TO_TRACK in every
    confident frame, assembles the crops into a gif, and uploads the gif
    to OUTPUT_BUCKET_NAME.

    Args:
        event (dict): Cloud Storage event payload; reads 'bucket' and 'name'.
        context (google.cloud.functions.Context): event metadata (unused).
    """
    print(event)

    video_bucket = storage_client.get_bucket(event['bucket'])
    gs_video_blob = video_bucket.blob(event['name'])

    # Download the video locally so OpenCV can seek through its frames.
    # NOTE(review): assumes event['name'] has no '/' — an object inside a
    # "folder" would make this /tmp path invalid; confirm naming convention.
    with open('/tmp/' + event['name'], 'wb') as file_obj:
        local_file_name = file_obj.name
        gs_video_blob.download_to_file(file_obj)

    gcs_uri = 'gs://' + event['bucket'] + '/' + event['name']

    operation = client.annotate_video(
        input_uri=gcs_uri,
        features=[videointelligence.enums.Feature.PERSON_DETECTION],
        video_context=video_context,
    )
    print("\nProcessing video for person detection annotations.")
    result = operation.result(timeout=300)
    print("Finished processing")  # was "Finnished" (typo)

    # Only the first track of the first detected person is used.
    person_track = result.annotation_results[0].person_detection_annotations[0].tracks[0].timestamped_objects

    # Half the side length of the square cropped around the landmark, in px.
    square_padding = 100

    # Cropped landmark images, in timestamp order, that will form the gif.
    hand_images = []

    video, fps, frame_count, duration = load_video(local_file_name)
    print('video loaded, fps:', fps, ' frames:', frame_count, ' duration:', duration)

    for body_frame in person_track:  # was person_track[::] — redundant copy
        frame_time_millis = body_frame.time_offset.ToMilliseconds()
        # find image from video that matches timestamp
        image = seek_video_to_time(video, fps, frame_time_millis)
        height, width, channels = image.shape

        # get just the tracked landmark (e.g. right wrist)
        wrist = get_landmark(body_frame.landmarks, LANDMARK_TO_TRACK)

        # keep only confident detections
        if wrist and wrist.confidence > .3:
            print(frame_time_millis, 'ms')
            # landmark coordinates are normalized [0, 1]; scale to pixels
            wrist_landmark_x = wrist.point.x * width
            wrist_landmark_y = wrist.point.y * height
            crop_box = (wrist_landmark_x - square_padding,
                        wrist_landmark_y - square_padding,
                        square_padding * 2,
                        square_padding * 2)
            hand_images.append(get_image_crop(image, crop_box))

    if not hand_images:
        # No confident detections: previously imageio.mimsave crashed on an
        # empty frame list; bail out cleanly instead.
        print('No confident', LANDMARK_TO_TRACK, 'detections - no gif created')
        return

    file_name = event['name'].split('.')[0]
    gif_file_name = '/tmp/' + file_name + '.gif'
    imageio.mimsave(gif_file_name, hand_images)

    # upload the gif to the output bucket
    output_video_bucket = storage_client.get_bucket(OUTPUT_BUCKET_NAME)
    new_blob_name = event['name'].split('.')[0] + '_' + LANDMARK_TO_TRACK + '.gif'
    new_blob = output_video_bucket.blob(new_blob_name)
    new_blob.upload_from_filename(gif_file_name)
# function for cutting a fixed-size crop out of an image as its own image
def get_image_crop(image, box):
    """Return the (height, width) crop of `image` described by `box`.

    `box` is (x, y, width, height) in pixels; x/y may be negative or run
    past the image edge — the crop is zero-filled there so every crop
    comes back at the requested size.
    """
    left, top, box_w, box_h = (int(v) for v in box)
    # Surround the image with a zero border as wide/tall as the crop box
    # itself, so any reachable crop lies fully inside the padded array.
    framed = np.pad(image, ((box_h, box_h), (box_w, box_w), (0, 0)),
                    constant_values=0)
    # Shift the crop origin into the padded coordinate system.
    row0 = top + box_h
    col0 = left + box_w
    return framed[row0: row0 + box_h, col0: col0 + box_w]
# function to open a video and report its basic playback properties
def load_video(file_path):
    """Return (VideoCapture, fps, frame_count, duration_seconds) for a file."""
    capture = cv2.VideoCapture(file_path)
    frames_per_second = capture.get(cv2.CAP_PROP_FPS)
    total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    duration_seconds = total_frames / frames_per_second
    return (capture, frames_per_second, total_frames, duration_seconds)
# function for picking one body part out of the landmarks data by name
def get_landmark(landmarks, body_part_name):
    """Return the first landmark whose .name equals body_part_name, else None."""
    return next(
        (candidate for candidate in landmarks if candidate.name == body_part_name),
        None,
    )
# function to seek video to a specific time and return that frame
def seek_video_to_time(video, fps, frame_time):
    """Return the frame of `video` at `frame_time` milliseconds.

    Fixes two defects in the original:
      * off-by-one: CAP_PROP_POS_FRAMES is the index of the NEXT frame to
        be read, so the old `while current < target` loop stopped one read
        short and returned frame target-1 for any nonzero target (only
        target 0 was special-cased).
      * stale position: if the capture was already at/past the target
        (two annotations mapping to the same frame) it returned None,
        which crashed the caller on image.shape; now we rewind first.

    Returns None only if the video yields no frames at all.
    """
    target_frame_index = int((frame_time / 1000.0) * fps)
    current_index = video.get(cv2.CAP_PROP_POS_FRAMES)
    if current_index > target_frame_index:
        # Rewind so repeated/out-of-order timestamps still yield a frame.
        video.set(cv2.CAP_PROP_POS_FRAMES, target_frame_index)
        current_index = target_frame_index
    image = None
    # Read until the position passes the target; the last frame read is
    # the frame at index target. This also covers the target == 0 case.
    while current_index <= target_frame_index:
        success, image = video.read()
        if not success:
            break  # end of stream - return the last frame we managed to read
        current_index += 1
    return image
google-cloud-storage
google-cloud-videointelligence
opencv-python
imageio
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment