OpenPose TensorFlow Algorithms
```python
'''
All code is highly based on Ildoo Kim's code (https://github.com/ildoonet/tf-openpose)
and derived from the OpenPose Library (https://github.com/CMU-Perceptual-Computing-Lab/openpose/blob/master/LICENSE)
'''
from collections import defaultdict
from enum import Enum
import math
import itertools

import numpy as np
import cv2
from scipy.ndimage.filters import maximum_filter


class CocoPart(Enum):
    Nose = 0
    Neck = 1
    RShoulder = 2
    RElbow = 3
    RWrist = 4
    LShoulder = 5
    LElbow = 6
    LWrist = 7
    RHip = 8
    RKnee = 9
    RAnkle = 10
    LHip = 11
    LKnee = 12
    LAnkle = 13
    REye = 14
    LEye = 15
    REar = 16
    LEar = 17
    Background = 18


CocoPairs = [
    (1, 2), (1, 5), (2, 3), (3, 4), (5, 6), (6, 7), (1, 8), (8, 9), (9, 10), (1, 11),
    (11, 12), (12, 13), (1, 0), (0, 14), (14, 16), (0, 15), (15, 17), (2, 16), (5, 17)
]  # 19 pairs
CocoPairsRender = CocoPairs[:-2]
CocoPairsNetwork = [
    (12, 13), (20, 21), (14, 15), (16, 17), (22, 23), (24, 25), (0, 1), (2, 3), (4, 5),
    (6, 7), (8, 9), (10, 11), (28, 29), (30, 31), (34, 35), (32, 33), (36, 37), (18, 19), (26, 27)
]  # 19 pairs

CocoColors = [[255, 0, 0], [255, 85, 0], [255, 170, 0], [255, 255, 0], [170, 255, 0], [85, 255, 0], [0, 255, 0],
              [0, 255, 85], [0, 255, 170], [0, 255, 255], [0, 170, 255], [0, 85, 255], [0, 0, 255], [85, 0, 255],
              [170, 0, 255], [255, 0, 255], [255, 0, 170], [255, 0, 85]]

NMS_Threshold = 0.1
InterMinAbove_Threshold = 6
Inter_Threshold = 0.1
Min_Subset_Cnt = 4
Min_Subset_Score = 0.8
Max_Human = 96


def human_conns_to_human_parts(human_conns, heatMat):
    human_parts = defaultdict(lambda: None)
    for conn in human_conns:
        human_parts[conn['partIdx'][0]] = (
            conn['partIdx'][0],  # part index
            (conn['coord_p1'][0] / heatMat.shape[2], conn['coord_p1'][1] / heatMat.shape[1]),  # relative coordinates
            heatMat[conn['partIdx'][0], conn['coord_p1'][1], conn['coord_p1'][0]]  # score
        )
        human_parts[conn['partIdx'][1]] = (
            conn['partIdx'][1],
            (conn['coord_p2'][0] / heatMat.shape[2], conn['coord_p2'][1] / heatMat.shape[1]),
            heatMat[conn['partIdx'][1], conn['coord_p2'][1], conn['coord_p2'][0]]
        )
    return human_parts


def non_max_suppression(heatmap, window_size=3, threshold=NMS_Threshold):
    heatmap[heatmap < threshold] = 0  # set low values to 0
    part_candidates = heatmap * (heatmap == maximum_filter(heatmap, footprint=np.ones((window_size, window_size))))
    return part_candidates


def estimate_pose(heatMat, pafMat):
    if heatMat.shape[2] == 19:
        # transform from [height, width, n_parts] to [n_parts, height, width]
        heatMat = np.rollaxis(heatMat, 2, 0)
    if pafMat.shape[2] == 38:
        # transform from [height, width, 2*n_pairs] to [2*n_pairs, height, width]
        pafMat = np.rollaxis(pafMat, 2, 0)

    # reliability issue: subtract per-channel minima so each map starts at 0
    heatMat = heatMat - heatMat.min(axis=1).min(axis=1).reshape(19, 1, 1)
    heatMat = heatMat - heatMat.min(axis=2).reshape(19, heatMat.shape[1], 1)

    _NMS_Threshold = max(np.average(heatMat) * 4.0, NMS_Threshold)
    _NMS_Threshold = min(_NMS_Threshold, 0.3)

    coords = []  # for each part index, stores the coordinates of its candidates
    for heatmap in heatMat[:-1]:  # drop the background channel
        part_candidates = non_max_suppression(heatmap, 5, _NMS_Threshold)
        coords.append(np.where(part_candidates >= _NMS_Threshold))

    connection_all = []  # all detected connections, with no information yet about which human they belong to
    for (idx1, idx2), (paf_x_idx, paf_y_idx) in zip(CocoPairs, CocoPairsNetwork):
        connection = estimate_pose_pair(coords, idx1, idx2, pafMat[paf_x_idx], pafMat[paf_y_idx])
        connection_all.extend(connection)

    conns_by_human = dict()
    for idx, c in enumerate(connection_all):
        conns_by_human['human_%d' % idx] = [c]  # at first, each connection belongs to a different human

    no_merge_cache = defaultdict(list)
    empty_set = set()
    while True:
        is_merged = False
        for h1, h2 in itertools.combinations(conns_by_human.keys(), 2):
            if h1 == h2:
                continue
            if h2 in no_merge_cache[h1]:
                continue
            for c1, c2 in itertools.product(conns_by_human[h1], conns_by_human[h2]):
                # if two humans share a part (same part idx and coordinates), merge those humans
                if set(c1['uPartIdx']) & set(c2['uPartIdx']) != empty_set:
                    is_merged = True
                    # extend human1's connections with human2's connections
                    conns_by_human[h1].extend(conns_by_human[h2])
                    conns_by_human.pop(h2)  # delete human2
                    break
            if is_merged:
                no_merge_cache.pop(h1, None)
                break
            else:
                no_merge_cache[h1].append(h2)
        if not is_merged:  # if no more merges are possible, stop
            break

    # reject by subset count
    conns_by_human = {h: conns for (h, conns) in conns_by_human.items() if len(conns) >= Min_Subset_Cnt}
    # reject by subset max score
    conns_by_human = {h: conns for (h, conns) in conns_by_human.items() if max([conn['score'] for conn in conns]) >= Min_Subset_Score}

    # list of humans
    humans = [human_conns_to_human_parts(human_conns, heatMat) for human_conns in conns_by_human.values()]
    return humans


def estimate_pose_pair(coords, partIdx1, partIdx2, pafMatX, pafMatY):
    connection_temp = []  # all possible connections
    peak_coord1, peak_coord2 = coords[partIdx1], coords[partIdx2]

    for idx1, (y1, x1) in enumerate(zip(peak_coord1[0], peak_coord1[1])):
        for idx2, (y2, x2) in enumerate(zip(peak_coord2[0], peak_coord2[1])):
            score, count = get_score(x1, y1, x2, y2, pafMatX, pafMatY)
            # arms use a laxer sample-count requirement than the other limbs
            if (partIdx1, partIdx2) in [(2, 3), (3, 4), (5, 6), (6, 7)]:  # arms
                if count < InterMinAbove_Threshold // 2 or score <= 0.0:
                    continue
            elif count < InterMinAbove_Threshold or score <= 0.0:
                continue
            connection_temp.append({
                'score': score,
                'coord_p1': (x1, y1),
                'coord_p2': (x2, y2),
                'idx': (idx1, idx2),  # connection candidate identifier
                'partIdx': (partIdx1, partIdx2),
                'uPartIdx': ('{}-{}-{}'.format(x1, y1, partIdx1), '{}-{}-{}'.format(x2, y2, partIdx2))
            })

    connection = []
    used_idx1, used_idx2 = [], []
    # sort possible connections by score, from maximum to minimum
    for conn_candidate in sorted(connection_temp, key=lambda x: x['score'], reverse=True):
        # check that neither peak is already connected
        if conn_candidate['idx'][0] in used_idx1 or conn_candidate['idx'][1] in used_idx2:
            continue
        connection.append(conn_candidate)
        used_idx1.append(conn_candidate['idx'][0])
        used_idx2.append(conn_candidate['idx'][1])

    return connection


def get_score(x1, y1, x2, y2, pafMatX, pafMatY):
    num_inter = 10
    dx, dy = x2 - x1, y2 - y1
    normVec = math.sqrt(dx ** 2 + dy ** 2)
    if normVec < 1e-4:
        return 0.0, 0
    vx, vy = dx / normVec, dy / normVec

    xs = np.arange(x1, x2, dx / num_inter) if x1 != x2 else np.full((num_inter,), x1)
    ys = np.arange(y1, y2, dy / num_inter) if y1 != y2 else np.full((num_inter,), y1)
    xs, ys = xs[:num_inter], ys[:num_inter]  # guard against float rounding yielding an extra sample
    # int32, not int8: int8 would overflow for coordinates above 127
    xs = (xs + 0.5).astype(np.int32)
    ys = (ys + 0.5).astype(np.int32)

    # without vectorization
    pafXs = np.zeros(num_inter)
    pafYs = np.zeros(num_inter)
    for idx, (mx, my) in enumerate(zip(xs, ys)):
        pafXs[idx] = pafMatX[my][mx]
        pafYs[idx] = pafMatY[my][mx]
    # vectorization slow?
    # pafXs = pafMatX[ys, xs]
    # pafYs = pafMatY[ys, xs]

    local_scores = pafXs * vx + pafYs * vy
    thidxs = local_scores > Inter_Threshold
    return sum(local_scores * thidxs), sum(thidxs)


def read_imgfile(path, width, height):
    img = cv2.imread(path)
    val_img = preprocess(img, width, height)
    return val_img


def preprocess(img, width, height):
    val_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # cv2 reads in BGR format
    val_img = cv2.resize(val_img, (width, height))  # each net accepts only a certain size
    val_img = val_img.reshape([1, height, width, 3])
    val_img = val_img.astype(float)
    val_img = val_img * (2.0 / 255.0) - 1.0  # scale image values to [-1, +1]
    return val_img


def draw_humans(img, human_list):
    img_copied = np.copy(img)
    image_h, image_w = img_copied.shape[:2]
    centers = {}
    for human in human_list:
        part_idxs = human.keys()
        # draw points
        for i in range(CocoPart.Background.value):
            if i not in part_idxs:
                continue
            part_coord = human[i][1]
            center = (int(part_coord[0] * image_w + 0.5), int(part_coord[1] * image_h + 0.5))
            centers[i] = center
            cv2.circle(img_copied, center, 3, CocoColors[i], thickness=3, lineType=8, shift=0)
        # draw lines
        for pair_order, pair in enumerate(CocoPairsRender):
            if pair[0] not in part_idxs or pair[1] not in part_idxs:
                continue
            img_copied = cv2.line(img_copied, centers[pair[0]], centers[pair[1]], CocoColors[pair_order], 3)
    return img_copied
```
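A minimal smoke test for `estimate_pose` (a sketch: the 46x82 feature-map size is an assumption matching the 656x368 input of the model used below; all-zero maps produce no peaks, so no humans are found):

```python
import numpy as np

# Hypothetical all-zero network outputs: nothing survives NMS,
# so the pipeline returns an empty list of humans.
heatMat = np.zeros((46, 82, 19), dtype=np.float32)  # 18 parts + background
pafMat = np.zeros((46, 82, 38), dtype=np.float32)   # x/y fields for 19 pairs
assert estimate_pose(heatMat, pafMat) == []
```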
```python
'''
All code is highly based on Ildoo Kim's code (https://github.com/ildoonet/tf-openpose)
and derived from the OpenPose Library (https://github.com/CMU-Perceptual-Computing-Lab/openpose/blob/master/LICENSE)
'''
import argparse
import time

import cv2
import numpy as np
import tensorflow as tf

from common import estimate_pose, draw_humans, read_imgfile

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Tensorflow Openpose Inference')
    parser.add_argument('--imgpath', type=str, default='./images/wywh.jpg')
    parser.add_argument('--input-width', type=int, default=656)
    parser.add_argument('--input-height', type=int, default=368)
    args = parser.parse_args()

    t0 = time.time()

    tf.reset_default_graph()

    from tensorflow.core.framework import graph_pb2
    graph_def = graph_pb2.GraphDef()
    # Download the model from https://www.dropbox.com/s/2dw1oz9l9hi9avg/optimized_openpose.pb
    with open('models/optimized_openpose.pb', 'rb') as f:
        graph_def.ParseFromString(f.read())
    tf.import_graph_def(graph_def, name='')

    t1 = time.time()
    print(t1 - t0)  # graph loading time

    inputs = tf.get_default_graph().get_tensor_by_name('inputs:0')
    heatmaps_tensor = tf.get_default_graph().get_tensor_by_name('Mconv7_stage6_L2/BiasAdd:0')
    pafs_tensor = tf.get_default_graph().get_tensor_by_name('Mconv7_stage6_L1/BiasAdd:0')

    t2 = time.time()
    print(t2 - t1)  # tensor lookup time

    image = read_imgfile(args.imgpath, args.input_width, args.input_height)

    t3 = time.time()
    print(t3 - t2)  # image loading and preprocessing time

    with tf.Session() as sess:
        heatMat, pafMat = sess.run([heatmaps_tensor, pafs_tensor], feed_dict={
            inputs: image
        })

    t4 = time.time()
    print(t4 - t3)  # forward-pass time

    heatMat, pafMat = heatMat[0], pafMat[0]
    humans = estimate_pose(heatMat, pafMat)

    # display
    image = cv2.imread(args.imgpath)
    image_h, image_w = image.shape[:2]
    image = draw_humans(image, humans)

    scale = 480.0 / image_h
    newh, neww = 480, int(scale * image_w + 0.5)
    image = cv2.resize(image, (neww, newh), interpolation=cv2.INTER_AREA)
    cv2.imshow('result', image)

    t5 = time.time()
    print(t5 - t4)  # pose estimation and rendering time
    cv2.waitKey(0)
```
```python
import math
import numpy as np

# building the vectors
dx, dy = x2 - x1, y2 - y1
normVec = math.sqrt(dx ** 2 + dy ** 2)
vx, vy = dx / normVec, dy / normVec

# sampling
num_samples = 10
xs = np.arange(x1, x2, dx / num_samples).astype(np.int32)
ys = np.arange(y1, y2, dy / num_samples).astype(np.int32)

# evaluating on the field
pafXs = pafX[ys, xs]
pafYs = pafY[ys, xs]

# integral
score = sum(pafXs * vx + pafYs * vy) / num_samples
```
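As a toy sanity check (all inputs here are hypothetical, not from the gist): a PAF field that points exactly along +x gives the maximum per-sample alignment of 1.0 for a horizontal candidate limb. Using `get_score` from the `common.py` listing above:

```python
import numpy as np

pafX = np.ones((20, 20), dtype=np.float32)   # unit x-component everywhere
pafY = np.zeros((20, 20), dtype=np.float32)  # zero y-component
x1, y1, x2, y2 = 2, 10, 12, 10               # horizontal candidate limb

score, count = get_score(x1, y1, x2, y2, pafX, pafY)
print(score, count)  # 10.0, 10: all 10 samples align perfectly (1.0 each)
```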
```python
from collections import defaultdict
import itertools

no_merge_cache = defaultdict(list)
empty_set = set()

while True:
    is_merged = False
    for h1, h2 in itertools.combinations(connections_by_human.keys(), 2):
        for c1, c2 in itertools.product(connections_by_human[h1], connections_by_human[h2]):
            # if two humans share a part (same part idx and coordinates), merge those humans
            if set(c1['partCoordsAndIdx']) & set(c2['partCoordsAndIdx']) != empty_set:
                is_merged = True
                # extend human1's connections with human2's connections
                connections_by_human[h1].extend(connections_by_human[h2])
                connections_by_human.pop(h2)  # delete human2
                break
        if is_merged:
            break  # restart the pairings: the dict of humans just changed
    if not is_merged:  # if no more merges are possible, then break
        break

# describe humans as a set of parts, not as a set of connections
humans = [human_conns_to_human_parts(human_conns) for human_conns in connections_by_human.values()]
```
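A toy illustration of the merge step (hypothetical data; the loop above is wrapped in a helper, `merge_shared_parts`, purely for this demo): two single-connection "humans" that share the same neck peak, encoded as `'12-7-1'` (x-y-partIdx), collapse into one.

```python
import itertools

def merge_shared_parts(connections_by_human):
    # Same fixed-point merge as above, wrapped in a function for testing.
    empty_set = set()
    while True:
        is_merged = False
        for h1, h2 in itertools.combinations(list(connections_by_human.keys()), 2):
            for c1, c2 in itertools.product(connections_by_human[h1], connections_by_human[h2]):
                if set(c1['partCoordsAndIdx']) & set(c2['partCoordsAndIdx']) != empty_set:
                    is_merged = True
                    connections_by_human[h1].extend(connections_by_human[h2])
                    connections_by_human.pop(h2)
                    break
            if is_merged:
                break
        if not is_merged:
            break
    return connections_by_human

conns = {
    'human_0': [{'partCoordsAndIdx': ('12-7-1', '20-9-2')}],  # neck -> RShoulder
    'human_1': [{'partCoordsAndIdx': ('12-7-1', '5-9-5')}],   # same neck -> LShoulder
}
print(merge_shared_parts(conns))  # {'human_0': [<both connections>]}
```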
```python
import tensorflow as tf

# get tensors
inputs = tf.get_default_graph().get_tensor_by_name('inputs:0')
heatmaps_tensor = tf.get_default_graph().get_tensor_by_name('Mconv7_stage6_L2/BiasAdd:0')
pafs_tensor = tf.get_default_graph().get_tensor_by_name('Mconv7_stage6_L1/BiasAdd:0')

# forward pass
with tf.Session() as sess:
    heatmaps, pafs = sess.run([heatmaps_tensor, pafs_tensor], feed_dict={
        inputs: image
    })
```
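For reference, with the 656x368 input and the COCO-trained graph used in the full script, the fetched outputs should look roughly like this (the output stride of 8 is an assumption about this particular model):

```python
# heatmaps: (1, 46, 82, 19) -- 18 body parts + background, at ~input/8 resolution
# pafs:     (1, 46, 82, 38) -- x and y components for each of the 19 limb pairs
print(heatmaps.shape, pafs.shape)
```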
```python
import numpy as np
from scipy.ndimage.filters import maximum_filter

part_candidates = heatmap * (heatmap == maximum_filter(heatmap, footprint=np.ones((window_size, window_size))))
```
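A tiny demo on a hypothetical 5x5 heatmap: only cells that equal their own 3x3 local maximum (and are nonzero) survive as part candidates.

```python
import numpy as np
from scipy.ndimage.filters import maximum_filter

heatmap = np.array([
    [0.0, 0.1, 0.0, 0.0, 0.0],
    [0.1, 0.9, 0.2, 0.0, 0.0],
    [0.0, 0.2, 0.1, 0.0, 0.6],
    [0.0, 0.0, 0.0, 0.5, 0.7],
    [0.0, 0.0, 0.0, 0.6, 0.4],
])
window_size = 3
part_candidates = heatmap * (heatmap == maximum_filter(
    heatmap, footprint=np.ones((window_size, window_size))))
print(np.argwhere(part_candidates > 0))  # peaks at (1, 1) and (3, 4)
```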