# Copyright (c) OpenMMLab. All rights reserved.
import os
import warnings
from argparse import ArgumentParser
import cv2
from mmpose.apis import (get_track_id, inference_top_down_pose_model,
                         init_pose_model, process_mmdet_results,
                         vis_pose_tracking_result)
from mmpose.datasets import DatasetInfo

try:
    from mmdet.apis import inference_detector, init_detector
    has_mmdet = True
except (ImportError, ModuleNotFoundError):
    has_mmdet = False


def main():
    """Visualize the demo images.

    Using mmdet to detect the human.
    """
    parser = ArgumentParser()
    parser.add_argument('det_config', help='Config file for detection')
    parser.add_argument('det_checkpoint', help='Checkpoint file for detection')
    parser.add_argument('pose_config', help='Config file for pose')
    parser.add_argument('pose_checkpoint', help='Checkpoint file for pose')
    parser.add_argument('--video-path', type=str, help='Video path')
    parser.add_argument(
        '--show',
        action='store_true',
        default=False,
        help='whether to show visualizations.')
    parser.add_argument(
        '--out-video-root',
        default='',
        help='Root of the output video file. '
        'Default not saving the visualization video.')
    parser.add_argument(
        '--device', default='cuda:0', help='Device used for inference')
    parser.add_argument(
        '--det-cat-id',
        type=int,
        default=1,
        help='Category id for bounding box detection model')
    parser.add_argument(
        '--bbox-thr',
        type=float,
        default=0.3,
        help='Bounding box score threshold')
    parser.add_argument(
        '--kpt-thr', type=float, default=0.3, help='Keypoint score threshold')
    parser.add_argument(
        '--use-oks-tracking', action='store_true', help='Using OKS tracking')
    parser.add_argument(
        '--tracking-thr', type=float, default=0.3, help='Tracking threshold')
    parser.add_argument(
        '--euro',
        action='store_true',
        help='Using One_Euro_Filter for smoothing')
    parser.add_argument(
        '--radius',
        type=int,
        default=4,
        help='Keypoint radius for visualization')
    parser.add_argument(
        '--thickness',
        type=int,
        default=1,
        help='Link thickness for visualization')
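
    # Example invocation (a sketch; the config/checkpoint names below are
    # illustrative placeholders, substitute your own mmdet/mmpose files):
    #   python this_script.py \
    #       faster_rcnn_r50_fpn_coco.py faster_rcnn_r50_fpn_coco.pth \
    #       hrnet_w32_coco_256x192.py hrnet_w32_coco_256x192.pth \
    #       --video-path /path/to/videos --out-video-root vis_results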

    assert has_mmdet, 'Please install mmdet to run the demo.'

    args = parser.parse_args()

    assert args.show or (args.out_video_root != '')
    assert args.det_config is not None
    assert args.det_checkpoint is not None

    # build the detection model from a config file and a checkpoint file
    det_model = init_detector(
        args.det_config, args.det_checkpoint, device=args.device.lower())
    # build the pose model from a config file and a checkpoint file
    pose_model = init_pose_model(
        args.pose_config, args.pose_checkpoint, device=args.device.lower())

    dataset = pose_model.cfg.data['test']['type']
    dataset_info = pose_model.cfg.data['test'].get('dataset_info', None)
    if dataset_info is None:
        warnings.warn(
            'Please set `dataset_info` in the config. '
            'Check https://github.com/open-mmlab/mmpose/pull/663 for details.',
            DeprecationWarning)
    else:
        dataset_info = DatasetInfo(dataset_info)

    ###########################################################################
    ####################### MY EDIT STARTS FROM HERE #########################
    ###########################################################################
    from threading import Thread
    from time import sleep
    import numpy as np
    from os import mkdir, path, chdir

    # --video-path is used here as the directory holding the input clips.
    chdir(args.video_path)
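
    # Overview of this custom section: the directory at --video-path is
    # expected to contain four clips named 1.mp4 ... 4.mp4. Four reader
    # threads dump their frames as JPEGs into Frames/, merger() stitches
    # them into a 2x2 grid in merged_frames/, and the grid frames are then
    # re-encoded as Merged_video.mp4, which the stock mmpose/mmdet demo
    # below consumes.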
    ###########################################################################
    ##################### Creating a Video class of objects ##################
    ###########################################################################
    class Video(Thread):

        def __init__(self, name):
            super(Video, self).__init__()
            self.name = name
            self.count = 1
            if not path.exists('Frames'):
                mkdir('Frames')
            self.vidobj = cv2.VideoCapture(f'{name}.mp4')
            self.success, self.image = self.vidobj.read()
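
        # Note: the black-frame padding in run() and the final resize in
        # merger() assume 1920x1080 sources; clips at another resolution
        # would need those dimensions adjusted.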
        #######################################################################
        ############### initialize all 4 Video instances before ##############
        ############### calling the following method #########################
        #######################################################################
        def run(self):
            # Keep going while any of the four streams still has frames left.
            while (vid1.success or vid2.success or vid3.success
                   or vid4.success):
                sleep(0.5)
                if not self.success:
                    # This stream has ended: pad with a black frame so the
                    # 2x2 grid stays complete.
                    self.image = np.zeros((1080, 1920, 3), dtype=np.uint8)
                works = cv2.imwrite(
                    f'Frames/Frame{self.count}_pos{self.name}.jpg', self.image)
                if not works:
                    # cv2.imwrite signals failure via its return value,
                    # not an exception.
                    print('Unsuccessful at saving frame, inserting a black image')
                    self.image = np.zeros((1080, 1920, 3), dtype=np.uint8)
                    cv2.imwrite(
                        f'Frames/Frame{self.count}_pos{self.name}.jpg',
                        self.image)
                self.count += 1
                self.success, self.image = self.vidobj.read()
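
    # A caveat on this design: the threads are paced only by sleep() calls,
    # not by a real synchronization primitive, so frame files are assumed to
    # land on disk faster than merger() consumes them. A sturdier variant
    # (a sketch, not used here) would hand frames over via queue.Queue:
    #   frame_queue = queue.Queue()
    #   # producer: frame_queue.put((count, image))
    #   # consumer: count, image = frame_queue.get()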

    ###########################################################################
    #################### merger() will run on the main thread ################
    ###########################################################################
    def merger(sleep_time, count=1):
        works = True
        while works:
            sleep(sleep_time)
            img11 = cv2.imread(f'Frames/Frame{count}_pos1.jpg')
            img12 = cv2.imread(f'Frames/Frame{count}_pos2.jpg')
            img21 = cv2.imread(f'Frames/Frame{count}_pos3.jpg')
            img22 = cv2.imread(f'Frames/Frame{count}_pos4.jpg')
            if img11 is None or img12 is None or img21 is None or img22 is None:
                # cv2.imread returns None once the readers run out of frames
                # (or if merger outpaces them); stop merging here.
                print('No more frames to merge.')
                break
            # Stitch the four frames into a 2x2 grid and shrink it back
            # to 1920x1080.
            row1 = np.concatenate([img11, img12], axis=1)
            row2 = np.concatenate([img21, img22], axis=1)
            large_image = np.concatenate([row1, row2], axis=0)
            resized = cv2.resize(large_image, (1920, 1080))
            if not path.exists('merged_frames'):
                mkdir('merged_frames')
            works = cv2.imwrite(f'merged_frames/mFrame{count}.jpg', resized)
            if works:
                print(f'{count} frames merged!')
                count += 1
            else:
                print("Could not merge, fatal error. Aborting...")
                break
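
    # merger() terminates either when the readers are exhausted (a grid
    # frame comes back None) or when a write fails; with the default
    # sleep_time=1.75 and the 60 s head start below, it should stay safely
    # behind the reader threads.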

    ###########################################################################
    #################### Instantiate the 4 Video objects #####################
    ###########################################################################
    vid1 = Video(1)
    vid2 = Video(2)
    vid3 = Video(3)
    vid4 = Video(4)

    # Start Multithreading
    vid1.start()
    vid2.start()
    vid3.start()
    vid4.start()

    #########################################################
    ################## you run too fast! ####################
    ################ let the threads go ahead, ##############
    ################ you can catch them up. #################
    #########################################################
    sleep(60)
    # 60 secs wasn't too long, was it?
    # Now GO!
    # (roughly 23 minutes before completion)
    merger(sleep_time=1.75)

    # Make sure all four reader threads have finished before re-encoding.
    for vid in (vid1, vid2, vid3, vid4):
        vid.join()

    # Re-encode the merged frames as a single video.
    img_array = []
    count = 1
    while True:
        img = cv2.imread(f'merged_frames/mFrame{count}.jpg')
        if img is None:
            # cv2.imread returns None for a missing frame; no exception
            # is raised, so check the result explicitly.
            break
        count += 1
        height, width, layers = img.shape
        size = (width, height)
        img_array.append(img)
    out = cv2.VideoWriter('Merged_video.mp4',
                          cv2.VideoWriter_fourcc(*'MP4V'), 30, size)
    for i in range(len(img_array)):
        out.write(img_array[i])
    out.release()
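
    # Note: the writer hard-codes 30 fps. If the source clips use a
    # different rate, the merged video will play too fast or too slow; one
    # option would be to read the rate from a source instead, e.g.
    # fps = cv2.VideoCapture('1.mp4').get(cv2.CAP_PROP_FPS).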
    ###########################################################################
    ############################ MY EDIT COMPLETE ############################
    ###########################################################################
    cap = cv2.VideoCapture('Merged_video.mp4')
    fps = None

    assert cap.isOpened(), 'Failed to load video file Merged_video.mp4'

    if args.out_video_root == '':
        save_out_video = False
    else:
        os.makedirs(args.out_video_root, exist_ok=True)
        save_out_video = True

    if save_out_video:
        fps = cap.get(cv2.CAP_PROP_FPS)
        size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        videoWriter = cv2.VideoWriter(
            os.path.join(args.out_video_root,
                         f'vis_{os.path.basename(args.video_path)}'), fourcc,
            fps, size)

    # optional
    return_heatmap = False

    # e.g. use ('backbone', ) to return backbone feature
    output_layer_names = None

    next_id = 0
    pose_results = []
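
    # From here on, this is the stock mmpose top-down video demo: for each
    # frame, run the mmdet person detector, estimate keypoints for each
    # detected box, then link detections across frames with a track id.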
    while (cap.isOpened()):
        pose_results_last = pose_results

        flag, img = cap.read()
        if not flag:
            break

        # test a single image, the resulting box is (x1, y1, x2, y2)
        mmdet_results = inference_detector(det_model, img)

        # keep the person class bounding boxes.
        person_results = process_mmdet_results(mmdet_results, args.det_cat_id)

        # test a single image, with a list of bboxes.
        pose_results, returned_outputs = inference_top_down_pose_model(
            pose_model,
            img,
            person_results,
            bbox_thr=args.bbox_thr,
            format='xyxy',
            dataset=dataset,
            dataset_info=dataset_info,
            return_heatmap=return_heatmap,
            outputs=output_layer_names)

        # get track id for each person instance
        pose_results, next_id = get_track_id(
            pose_results,
            pose_results_last,
            next_id,
            use_oks=args.use_oks_tracking,
            tracking_thr=args.tracking_thr,
            use_one_euro=args.euro,
            fps=fps)

        # show the results
        vis_img = vis_pose_tracking_result(
            pose_model,
            img,
            pose_results,
            radius=args.radius,
            thickness=args.thickness,
            dataset=dataset,
            dataset_info=dataset_info,
            kpt_score_thr=args.kpt_thr,
            show=False)

        if args.show:
            cv2.imshow('Image', vis_img)

        if save_out_video:
            videoWriter.write(vis_img)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    if save_out_video:
        videoWriter.release()
    cv2.destroyAllWindows()


if __name__ == '__main__':
    main()