|
import cv2 |
|
from collections import namedtuple |
|
import common |
|
from common import CocoPairsNetwork, CocoPairs, CocoPart |
|
import numpy as np |
|
from scipy.ndimage import maximum_filter, gaussian_filter |
|
import itertools |
|
import math |
|
|
|
|
|
class Human:
    """A detected person assembled from connected body-part pairs.

    body_parts: dict mapping part index -> BodyPart
    pairs: list of scored part-pair connections belonging to this person
    uidx_list: set of unique part identifiers ('partidx-peakidx' strings)
    """
    __slots__ = ('body_parts', 'pairs', 'uidx_list')

    def __init__(self, pairs):
        self.pairs = []
        self.uidx_list = set()
        self.body_parts = {}
        for connection in pairs:
            self.add_pair(connection)

    @staticmethod
    def _get_uidx(part_idx, idx):
        # Unique id for a (part type, peak index) combination.
        return '%d-%d' % (part_idx, idx)

    def add_pair(self, pair):
        """Register a scored connection and both of its endpoint parts."""
        self.pairs.append(pair)
        endpoints = (
            (pair.part_idx1, pair.idx1, pair.coord1),
            (pair.part_idx2, pair.idx2, pair.coord2),
        )
        for part_idx, peak_idx, coord in endpoints:
            uidx = Human._get_uidx(part_idx, peak_idx)
            self.body_parts[part_idx] = BodyPart(uidx, part_idx, coord[0], coord[1], pair.score)
            self.uidx_list.add(uidx)

    def is_connected(self, other):
        """True when this human shares at least one part peak with *other*."""
        return bool(self.uidx_list & other.uidx_list)

    def merge(self, other):
        """Absorb every connection of *other* into this human."""
        for pair in other.pairs:
            self.add_pair(pair)

    def part_count(self):
        """Number of distinct body parts found for this human."""
        return len(self.body_parts)

    def get_max_score(self):
        """Highest connection score among all registered parts."""
        return max(part.score for part in self.body_parts.values())

    def __str__(self):
        return ' '.join(str(part) for part in self.body_parts.values())
|
|
|
|
|
class BodyPart:
    """A single detected keypoint belonging to a Human.

    uidx: unique identifier string ('partidx-peakidx')
    part_idx: part index (e.g. 0 for nose)
    x, y: coordinate of the body part
    score: confidence score
    """
    __slots__ = ('uidx', 'part_idx', 'x', 'y', 'score')

    def __init__(self, uidx, part_idx, x, y, score):
        self.uidx = uidx
        self.part_idx = part_idx
        self.x = x
        self.y = y
        self.score = score

    def get_part_name(self):
        """Map the numeric part index onto the CocoPart enum member."""
        return CocoPart(self.part_idx)

    def __str__(self):
        return f'BodyPart:{self.part_idx}-({self.x:.2f}, {self.y:.2f}) score={self.score:.2f}'
|
|
|
class PoseEstimator:
    """Post-processes network heatmaps and PAFs into a list of Human objects.

    Class-level flags control heatmap pre-processing; the thresholds control
    peak picking and pair filtering.
    """
    heatmap_supress = False      # subtract per-channel minimum before NMS
    heatmap_gaussian = False     # smooth heatmaps with a gaussian filter
    adaptive_threshold = False   # derive the NMS threshold from heatmap stats

    NMS_Threshold = 0.1          # minimum peak intensity to keep a candidate
    Local_PAF_Threshold = 0.1    # minimum per-sample PAF alignment
    PAF_Count_Threshold = 5      # minimum aligned samples for a valid pair
    Part_Count_Threshold = 4
    Part_Score_Threshold = 0.6   # minimum best-part score to keep a human

    # One scored connection between two candidate part peaks.
    # FIX: the 'verbose' kwarg was deprecated in 3.6 and removed in Python 3.7
    # - passing it raised TypeError at import time.
    PartPair = namedtuple('PartPair', [
        'score',
        'part_idx1', 'part_idx2',
        'idx1', 'idx2',
        'coord1', 'coord2',
        'score1', 'score2'
    ])

    @staticmethod
    def non_max_suppression(plain, window_size=3, threshold=NMS_Threshold):
        """Zero sub-threshold cells (in place!) and keep only local maxima.

        plain: 2-D heatmap channel; modified in place by the thresholding.
        Returns the map with everything but windowed local maxima zeroed.
        """
        under_threshold_indices = plain < threshold
        plain[under_threshold_indices] = 0
        return plain * (plain == maximum_filter(plain, footprint=np.ones((window_size, window_size))))

    @staticmethod
    def estimate(heat_mat, paf_mat):
        """Assemble humans from raw network output.

        heat_mat: part-confidence maps, (19, H, W) or channel-last (H, W, 19)
        paf_mat:  part-affinity fields, (38, H, W) or channel-last (H, W, 38)
        Returns the list of Human objects surviving the count/score filters.
        """
        # accept channel-last layouts by rolling channels to the front
        if heat_mat.shape[2] == 19:
            heat_mat = np.rollaxis(heat_mat, 2, 0)
        if paf_mat.shape[2] == 38:
            paf_mat = np.rollaxis(paf_mat, 2, 0)

        if PoseEstimator.heatmap_supress:
            # shift each channel so its minimum (per row block) becomes zero
            heat_mat = heat_mat - heat_mat.min(axis=1).min(axis=1).reshape(19, 1, 1)
            heat_mat = heat_mat - heat_mat.min(axis=2).reshape(19, heat_mat.shape[1], 1)

        if PoseEstimator.heatmap_gaussian:
            heat_mat = gaussian_filter(heat_mat, sigma=0.5)

        if PoseEstimator.adaptive_threshold:
            # scale the threshold with the average response, capped at 0.3
            _NMS_Threshold = max(np.average(heat_mat) * 4.0, PoseEstimator.NMS_Threshold)
            _NMS_Threshold = min(_NMS_Threshold, 0.3)
        else:
            _NMS_Threshold = PoseEstimator.NMS_Threshold

        # extract candidate coordinates via NMS (skip the background channel)
        coords = []  # coords[part_idx] = (ys, xs) arrays of peak positions
        for plain in heat_mat[:-1]:
            nms = PoseEstimator.non_max_suppression(plain, 5, _NMS_Threshold)
            coords.append(np.where(nms >= _NMS_Threshold))

        # score every candidate pair along each limb connection
        pairs_by_conn = list()
        for (part_idx1, part_idx2), (paf_x_idx, paf_y_idx) in zip(CocoPairs, CocoPairsNetwork):
            pairs = PoseEstimator.score_pairs(
                part_idx1, part_idx2,
                coords[part_idx1], coords[part_idx2],
                paf_mat[paf_x_idx], paf_mat[paf_y_idx],
                heatmap=heat_mat,
                # map pixel coordinates into [0, 1) relative coordinates
                rescale=(1.0 / heat_mat.shape[2], 1.0 / heat_mat.shape[1])
            )
            pairs_by_conn.extend(pairs)

        # greedily merge any two humans that share a part peak
        # pairs_by_conn is sorted by CocoPairs (part importance) and pair score
        humans = [Human([pair]) for pair in pairs_by_conn]
        while True:
            merge_items = None
            for k1, k2 in itertools.combinations(humans, 2):
                if k1 == k2:
                    continue
                if k1.is_connected(k2):
                    merge_items = (k1, k2)
                    break

            if merge_items is not None:
                merge_items[0].merge(merge_items[1])
                humans.remove(merge_items[1])
            else:
                break

        # reject by subset count
        humans = [human for human in humans if human.part_count() >= PoseEstimator.PAF_Count_Threshold]

        # reject by subset max score
        humans = [human for human in humans if human.get_max_score() >= PoseEstimator.Part_Score_Threshold]

        return humans

    @staticmethod
    def score_pairs(part_idx1, part_idx2, coord_list1, coord_list2, paf_mat_x, paf_mat_y, heatmap, rescale=(1.0, 1.0)):
        """Score all part1 x part2 peak combinations and keep a 1:1 matching.

        coord_list1/2: (ys, xs) arrays of candidate peaks (np.where output).
        Returns PartPair connections, greedily matched best-score-first so
        each peak is used at most once.
        """
        connection_temp = []

        for idx1, (y1, x1) in enumerate(zip(coord_list1[0], coord_list1[1])):
            for idx2, (y2, x2) in enumerate(zip(coord_list2[0], coord_list2[1])):
                score, count = PoseEstimator.get_score(x1, y1, x2, y2, paf_mat_x, paf_mat_y)
                # require enough aligned PAF samples and a positive total score
                if count < PoseEstimator.PAF_Count_Threshold or score <= 0.0:
                    continue
                connection_temp.append(PoseEstimator.PartPair(
                    score=score,
                    part_idx1=part_idx1, part_idx2=part_idx2,
                    idx1=idx1, idx2=idx2,
                    coord1=(x1 * rescale[0], y1 * rescale[1]),
                    coord2=(x2 * rescale[0], y2 * rescale[1]),
                    score1=heatmap[part_idx1][y1][x1],
                    score2=heatmap[part_idx2][y2][x2],
                ))

        # greedy assignment: best score first, each endpoint used only once
        connection = []
        used_idx1, used_idx2 = set(), set()
        for candidate in sorted(connection_temp, key=lambda x: x.score, reverse=True):
            if candidate.idx1 in used_idx1 or candidate.idx2 in used_idx2:
                continue
            connection.append(candidate)
            used_idx1.add(candidate.idx1)
            used_idx2.add(candidate.idx2)

        return connection

    @staticmethod
    def get_score(x1, y1, x2, y2, paf_mat_x, paf_mat_y):
        """Integrate PAF alignment along the segment (x1, y1) -> (x2, y2).

        Samples the PAF at 10 points along the segment and projects each
        sample onto the segment direction. Returns (score, count): the summed
        alignment of samples above Local_PAF_Threshold, and how many passed.
        """
        __num_inter = 10
        dx, dy = x2 - x1, y2 - y1
        norm_vec = math.sqrt(dx ** 2 + dy ** 2)

        # coincident points: no direction to integrate along
        if norm_vec < 1e-4:
            return 0.0, 0

        vx, vy = dx / norm_vec, dy / norm_vec

        # FIX: np.arange(x1, x2, dx/10) could yield 11 samples through float
        # rounding (IndexError on length-10 buffers); linspace with
        # endpoint=False samples the same points with a guaranteed count and
        # also handles the x1 == x2 / y1 == y2 cases uniformly.
        xs = np.linspace(x1, x2, __num_inter, endpoint=False)
        ys = np.linspace(y1, y2, __num_inter, endpoint=False)
        # round to nearest pixel.
        # FIX: was astype(np.int8), which overflows for maps larger than 127
        xs = (xs + 0.5).astype(np.int32)
        ys = (ys + 0.5).astype(np.int32)

        # vectorized PAF lookup (equivalent to the previous per-sample loop)
        pafXs = paf_mat_x[ys, xs]
        pafYs = paf_mat_y[ys, xs]

        # projection of each PAF sample onto the segment direction
        local_scores = pafXs * vx + pafYs * vy
        thidxs = local_scores > PoseEstimator.Local_PAF_Threshold

        return float(np.sum(local_scores * thidxs)), int(np.sum(thidxs))

    @staticmethod
    def draw_humans(npimg, humans, imgcopy=False):
        """Draw keypoints and limb lines for each human onto npimg (BGR).

        imgcopy: when True, draw on a copy and leave the input untouched.
        Returns the annotated image.
        """
        if imgcopy:
            npimg = np.copy(npimg)
        image_h, image_w = npimg.shape[:2]
        centers = {}
        for human in humans:
            # draw points; body-part coords are relative, scale to pixels
            for i in range(common.CocoPart.Background.value):
                if i not in human.body_parts.keys():
                    continue

                body_part = human.body_parts[i]
                center = (int(body_part.x * image_w + 0.5), int(body_part.y * image_h + 0.5))
                centers[i] = center
                cv2.circle(npimg, center, 3, common.CocoColors[i], thickness=3, lineType=8, shift=0)

            # draw limb lines between every rendered pair of found parts
            for pair_order, pair in enumerate(common.CocoPairsRender):
                if pair[0] not in human.body_parts.keys() or pair[1] not in human.body_parts.keys():
                    continue

                cv2.line(npimg, centers[pair[0]], centers[pair[1]], common.CocoColors[pair_order], 3)

        return npimg

    def inference(self, npimg, modelFile, width, height):
        """Run the TensorFlow pose network on npimg and return Humans.

        npimg: input image (BGR, as read by cv2)
        modelFile: path to a frozen TensorFlow graph loadable by cv2.dnn
        width, height: network input size for the blob
        Side effects: stores the raw maps on self.heatMat / self.pafMat.
        """
        net = cv2.dnn.readNetFromTensorflow(modelFile)

        inp = cv2.dnn.blobFromImage(npimg, 1.0, (width, height),
                                    (0, 0, 0), swapRB=False, crop=False)
        net.setInput(inp)
        out = net.forward()

        # first 19 channels are part heatmaps, the remaining 38 are PAFs
        self.heatMat = out[0, :19, :, :]
        self.pafMat = out[0, 19:, :, :]

        humans = PoseEstimator.estimate(self.heatMat, self.pafMat)
        return humans