Skip to content

Instantly share code, notes, and snippets.

@buff4life123

buff4life123/common

Created Jul 16, 2018
Embed
What would you like to do?
OpenPose
from enum import Enum
# Body-part indices for the COCO-style skeleton used throughout this file.
# Members 0-17 are body parts; `Background` (18) is the trailing extra entry
# that the drawing code uses as the upper bound when iterating over parts.
CocoPart = Enum(
    'CocoPart',
    [
        # head / torso anchor
        ('Nose', 0),
        ('Neck', 1),
        # right arm
        ('RShoulder', 2),
        ('RElbow', 3),
        ('RWrist', 4),
        # left arm
        ('LShoulder', 5),
        ('LElbow', 6),
        ('LWrist', 7),
        # right leg
        ('RHip', 8),
        ('RKnee', 9),
        ('RAnkle', 10),
        # left leg
        ('LHip', 11),
        ('LKnee', 12),
        ('LAnkle', 13),
        # face
        ('REye', 14),
        ('LEye', 15),
        ('REar', 16),
        ('LEar', 17),
        # non-part entry
        ('Background', 18),
    ],
)
# Body-part indices in MPII ordering (legs first, then arms, then neck/head).
# Nothing else in this file reads this enum.
MPIIPart = Enum(
    'MPIIPart',
    [
        # legs
        ('RAnkle', 0),
        ('RKnee', 1),
        ('RHip', 2),
        ('LHip', 3),
        ('LKnee', 4),
        ('LAnkle', 5),
        # arms
        ('RWrist', 6),
        ('RElbow', 7),
        ('RShoulder', 8),
        ('LShoulder', 9),
        ('LElbow', 10),
        ('LWrist', 11),
        # upper body
        ('Neck', 12),
        ('Head', 13),
    ],
)
# Limbs of the skeleton as (part_a, part_b) tuples of CocoPart values.
# The order matters: it is zipped position-by-position with CocoPairsNetwork.
CocoPairs = [
    (1, 2), (1, 5),                 # neck -> right / left shoulder
    (2, 3), (3, 4),                 # right arm
    (5, 6), (6, 7),                 # left arm
    (1, 8), (8, 9), (9, 10),        # neck -> right hip -> knee -> ankle
    (1, 11), (11, 12), (12, 13),    # neck -> left hip -> knee -> ankle
    (1, 0),                         # neck -> nose
    (0, 14), (14, 16),              # nose -> right eye -> right ear
    (0, 15), (15, 17),              # nose -> left eye -> left ear
    (2, 16), (5, 17),               # shoulder -> ear (right, left)
]

# Limbs that get drawn: everything except the two trailing shoulder -> ear pairs.
CocoPairsRender = CocoPairs[:-2]

# For each entry of CocoPairs (same position), the two channel indices that
# are looked up in the part-affinity-field tensor for that limb.
CocoPairsNetwork = [
    (12, 13), (20, 21),
    (14, 15), (16, 17),
    (22, 23), (24, 25),
    (0, 1), (2, 3), (4, 5),
    (6, 7), (8, 9), (10, 11),
    (28, 29),
    (30, 31), (34, 35),
    (32, 33), (36, 37),
    (18, 19), (26, 27),
]  # = 19

# Colour triples handed to the OpenCV drawing calls, one per part / limb index.
CocoColors = [
    [255, 0, 0], [255, 85, 0], [255, 170, 0],
    [255, 255, 0], [170, 255, 0], [85, 255, 0],
    [0, 255, 0], [0, 255, 85], [0, 255, 170],
    [0, 255, 255], [0, 170, 255], [0, 85, 255],
    [0, 0, 255], [85, 0, 255], [170, 0, 255],
    [255, 0, 255], [255, 0, 170], [255, 0, 85],
]
import cv2
from collections import namedtuple
import common
from common import CocoPairsNetwork, CocoPairs, CocoPart
import numpy as np
from scipy.ndimage import maximum_filter, gaussian_filter
import itertools
import math
class Human:
    """
    One person, assembled from scored part-to-part connections.

    pairs      : list of the connections added so far, in insertion order
    uidx_list  : set of '<part_idx>-<idx>' keys, one per connection endpoint used
    body_parts : dict mapping part index -> BodyPart
    """
    __slots__ = ('body_parts', 'pairs', 'uidx_list')

    def __init__(self, pairs):
        self.pairs = []
        self.uidx_list = set()
        self.body_parts = {}
        for connection in pairs:
            self.add_pair(connection)

    @staticmethod
    def _get_uidx(part_idx, idx):
        """Build the string key that identifies candidate `idx` of part `part_idx`."""
        return '%d-%d' % (part_idx, idx)

    def add_pair(self, pair):
        """Record `pair` and register both of its endpoints as body parts.

        An endpoint whose part index is already present replaces the stored
        BodyPart. Both endpoints carry the connection's `score`.
        """
        self.pairs.append(pair)
        endpoints = (
            (pair.part_idx1, pair.idx1, pair.coord1),
            (pair.part_idx2, pair.idx2, pair.coord2),
        )
        for part_idx, candidate_idx, coord in endpoints:
            uidx = Human._get_uidx(part_idx, candidate_idx)
            self.body_parts[part_idx] = BodyPart(uidx, part_idx, coord[0], coord[1], pair.score)
            self.uidx_list.add(uidx)

    def is_connected(self, other):
        """Return True when this human and `other` share at least one endpoint key."""
        return not self.uidx_list.isdisjoint(other.uidx_list)

    def merge(self, other):
        """Absorb every connection of `other` into this human."""
        for connection in other.pairs:
            self.add_pair(connection)

    def part_count(self):
        """Return how many distinct part indices this human has."""
        return len(self.body_parts)

    def get_max_score(self):
        """Return the highest `score` among the stored body parts."""
        return max(part.score for part in self.body_parts.values())

    def __str__(self):
        return ' '.join(map(str, self.body_parts.values()))
class BodyPart:
    """
    A single located body part.

    uidx     : string key of the form '<part_idx>-<idx>'
    part_idx : part index (e.g. 0 for nose)
    x, y     : coordinate of the body part
    score    : score attached to the part by whoever created it
    """
    __slots__ = ('uidx', 'part_idx', 'x', 'y', 'score')

    def __init__(self, uidx, part_idx, x, y, score):
        self.uidx = uidx
        self.part_idx = part_idx
        self.x = x
        self.y = y
        self.score = score

    def get_part_name(self):
        """Return the CocoPart member whose value equals `part_idx`."""
        return CocoPart(self.part_idx)

    def __str__(self):
        fields = (self.part_idx, self.x, self.y, self.score)
        return 'BodyPart:%d-(%.2f, %.2f) score=%.2f' % fields
class PoseEstimator:
    """Group heatmap peaks into `Human` skeletons using part-affinity fields.

    The class is used as a namespace of static helpers plus tunable class
    attributes; `inference` is the only method that needs an instance.
    """

    # Optional heatmap pre-processing switches read by `estimate`.
    heatmap_supress = False
    heatmap_gaussian = False
    adaptive_threshold = False

    # Minimum heatmap value for a peak to be kept.
    NMS_Threshold = 0.1
    # A sampled PAF projection must exceed this to count towards a connection.
    Local_PAF_Threshold = 0.1
    # Minimum number of samples above Local_PAF_Threshold for a connection.
    PAF_Count_Threshold = 5
    # NOTE(review): never read anywhere in this file.
    Part_Count_Threshold = 4
    # Minimum value of Human.get_max_score() for a human to be kept.
    Part_Score_Threshold = 0.6

    # The `verbose` keyword was removed from namedtuple() in Python 3.7;
    # passing it (even as False) raises TypeError, so it is omitted.
    PartPair = namedtuple('PartPair', [
        'score',
        'part_idx1', 'part_idx2',
        'idx1', 'idx2',
        'coord1', 'coord2',
        'score1', 'score2',
    ])

    @staticmethod
    def non_max_suppression(plain, window_size=3, threshold=NMS_Threshold):
        """Keep only local maxima of a 2-D map that reach `threshold`.

        :param plain: 2-D numpy array; it is NOT modified.
        :param window_size: side of the square neighbourhood used for the
            local-maximum test.
        :param threshold: values below this are treated as 0.
        :return: array of the same shape holding the original value at each
            surviving peak and 0 elsewhere.
        """
        # Threshold a copy so the caller's heatmap is left untouched.
        suppressed = np.array(plain, copy=True)
        suppressed[suppressed < threshold] = 0
        footprint = np.ones((window_size, window_size))
        local_max = maximum_filter(suppressed, footprint=footprint)
        return suppressed * (suppressed == local_max)

    @staticmethod
    def estimate(heat_mat, paf_mat):
        """Build the list of humans from a heatmap tensor and a PAF tensor.

        :param heat_mat: 3-D array with 19 planes, either first or last axis.
        :param paf_mat: 3-D array with 38 planes, either first or last axis.
        :return: list of Human objects that pass the count and score filters.
        """
        # Channel-last input is detected purely by the size of the last axis.
        # NOTE(review): a channel-first map whose width happens to be 19 (or
        # 38 for the PAF) would be rolled by mistake.
        if heat_mat.shape[2] == 19:
            heat_mat = np.rollaxis(heat_mat, 2, 0)
        if paf_mat.shape[2] == 38:
            paf_mat = np.rollaxis(paf_mat, 2, 0)

        if PoseEstimator.heatmap_supress:
            # Subtract the per-plane minimum, then the per-row minimum.
            heat_mat = heat_mat - heat_mat.min(axis=1).min(axis=1).reshape(19, 1, 1)
            heat_mat = heat_mat - heat_mat.min(axis=2).reshape(19, heat_mat.shape[1], 1)

        if PoseEstimator.heatmap_gaussian:
            heat_mat = gaussian_filter(heat_mat, sigma=0.5)

        if PoseEstimator.adaptive_threshold:
            nms_threshold = max(np.average(heat_mat) * 4.0, PoseEstimator.NMS_Threshold)
            nms_threshold = min(nms_threshold, 0.3)
        else:
            nms_threshold = PoseEstimator.NMS_Threshold

        # Peak coordinates per part plane; the last plane is skipped.
        coords = []  # [(ys, xs) for plane 0, (ys, xs) for plane 1, ...]
        for plain in heat_mat[:-1]:
            nms = PoseEstimator.non_max_suppression(plain, 5, nms_threshold)
            coords.append(np.where(nms >= nms_threshold))

        # Score candidate connections limb by limb, in CocoPairs order.
        rescale = (1.0 / heat_mat.shape[2], 1.0 / heat_mat.shape[1])
        pairs_by_conn = []
        for (part_idx1, part_idx2), (paf_x_idx, paf_y_idx) in zip(CocoPairs, CocoPairsNetwork):
            pairs_by_conn.extend(PoseEstimator.score_pairs(
                part_idx1, part_idx2,
                coords[part_idx1], coords[part_idx2],
                paf_mat[paf_x_idx], paf_mat[paf_y_idx],
                heatmap=heat_mat,
                rescale=rescale,
            ))

        # Start with one human per connection and keep merging the first two
        # humans that share an endpoint until no such pair is left.
        humans = [Human([pair]) for pair in pairs_by_conn]
        while True:
            merge_items = None
            for k1, k2 in itertools.combinations(humans, 2):
                if k1.is_connected(k2):
                    merge_items = (k1, k2)
                    break
            if merge_items is None:
                break
            merge_items[0].merge(merge_items[1])
            humans.remove(merge_items[1])

        # Reject humans with too few parts.
        # NOTE(review): this compares a part count against PAF_Count_Threshold
        # while Part_Count_Threshold is unused; kept as-is to preserve
        # results. Confirm which constant is intended.
        humans = [human for human in humans
                  if human.part_count() >= PoseEstimator.PAF_Count_Threshold]

        # Reject humans whose best part score is too low.
        humans = [human for human in humans
                  if human.get_max_score() >= PoseEstimator.Part_Score_Threshold]
        return humans

    @staticmethod
    def score_pairs(part_idx1, part_idx2, coord_list1, coord_list2, paf_mat_x, paf_mat_y, heatmap, rescale=(1.0, 1.0)):
        """Pick the best one-to-one connections between two sets of peaks.

        :param part_idx1, part_idx2: part indices of the two limb ends.
        :param coord_list1, coord_list2: `(ys, xs)` index arrays of the peaks
            of each part, as returned by `np.where`.
        :param paf_mat_x, paf_mat_y: the two 2-D PAF planes for this limb.
        :param heatmap: heatmap tensor indexed as `[part][y][x]`.
        :param rescale: `(x_factor, y_factor)` applied to stored coordinates.
        :return: list of PartPair, highest score first, in which each peak of
            either part appears at most once.
        """
        candidates = []
        for idx1, (y1, x1) in enumerate(zip(coord_list1[0], coord_list1[1])):
            for idx2, (y2, x2) in enumerate(zip(coord_list2[0], coord_list2[1])):
                score, count = PoseEstimator.get_score(x1, y1, x2, y2, paf_mat_x, paf_mat_y)
                if count < PoseEstimator.PAF_Count_Threshold or score <= 0.0:
                    continue
                candidates.append(PoseEstimator.PartPair(
                    score=score,
                    part_idx1=part_idx1, part_idx2=part_idx2,
                    idx1=idx1, idx2=idx2,
                    coord1=(x1 * rescale[0], y1 * rescale[1]),
                    coord2=(x2 * rescale[0], y2 * rescale[1]),
                    score1=heatmap[part_idx1][y1][x1],
                    score2=heatmap[part_idx2][y2][x2],
                ))

        # Greedy assignment: best score first, each peak used at most once.
        connection = []
        used_idx1, used_idx2 = set(), set()
        for candidate in sorted(candidates, key=lambda c: c.score, reverse=True):
            if candidate.idx1 in used_idx1 or candidate.idx2 in used_idx2:
                continue
            connection.append(candidate)
            used_idx1.add(candidate.idx1)
            used_idx2.add(candidate.idx2)
        return connection

    @staticmethod
    def get_score(x1, y1, x2, y2, paf_mat_x, paf_mat_y):
        """Score the segment (x1, y1) -> (x2, y2) against a PAF.

        Ten points are sampled along the segment (start included, end
        excluded). At each one the PAF vector is projected onto the segment's
        unit direction.

        :return: `(score, count)` where `count` is the number of samples whose
            projection exceeds Local_PAF_Threshold and `score` is the sum of
            those projections. Returns `(0.0, 0)` for a zero-length segment.
        """
        num_inter = 10
        dx, dy = x2 - x1, y2 - y1
        norm = math.sqrt(dx ** 2 + dy ** 2)
        if norm < 1e-4:
            return 0.0, 0
        vx, vy = dx / norm, dy / norm

        # linspace always yields exactly `num_inter` samples; arange with a
        # float step can produce one extra element through rounding.
        xs = np.linspace(x1, x2, num_inter, endpoint=False)
        ys = np.linspace(y1, y2, num_inter, endpoint=False)
        # Round to the nearest cell. A platform-sized integer is used because
        # int8 wraps around for coordinates above 127.
        xs = (xs + 0.5).astype(np.intp)
        ys = (ys + 0.5).astype(np.intp)

        paf_xs = np.asarray(paf_mat_x)[ys, xs].astype(np.float64)
        paf_ys = np.asarray(paf_mat_y)[ys, xs].astype(np.float64)

        local_scores = paf_xs * vx + paf_ys * vy
        above = local_scores > PoseEstimator.Local_PAF_Threshold
        return float(local_scores[above].sum()), int(np.count_nonzero(above))

    @staticmethod
    def draw_humans(npimg, humans, imgcopy=False):
        """Draw every human's parts and limbs onto `npimg`.

        :param npimg: image array; drawn on in place unless `imgcopy` is true.
        :param humans: iterable of Human whose part coordinates are fractions
            of the image width / height.
        :param imgcopy: draw on a copy instead of the input image.
        :return: the image that was drawn on.
        """
        if imgcopy:
            npimg = np.copy(npimg)
        image_h, image_w = npimg.shape[:2]
        for human in humans:
            centers = {}
            # Draw the parts.
            for i in range(common.CocoPart.Background.value):
                if i not in human.body_parts:
                    continue
                body_part = human.body_parts[i]
                center = (int(body_part.x * image_w + 0.5), int(body_part.y * image_h + 0.5))
                centers[i] = center
                cv2.circle(npimg, center, 3, common.CocoColors[i], thickness=3, lineType=8, shift=0)

            # Draw the limbs whose two ends are both present.
            for pair_order, pair in enumerate(common.CocoPairsRender):
                if pair[0] not in human.body_parts or pair[1] not in human.body_parts:
                    continue
                cv2.line(npimg, centers[pair[0]], centers[pair[1]], common.CocoColors[pair_order], 3)
        return npimg

    def inference(self, npimg, modelFile, width, height):
        """Run the network on one image and return the estimated humans.

        :param npimg: input image array.
        :param modelFile: path handed to `cv2.dnn.readNetFromTensorflow`.
        :param width, height: size the image is resized to for the network.
        :return: list of Human from `estimate`.

        The first 19 output channels are stored in `self.heatMat` and the
        remaining ones in `self.pafMat`.
        """
        # Load the network once per model path instead of on every call;
        # re-reading it from disk for every frame dominates the runtime.
        cached = getattr(self, '_net_cache', None)
        if cached is None or cached[0] != modelFile:
            cached = (modelFile, cv2.dnn.readNetFromTensorflow(modelFile))
            self._net_cache = cached
        net = cached[1]

        inp = cv2.dnn.blobFromImage(npimg, 1.0, (width, height),
                                    (0, 0, 0), swapRB=False, crop=False)
        net.setInput(inp)
        out = net.forward()
        self.heatMat = out[0, :19, :, :]
        self.pafMat = out[0, 19:, :, :]
        return PoseEstimator.estimate(self.heatMat, self.pafMat)
import estimator
import cv2
import argparse
import time
def build_arg_parser():
    """Create the command-line parser for the pose-estimation demo.

    `--proto`, `--dataset` and `--inf_engine` are accepted but not read by
    this script.
    """
    parser = argparse.ArgumentParser(
        description='This script is used to demonstrate OpenPose human pose estimation network '
                    'from https://github.com/CMU-Perceptual-Computing-Lab/openpose project using OpenCV. '
                    'The sample and model are simplified and could be used for a single person on the frame.')
    parser.add_argument('--input', default="Images/p2.jpg",
                        help='Path to image or video. Skip to capture frames from camera')
    parser.add_argument('--proto', help='Path to .prototxt')
    # The model is loaded with cv2.dnn.readNetFromTensorflow, so the help
    # text names the TensorFlow graph format rather than .caffemodel.
    parser.add_argument('--model', default="openpose/graph_opt.pb",
                        help='Path to the frozen TensorFlow graph (.pb)')
    parser.add_argument('--dataset', default="COCO", help='Specify what kind of model was trained. '
                                                          'It could be (COCO, MPI) depends on dataset.')
    parser.add_argument('--thr', default=0.1, type=float, help='Threshold value for pose parts heat map')
    parser.add_argument('--width', default=386, type=int, help='Resize input to specific width.')
    parser.add_argument('--height', default=386, type=int, help='Resize input to specific height.')
    parser.add_argument('--inf_engine', action='store_true',
                        help='Enable Intel Inference Engine computational backend. '
                             'Check that plugins folder is in LD_LIBRARY_PATH environment variable')
    return parser


def main():
    """Read frames from the input, estimate poses and display the result.

    The loop ends when a key is pressed or the input runs out of frames. In
    the latter case the last window stays up until a key is pressed.
    """
    args = build_arg_parser().parse_args()

    # Apply --thr as the heatmap peak threshold. Its default (0.1) equals the
    # class default, so behaviour is unchanged unless the flag is given.
    estimator.PoseEstimator.NMS_Threshold = args.thr
    e = estimator.PoseEstimator()

    cap = cv2.VideoCapture(args.input if args.input else 0)
    try:
        if not cap.isOpened():
            raise SystemExit('Cannot open input: %s' % (args.input if args.input else 'camera 0'))

        while cv2.waitKey(1) < 0:
            hasFrame, frame = cap.read()
            if not hasFrame:
                # Out of frames: keep the last result on screen until a key.
                cv2.waitKey()
                break

            t = time.time()
            humans = e.inference(frame, args.model, args.width, args.height)
            elapsed = time.time() - t
            print('inference image: %s in %.4f seconds.' % (args.input, elapsed))

            image = e.draw_humans(frame, humans, imgcopy=False)
            cv2.imshow('tf-pose-estimation result', image)
    finally:
        # Always give the capture device and the windows back.
        cap.release()
        cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment