Created
July 16, 2018 19:57
-
-
Save buff4life123/031bd2d32f758656b113d73900c474e4 to your computer and use it in GitHub Desktop.
OpenPose
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from enum import Enum | |
class CocoPart(Enum):
    """Integer indices of the COCO-style body parts used by the estimator.

    ``Background`` (18) is the last entry; it is not a drawable body part.
    """

    # Head / torso anchor points.
    Nose = 0
    Neck = 1

    # Right arm.
    RShoulder = 2
    RElbow = 3
    RWrist = 4

    # Left arm.
    LShoulder = 5
    LElbow = 6
    LWrist = 7

    # Right leg.
    RHip = 8
    RKnee = 9
    RAnkle = 10

    # Left leg.
    LHip = 11
    LKnee = 12
    LAnkle = 13

    # Face.
    REye = 14
    LEye = 15
    REar = 16
    LEar = 17

    Background = 18
class MPIIPart(Enum):
    """Integer indices of the body parts in the MPII-style layout."""

    # Legs, right to left.
    RAnkle = 0
    RKnee = 1
    RHip = 2
    LHip = 3
    LKnee = 4
    LAnkle = 5

    # Arms, right to left.
    RWrist = 6
    RElbow = 7
    RShoulder = 8
    LShoulder = 9
    LElbow = 10
    LWrist = 11

    # Upper body.
    Neck = 12
    Head = 13
# Limbs of the skeleton as (part index, part index) pairs; the indices are
# CocoPart values.
CocoPairs = [
    (1, 2),    # Neck      - RShoulder
    (1, 5),    # Neck      - LShoulder
    (2, 3),    # RShoulder - RElbow
    (3, 4),    # RElbow    - RWrist
    (5, 6),    # LShoulder - LElbow
    (6, 7),    # LElbow    - LWrist
    (1, 8),    # Neck      - RHip
    (8, 9),    # RHip      - RKnee
    (9, 10),   # RKnee     - RAnkle
    (1, 11),   # Neck      - LHip
    (11, 12),  # LHip      - LKnee
    (12, 13),  # LKnee     - LAnkle
    (1, 0),    # Neck      - Nose
    (0, 14),   # Nose      - REye
    (14, 16),  # REye      - REar
    (0, 15),   # Nose      - LEye
    (15, 17),  # LEye      - LEar
    (2, 16),   # RShoulder - REar
    (5, 17),   # LShoulder - LEar
]

# Limbs that get drawn: everything except the last two (shoulder-ear) pairs.
CocoPairsRender = CocoPairs[:-2]

# For each entry of CocoPairs (same order), the two part-affinity-field
# channel indices that hold the x and y components for that limb.
CocoPairsNetwork = [
    (12, 13), (20, 21), (14, 15), (16, 17), (22, 23),
    (24, 25), (0, 1), (2, 3), (4, 5), (6, 7),
    (8, 9), (10, 11), (28, 29), (30, 31), (34, 35),
    (32, 33), (36, 37), (18, 19), (26, 27),
]  # = 19

# One colour triple per drawable body part (18 entries).
CocoColors = [
    [255, 0, 0], [255, 85, 0], [255, 170, 0],
    [255, 255, 0], [170, 255, 0], [85, 255, 0],
    [0, 255, 0], [0, 255, 85], [0, 255, 170],
    [0, 255, 255], [0, 170, 255], [0, 85, 255],
    [0, 0, 255], [85, 0, 255], [170, 0, 255],
    [255, 0, 255], [255, 0, 170], [255, 0, 85],
]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cv2 | |
from collections import namedtuple | |
import common | |
from common import CocoPairsNetwork, CocoPairs, CocoPart | |
import numpy as np | |
from scipy.ndimage import maximum_filter, gaussian_filter | |
import itertools | |
import math | |
class Human:
    """A single detected person, assembled from connected part pairs.

    Attributes:
        pairs: every pair that was added, in insertion order.
        body_parts: dict mapping a part index to its BodyPart.
        uidx_list: set of '<part_idx>-<idx>' ids of all endpoints seen so far.
    """

    __slots__ = ('body_parts', 'pairs', 'uidx_list')

    def __init__(self, pairs):
        self.pairs = []
        self.uidx_list = set()
        self.body_parts = {}
        for item in pairs:
            self.add_pair(item)

    @staticmethod
    def _get_uidx(part_idx, idx):
        """Build the id string that identifies one detected peak of one part."""
        return '%d-%d' % (part_idx, idx)

    def add_pair(self, pair):
        """Record ``pair`` and register both of its endpoints as body parts.

        An endpoint whose part index is already present replaces the
        existing entry. Both endpoints are stored with ``pair.score``.
        """
        self.pairs.append(pair)
        endpoints = (
            (pair.part_idx1, pair.idx1, pair.coord1),
            (pair.part_idx2, pair.idx2, pair.coord2),
        )
        for part_idx, peak_idx, coord in endpoints:
            uidx = Human._get_uidx(part_idx, peak_idx)
            self.body_parts[part_idx] = BodyPart(uidx, part_idx, coord[0], coord[1], pair.score)
            self.uidx_list.add(uidx)

    def is_connected(self, other):
        """Return True when both humans share at least one endpoint id."""
        shared = self.uidx_list & other.uidx_list
        return bool(shared)

    def merge(self, other):
        """Absorb every pair of ``other`` into this human."""
        for item in other.pairs:
            self.add_pair(item)

    def part_count(self):
        """Return the number of distinct part indices held."""
        return len(self.body_parts)

    def get_max_score(self):
        """Return the highest score among the stored body parts."""
        return max([part.score for part in self.body_parts.values()])

    def __str__(self):
        descriptions = [str(part) for part in self.body_parts.values()]
        return ' '.join(descriptions)
class BodyPart:
    """One located body part.

    Attributes:
        uidx: identifier string supplied by the creator of the part.
        part_idx: part index (e.g. 0 for nose).
        x, y: coordinate of the body part.
        score: confidence score.
    """

    __slots__ = ('uidx', 'part_idx', 'x', 'y', 'score')

    def __init__(self, uidx, part_idx, x, y, score):
        self.uidx, self.part_idx = uidx, part_idx
        self.x = x
        self.y = y
        self.score = score

    def get_part_name(self):
        """Return the CocoPart member whose value equals ``part_idx``."""
        return CocoPart(self.part_idx)

    def __str__(self):
        fields = (self.part_idx, self.x, self.y, self.score)
        return 'BodyPart:%d-(%.2f, %.2f) score=%.2f' % fields
class PoseEstimator:
    """Turn body-part heatmaps and part-affinity fields (PAFs) into humans.

    The pipeline implemented by :meth:`estimate` is:

    1. non-maximum suppression on every heatmap plane to find part peaks,
    2. scoring of every candidate limb between peaks along the PAF,
    3. greedy merging of limbs that share a peak into ``Human`` objects,
    4. rejection of humans with too few parts or too low a score.

    All tunables are class attributes so they can be changed globally.
    """

    # Optional heatmap pre-processing switches used by estimate().
    heatmap_supress = False
    heatmap_gaussian = False
    adaptive_threshold = False

    NMS_Threshold = 0.1          # minimum heatmap value kept by NMS
    Local_PAF_Threshold = 0.1    # minimum per-sample PAF alignment
    PAF_Count_Threshold = 5      # minimum number of aligned samples per limb
    Part_Count_Threshold = 4     # NOTE(review): defined but not used anywhere
    Part_Score_Threshold = 0.6   # minimum best part score of a kept human

    # The ``verbose`` keyword of namedtuple was removed in Python 3.7, so it
    # must not be passed here or the class body fails to execute.
    PartPair = namedtuple('PartPair', [
        'score',
        'part_idx1', 'part_idx2',
        'idx1', 'idx2',
        'coord1', 'coord2',
        'score1', 'score2',
    ])

    @staticmethod
    def non_max_suppression(plain, window_size=3, threshold=NMS_Threshold):
        """Keep only local maxima of ``plain`` that reach ``threshold``.

        NOTE: ``plain`` is modified in place (values below ``threshold`` are
        zeroed) before the local-maximum test; callers that need the
        untouched array must pass a copy.

        Returns an array of the same shape holding the original value at
        each surviving peak and 0 elsewhere.
        """
        plain[plain < threshold] = 0
        footprint = np.ones((window_size, window_size))
        is_peak = plain == maximum_filter(plain, footprint=footprint)
        return plain * is_peak

    @staticmethod
    def estimate(heat_mat, paf_mat):
        """Assemble humans from a heatmap stack and a PAF stack.

        Args:
            heat_mat: 3-D array of part heatmaps. If its last axis has
                length 19 it is moved to the front; the last plane is then
                skipped when looking for peaks.
            paf_mat: 3-D array of part-affinity fields. If its last axis has
                length 38 it is moved to the front.

        Returns:
            A list of ``Human`` objects that passed both rejection filters.
        """
        if heat_mat.shape[2] == 19:
            heat_mat = np.rollaxis(heat_mat, 2, 0)
        if paf_mat.shape[2] == 38:
            paf_mat = np.rollaxis(paf_mat, 2, 0)

        if PoseEstimator.heatmap_supress:
            # Subtract the per-plane minimum, then the per-row minimum.
            n_planes = heat_mat.shape[0]
            heat_mat = heat_mat - heat_mat.min(axis=1).min(axis=1).reshape(n_planes, 1, 1)
            heat_mat = heat_mat - heat_mat.min(axis=2).reshape(n_planes, heat_mat.shape[1], 1)

        if PoseEstimator.heatmap_gaussian:
            heat_mat = gaussian_filter(heat_mat, sigma=0.5)

        if PoseEstimator.adaptive_threshold:
            # Scale with the average activation, clamped to [NMS_Threshold, 0.3].
            nms_threshold = max(np.average(heat_mat) * 4.0, PoseEstimator.NMS_Threshold)
            nms_threshold = min(nms_threshold, 0.3)
        else:
            nms_threshold = PoseEstimator.NMS_Threshold

        # Extract peak coordinates per part plane; the last plane is skipped.
        coords = []  # [(ys, xs) for plane 0, (ys, xs) for plane 1, ...]
        for plain in heat_mat[:-1]:
            nms = PoseEstimator.non_max_suppression(plain, 5, nms_threshold)
            coords.append(np.where(nms >= nms_threshold))

        # Score candidate limbs, one limb type at a time.
        pairs_by_conn = []
        rescale = (1.0 / heat_mat.shape[2], 1.0 / heat_mat.shape[1])
        for (part_idx1, part_idx2), (paf_x_idx, paf_y_idx) in zip(CocoPairs, CocoPairsNetwork):
            pairs_by_conn.extend(PoseEstimator.score_pairs(
                part_idx1, part_idx2,
                coords[part_idx1], coords[part_idx2],
                paf_mat[paf_x_idx], paf_mat[paf_y_idx],
                heatmap=heat_mat,
                rescale=rescale,
            ))

        # Merge limbs into humans. pairs_by_conn is ordered by CocoPairs and,
        # within one limb type, by descending score.
        humans = [Human([pair]) for pair in pairs_by_conn]
        while True:
            merge_items = None
            for k1, k2 in itertools.combinations(humans, 2):
                if k1.is_connected(k2):
                    merge_items = (k1, k2)
                    break
            if merge_items is None:
                break
            merge_items[0].merge(merge_items[1])
            humans.remove(merge_items[1])

        # Reject by part count.
        # NOTE(review): this compares against PAF_Count_Threshold, not
        # Part_Count_Threshold; kept as-is to preserve existing results.
        humans = [human for human in humans
                  if human.part_count() >= PoseEstimator.PAF_Count_Threshold]
        # Reject by best part score.
        humans = [human for human in humans
                  if human.get_max_score() >= PoseEstimator.Part_Score_Threshold]
        return humans

    @staticmethod
    def score_pairs(part_idx1, part_idx2, coord_list1, coord_list2,
                    paf_mat_x, paf_mat_y, heatmap, rescale=(1.0, 1.0)):
        """Pick the best non-conflicting limbs between two sets of peaks.

        Args:
            part_idx1, part_idx2: part indices of the limb's two ends.
            coord_list1, coord_list2: ``(ys, xs)`` index arrays of the peaks
                of each part, as returned by ``np.where``.
            paf_mat_x, paf_mat_y: 2-D PAF planes for this limb type.
            heatmap: heatmap stack, indexed as ``heatmap[part][y][x]``.
            rescale: ``(x_factor, y_factor)`` applied to stored coordinates.

        Returns:
            A list of ``PartPair`` in descending score order in which each
            peak of either part is used at most once.
        """
        candidates = []
        for idx1, (y1, x1) in enumerate(zip(coord_list1[0], coord_list1[1])):
            for idx2, (y2, x2) in enumerate(zip(coord_list2[0], coord_list2[1])):
                score, count = PoseEstimator.get_score(x1, y1, x2, y2, paf_mat_x, paf_mat_y)
                if count < PoseEstimator.PAF_Count_Threshold or score <= 0.0:
                    continue
                candidates.append(PoseEstimator.PartPair(
                    score=score,
                    part_idx1=part_idx1, part_idx2=part_idx2,
                    idx1=idx1, idx2=idx2,
                    coord1=(x1 * rescale[0], y1 * rescale[1]),
                    coord2=(x2 * rescale[0], y2 * rescale[1]),
                    score1=heatmap[part_idx1][y1][x1],
                    score2=heatmap[part_idx2][y2][x2],
                ))

        # Greedy assignment: best score first, each peak used only once.
        connection = []
        used_idx1, used_idx2 = set(), set()
        for candidate in sorted(candidates, key=lambda c: c.score, reverse=True):
            if candidate.idx1 in used_idx1 or candidate.idx2 in used_idx2:
                continue
            connection.append(candidate)
            used_idx1.add(candidate.idx1)
            used_idx2.add(candidate.idx2)
        return connection

    @staticmethod
    def get_score(x1, y1, x2, y2, paf_mat_x, paf_mat_y):
        """Score how well the PAF supports a limb from (x1, y1) to (x2, y2).

        Ten points are sampled from the start point towards (but excluding)
        the end point; at each one the PAF vector is projected onto the
        limb's unit direction.

        Returns:
            ``(score, count)``: the sum of the projections that exceed
            ``Local_PAF_Threshold`` and how many samples did so. Returns
            ``(0.0, 0)`` when the two points coincide.
        """
        num_inter = 10

        dx, dy = x2 - x1, y2 - y1
        norm = math.sqrt(dx ** 2 + dy ** 2)
        if norm < 1e-4:
            return 0.0, 0
        vx, vy = dx / norm, dy / norm

        # linspace always yields exactly num_inter samples; arange with a
        # float step can produce one extra because of rounding.
        xs = np.linspace(x1, x2, num_inter, endpoint=False)
        ys = np.linspace(y1, y2, num_inter, endpoint=False)
        # Round to the nearest pixel. A platform-sized index type is needed:
        # int8 would wrap around for coordinates above 127.
        xs = (xs + 0.5).astype(np.intp)
        ys = (ys + 0.5).astype(np.intp)

        paf_xs = paf_mat_x[ys, xs]
        paf_ys = paf_mat_y[ys, xs]

        local_scores = paf_xs * vx + paf_ys * vy
        passed = local_scores > PoseEstimator.Local_PAF_Threshold
        return float(np.sum(local_scores[passed])), int(np.count_nonzero(passed))

    @staticmethod
    def draw_humans(npimg, humans, imgcopy=False):
        """Draw every human's parts and limbs onto ``npimg``.

        Part coordinates are multiplied by the image width/height to get
        pixel positions.

        Args:
            npimg: image array; drawn on in place unless ``imgcopy`` is set.
            humans: iterable of ``Human``.
            imgcopy: when True, draw on a copy and leave ``npimg`` untouched.

        Returns:
            The image that was drawn on.
        """
        if imgcopy:
            npimg = np.copy(npimg)
        image_h, image_w = npimg.shape[:2]
        for human in humans:
            centers = {}
            # Draw one dot per detected part.
            for i in range(common.CocoPart.Background.value):
                if i not in human.body_parts:
                    continue
                body_part = human.body_parts[i]
                center = (int(body_part.x * image_w + 0.5), int(body_part.y * image_h + 0.5))
                centers[i] = center
                cv2.circle(npimg, center, 3, common.CocoColors[i], thickness=3, lineType=8, shift=0)

            # Draw one line per limb whose two ends were both detected.
            for pair_order, pair in enumerate(common.CocoPairsRender):
                if pair[0] not in human.body_parts or pair[1] not in human.body_parts:
                    continue
                cv2.line(npimg, centers[pair[0]], centers[pair[1]], common.CocoColors[pair_order], 3)

        return npimg

    def inference(self, npimg, modelFile, width, height):
        """Run the network on ``npimg`` and return the estimated humans.

        The network is loaded from ``modelFile`` on first use and reused on
        later calls with the same path, instead of being re-read from disk
        for every frame.

        Side effects: stores the heatmap and PAF outputs on ``self.heatMat``
        and ``self.pafMat``.
        """
        if getattr(self, '_net', None) is None or getattr(self, '_net_model', None) != modelFile:
            self._net = cv2.dnn.readNetFromTensorflow(modelFile)
            self._net_model = modelFile
        net = self._net

        inp = cv2.dnn.blobFromImage(npimg, 1.0, (width, height),
                                    (0, 0, 0), swapRB=False, crop=False)
        net.setInput(inp)
        out = net.forward()

        # The first 19 output channels are the heatmaps, the rest the PAFs.
        self.heatMat = out[0, :19, :, :]
        self.pafMat = out[0, 19:, :, :]

        return PoseEstimator.estimate(self.heatMat, self.pafMat)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import estimator | |
import cv2 | |
import argparse | |
import time | |
def main():
    """Run the pose-estimation demo on an image, a video file or a camera.

    Frames are read until the source is exhausted or a key is pressed. Each
    frame is passed through the estimator and shown with the detected
    skeletons drawn on it.
    """
    parser = argparse.ArgumentParser(
        description='This script is used to demonstrate OpenPose human pose estimation network '
                    'from https://github.com/CMU-Perceptual-Computing-Lab/openpose project using OpenCV. '
                    'The sample and model are simplified and could be used for a single person on the frame.')
    parser.add_argument('--input', default="Images/p2.jpg",
                        help='Path to image or video. Skip to capture frames from camera')
    parser.add_argument('--proto', help='Path to .prototxt')
    # The model is loaded with cv2.dnn.readNetFromTensorflow, so it must be a
    # TensorFlow graph rather than a Caffe model.
    parser.add_argument('--model', default="openpose/graph_opt.pb",
                        help='Path to the TensorFlow .pb graph')
    parser.add_argument('--dataset', default="COCO", help='Specify what kind of model was trained. '
                                                          'It could be (COCO, MPI) depends on dataset.')
    parser.add_argument('--thr', default=0.1, type=float, help='Threshold value for pose parts heat map')
    parser.add_argument('--width', default=386, type=int, help='Resize input to specific width.')
    parser.add_argument('--height', default=386, type=int, help='Resize input to specific height.')
    parser.add_argument('--inf_engine', action='store_true',
                        help='Enable Intel Inference Engine computational backend. '
                             'Check that plugins folder is in LD_LIBRARY_PATH environment variable')
    args = parser.parse_args()

    # Apply --thr as the heatmap peak threshold; its default equals the
    # estimator's built-in value, so behaviour is unchanged unless it is set.
    # NOTE(review): --proto, --dataset and --inf_engine are accepted but unused.
    estimator.PoseEstimator.NMS_Threshold = args.thr
    e = estimator.PoseEstimator()

    cap = cv2.VideoCapture(args.input if args.input else 0)
    try:
        while cv2.waitKey(1) < 0:
            hasFrame, frame = cap.read()
            if not hasFrame:
                # Source exhausted: keep the last result on screen until a key
                # is pressed.
                cv2.waitKey()
                break

            t = time.time()
            humans = e.inference(frame, args.model, args.width, args.height)
            elapsed = time.time() - t
            print('inference image: %s in %.4f seconds.' % (args.input, elapsed))

            image = e.draw_humans(frame, humans, imgcopy=False)
            cv2.imshow('tf-pose-estimation result', image)
    finally:
        # Always give back the capture device and close the preview window.
        cap.release()
        cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment