Skip to content

Instantly share code, notes, and snippets.

@ivanstepanovftw
Created June 29, 2023 14:36
Show Gist options
  • Save ivanstepanovftw/ea19cd8752e6f60280640d08e0ff17be to your computer and use it in GitHub Desktop.
Save ivanstepanovftw/ea19cd8752e6f60280640d08e0ff17be to your computer and use it in GitHub Desktop.
Python implementation of Computing Inverse Optical Flow (2013, Javier Sánchez et al)
def flow_disocclusion(flow: np.ndarray) -> (np.ndarray):
h, w, _ = flow.shape
remap_flow = flow.transpose(2, 0, 1)
remap_xy = np.float32(np.mgrid[:h, :w][::-1])
uv_new = (remap_xy + remap_flow).round().astype(np.int32)
mask = (uv_new[0] >= 0) & (uv_new[1] >= 0) & (uv_new[0] < w) & (uv_new[1] < h)
uv_new_ = uv_new[:, mask]
mask_remaped = np.zeros((h, w), np.int8)
mask_remaped[uv_new_[1], uv_new_[0]] = 255
return mask_remaped
# Source: https://ctim.ulpgc.es/research_works/computing_inverse_optical_flow/
OCCLUSION = 99999.0
OCCLUSION_MAX_FLOW = 0
WEIGHT_TH = 0.25
MOTION_TH = 0.25
MIN_FILL = 1
AVERAGE_FILL = 2
ORIENTED_FILL = 3
NO_DISOCCLUSION = 1
DISOCCLUSION = 0
STEREO_DISOCCLUSION = -1
STREETLAMP_DISOCCLUSION = -2
def inverse_flow012(flow: np.ndarray) -> (np.ndarray, np.ndarray):
h, w = flow.shape[0], flow.shape[1]
u_, v_ = np.zeros((h, w)), np.zeros((h, w))
mask = np.zeros((h, w))
u, v = flow[:, :, 0], flow[:, :, 1]
for y in range(h):
for x in range(w):
# warping the flow
xw = x + u[y, x]
yw = y + v[y, x]
# sign of the warped position
sx = -1 if xw < 0 else 1
sy = -1 if yw < 0 else 1
# integer part of the warped position
xi = int(xw)
yi = int(yw)
# warped position
dx = xi + sx
dy = yi + sy
# check that the warped position is inside the image
xi = max(0, min(xi, w - 1))
yi = max(0, min(yi, h - 1))
dx = max(0, min(dx, w - 1))
dy = max(0, min(dy, h - 1))
# compute the four proportions
e1 = sx * (xw - xi)
E1 = 1.0 - e1
e2 = sy * (yw - yi)
E2 = 1.0 - e2
# compute the four weights
w1 = E1 * E2
w2 = e1 * E2
w3 = E1 * e2
w4 = e1 * e2
# compute the four distances
d = u[y, x] ** 2 + v[y, x] ** 2
d1 = u_[yi, xi] ** 2 + v_[yi, xi] ** 2
d2 = u_[yi, dx] ** 2 + v_[yi, dx] ** 2
d3 = u_[dy, xi] ** 2 + v_[dy, xi] ** 2
d4 = u_[dy, dx] ** 2 + v_[dy, dx] ** 2
# check if the warped position is occluded
if w1 >= WEIGHT_TH and d >= d1:
u_[yi, xi] = -u[y, x]
v_[yi, xi] = -v[y, x]
mask[yi, xi] = NO_DISOCCLUSION
if w2 >= WEIGHT_TH and d >= d2:
u_[yi, dx] = -u[y, x]
v_[yi, dx] = -v[y, x]
mask[yi, dx] = NO_DISOCCLUSION
if w3 >= WEIGHT_TH and d >= d3:
u_[dy, xi] = -u[y, x]
v_[dy, xi] = -v[y, x]
mask[dy, xi] = NO_DISOCCLUSION
if w4 >= WEIGHT_TH and d >= d4:
u_[dy, dx] = -u[y, x]
v_[dy, dx] = -v[y, x]
mask[dy, dx] = NO_DISOCCLUSION
return np.dstack((u_, v_)), mask
def inverse_flow(flow: np.ndarray) -> (np.ndarray, np.ndarray):
h, w = flow.shape[1], flow.shape[2]
u_, v_ = np.zeros((h, w)), np.zeros((h, w))
mask = np.zeros((h, w))
u, v = flow[0, :, :], flow[1, :, :]
for y in range(h):
for x in range(w):
# warping the flow
xw = x + u[y, x]
yw = y + v[y, x]
# sign of the warped position
sx = -1 if xw < 0 else 1
sy = -1 if yw < 0 else 1
# integer part of the warped position
xi = int(xw)
yi = int(yw)
# warped position
dx = xi + sx
dy = yi + sy
# check that the warped position is inside the image
xi = max(0, min(xi, w - 1))
yi = max(0, min(yi, h - 1))
dx = max(0, min(dx, w - 1))
dy = max(0, min(dy, h - 1))
# compute the four proportions
e1 = sx * (xw - xi)
E1 = 1.0 - e1
e2 = sy * (yw - yi)
E2 = 1.0 - e2
# compute the four weights
w1 = E1 * E2
w2 = e1 * E2
w3 = E1 * e2
w4 = e1 * e2
# compute the four distances
d = u[y, x] ** 2 + v[y, x] ** 2
d1 = u_[yi, xi] ** 2 + v_[yi, xi] ** 2
d2 = u_[yi, dx] ** 2 + v_[yi, dx] ** 2
d3 = u_[dy, xi] ** 2 + v_[dy, xi] ** 2
d4 = u_[dy, dx] ** 2 + v_[dy, dx] ** 2
# check if the warped position is occluded
if w1 >= WEIGHT_TH and d >= d1:
u_[yi, xi] = -u[y, x]
v_[yi, xi] = -v[y, x]
mask[yi, xi] = NO_DISOCCLUSION
if w2 >= WEIGHT_TH and d >= d2:
u_[yi, dx] = -u[y, x]
v_[yi, dx] = -v[y, x]
mask[yi, dx] = NO_DISOCCLUSION
if w3 >= WEIGHT_TH and d >= d3:
u_[dy, xi] = -u[y, x]
v_[dy, xi] = -v[y, x]
mask[dy, xi] = NO_DISOCCLUSION
if w4 >= WEIGHT_TH and d >= d4:
u_[dy, dx] = -u[y, x]
v_[dy, dx] = -v[y, x]
mask[dy, dx] = NO_DISOCCLUSION
return np.stack((u_, v_)), mask
def select_motion(d, u, v, wgt,
d_, u_, v_, wgt_, mask, pos):
if wgt >= WEIGHT_TH:
if abs(d - d_[pos]) <= MOTION_TH:
u_[pos] += u * wgt
v_[pos] += v * wgt
wgt_[pos] += wgt
mask[pos] = NO_DISOCCLUSION
elif d >= d_[pos]:
d_[pos] = d
u_[pos] = u * wgt
v_[pos] = v * wgt
wgt_[pos] = wgt
mask[pos] = NO_DISOCCLUSION
def inverse_average_flow(flow):
h, w = flow.shape[0], flow.shape[1]
u_, v_ = np.zeros((h, w)), np.zeros((h, w))
mask = np.zeros((h, w))
avg_u = np.zeros((h, w))
avg_v = np.zeros((h, w))
wgt_ = np.zeros((h, w))
d_ = np.full((h, w), -999)
u, v = flow[:, :, 0], flow[:, :, 1]
for y in range(h):
for x in range(w):
xw = x + u[y, x]
yw = y + v[y, x]
sx = -1 if xw < 0 else 1
sy = -1 if yw < 0 else 1
xi = int(xw)
yi = int(yw)
dx = xi + sx
dy = yi + sy
xi = max(0, min(xi, w - 1))
yi = max(0, min(yi, h - 1))
dx = max(0, min(dx, w - 1))
dy = max(0, min(dy, h - 1))
e1 = sx * (xw - xi)
E1 = 1.0 - e1
e2 = sy * (yw - yi)
E2 = 1.0 - e2
w1 = E1 * E2
w2 = e1 * E2
w3 = E1 * e2
w4 = e1 * e2
d = u[y, x] ** 2 + v[y, x] ** 2
select_motion(d, u[y, x], v[y, x], w1, d_, avg_u, avg_v, wgt_, mask, (yi, xi))
select_motion(d, u[y, x], v[y, x], w2, d_, avg_u, avg_v, wgt_, mask, (yi, dx))
select_motion(d, u[y, x], v[y, x], w3, d_, avg_u, avg_v, wgt_, mask, (dy, xi))
select_motion(d, u[y, x], v[y, x], w4, d_, avg_u, avg_v, wgt_, mask, (dy, dx))
for y in range(h):
for x in range(w):
if wgt_[y, x] > 0:
u_[y, x] = -avg_u[y, x] / wgt_[y, x]
v_[y, x] = -avg_v[y, x] / wgt_[y, x]
return np.dstack((u_, v_)), mask
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment