Skip to content

Instantly share code, notes, and snippets.

@guoxiaolu
Last active Jul 28, 2020
Embed
What would you like to do?
高空抛物 drop judge
import os
import cv2
import numpy as np
from skimage.feature import blob_dog, blob_log, blob_doh
from math import sqrt
from time import time
from multiprocessing import Pool, Queue
def GetDiff(img0, img1, thresh_diff_pix):
    """Threshold the per-pixel difference between two images into a mask.

    Both images are subtracted element-wise, the absolute differences are
    summed over axis 2 and divided by 3 and by 255, and the result is passed
    to ``cv2.inRange`` with bounds ``[thresh_diff_pix, 1]``.

    Args:
        img0: First image; must have a third axis (summed over ``axis=2``).
            Presumably a float H x W x 3 array on a 0..255 scale -- the
            ``/ 3 / 255`` normalisation suggests this; confirm with callers.
        img1: Second image, broadcast-compatible with ``img0``.
        thresh_diff_pix: Lower bound applied to the normalised difference.

    Returns:
        The array produced by ``cv2.inRange`` (per the OpenCV docs, an 8-bit
        mask that is 255 where the value lies inside the bounds, else 0).
    """
    lower_bound = thresh_diff_pix
    per_channel_abs = np.fabs(img0 - img1)
    # Collapse the channel axis to a mean, then rescale by 255.
    normalised = np.sum(per_channel_abs, axis=2) / 3 / 255
    return cv2.inRange(normalised, lower_bound, 1)
def _angle_between(u, v):
    """Return the angle in radians between 2-D vectors, or inf if one is zero."""
    u = np.asarray(u, dtype=float)
    v = np.asarray(v, dtype=float)
    denom = np.linalg.norm(u) * np.linalg.norm(v)
    if denom == 0:
        # No direction to compare against; treat as "not aligned".
        return float("inf")
    # Rounding can push the cosine of exactly collinear vectors just above
    # 1.0, which makes arccos return NaN and silently rejects the best
    # possible match. Clip into the valid domain first.
    cosangle = np.clip(u.dot(v) / denom, -1.0, 1.0)
    return float(np.arccos(cosangle))


def find_lines(params):
    """Extend each seed track backwards through earlier frames.

    Args:
        params: Tuple ``(fid, fblobs, seeds, ishape)`` where
            ``fid`` is the current frame id,
            ``fblobs`` maps a frame id to a list of blobs indexed as
            ``[y, x, ...]``,
            ``seeds`` is a list of tracks, each a list of ``(x, y)`` points
            ordered newest first with at least two points,
            ``ishape`` is the image shape ``(height, width, ...)``.

    For every seed, frames ``fid - 2, fid - 3, ...`` are visited (stopping at
    the first frame id missing from ``fblobs``, at frame 0, or after the
    look-back distance exceeds 10). In each frame the first blob that
      * lies strictly above the track's last point (smaller y),
      * is closer than one tenth of the image width / height to it, and
      * continues the track's last step within 0.26 rad (about 15 degrees)
    is appended to the seed.

    Returns:
        The same ``seeds`` list, with the tracks extended in place.
    """
    fid, fblobs, seeds, ishape = params
    max_dx = ishape[1] // 10
    max_dy = ishape[0] // 10
    cnt = 0  # number of (seed, blob) candidates examined, for the log line
    for seed in seeds:
        for i in range(fid - 2, 0, -1):
            if i not in fblobs or fid - 2 - i > 10:
                break
            for ifblob in fblobs[i]:
                cnt += 1
                bx, by = int(ifblob[1]), int(ifblob[0])
                tx, ty = seed[-1]
                rise = ty - by
                if rise <= 0 or abs(tx - bx) >= max_dx or rise >= max_dy:
                    continue
                px, py = seed[-2]
                # 0.26 is about 15 degree
                if _angle_between((px - tx, py - ty), (tx - bx, rise)) < 0.26:
                    seed.append((bx, by))
                    # At most one blob per frame joins a track.
                    break
    print(len(seeds), cnt)
    return seeds
class DropDetect:
    """Falling-object detector based on background subtraction.

    A background image (``back_avg``) is compared against each incoming
    frame; connected regions of the thresholded difference are recorded per
    frame id in ``fblobs`` and chained into downward tracks with
    ``find_lines``. A track longer than 10 points is reported as a drop.
    """

    # Frames are shrunk by this factor before being averaged into the
    # background (marked "temp" in the original code).
    _DOWNSCALE = 4

    def __init__(self):
        self.back_sum = None        # running float32 sum of fed frames
        self.back_avg = None        # background image used by GetDiff
        self.last_img = []          # previous frame as float32 (empty until first GetDiff)
        self.img_counter = 0        # number of frames added to back_sum
        self.thresh_diff_pix = 0.1  # threshold passed to the module-level GetDiff
        self.list_mask_area = []
        # NOTE(review): a placeholder holding the builtin ``object`` type;
        # nothing in this class reads it.
        self.list_mask_area.append(object)
        self.fblobs = {}            # frame id -> list of [y, x, r] blobs

    def Pre(self, img):
        """Feed ``img`` into the running background estimate."""
        self.FeedBackGround(img)

    def FeedBackGround(self, img):
        """Accumulate ``img`` and refresh ``back_avg`` as the running mean.

        Args:
            img: Array with an ``astype`` method; every fed frame must have
                the same shape.
        """
        frame = img.astype(np.float32)
        # ``is None`` rather than ``== None``: once back_sum is an array the
        # latter is an element-wise comparison that cannot be used in ``if``.
        if self.back_sum is None:
            self.back_sum = frame
        else:
            # Only add on later frames so the first one is not counted twice.
            self.back_sum += frame
        self.img_counter += 1
        self.back_avg = self.back_sum / self.img_counter

    def SubBack(self, img):
        """Return an empty mask while fewer than 100 frames have been fed.

        Args:
            img: Three-axis image array; only its shape is read.

        Returns:
            A zero ``uint8`` array of shape ``(height, width)`` during the
            warm-up phase, otherwise ``None`` (no subtraction is implemented
            for the warmed-up case).
        """
        h, w, _ = img.shape
        if self.img_counter < 100:
            # NOTE(review): shape/dtype chosen to match the mask built in
            # GetDiff -- the original call passed no shape and always raised.
            return np.zeros((h, w), np.uint8)
        return None

    @classmethod
    def _shrink(cls, img):
        """Downscale ``img`` by ``_DOWNSCALE`` along both axes."""
        return cv2.resize(
            img, (img.shape[1] // cls._DOWNSCALE, img.shape[0] // cls._DOWNSCALE)
        )

    def GetBackGound(self, source, num_frames):
        """Average downscaled frames of a video into a background image.

        Args:
            source: Anything accepted by ``cv2.VideoCapture``.
            num_frames: Frame budget. After the first frame, frames keep
                being added until the stream ends or the running count
                exceeds ``num_frames``; a negative value reads the whole
                stream.

        Returns:
            float32 mean of the frames that were read.

        Raises:
            IOError: If not even one frame can be read from ``source``.
        """
        vc = cv2.VideoCapture(source)
        try:
            res, img = vc.read()
            if not res:
                raise IOError("cannot read any frame from %r" % (source,))
            img_sum = self._shrink(img).astype(np.float32)
            counter = 1
            while True:
                res, img = vc.read()
                if not res:
                    break
                img_sum += self._shrink(img)
                counter += 1
                if num_frames < 0:
                    continue
                if counter > num_frames:
                    break
        finally:
            # Always hand the capture device / file back.
            vc.release()
        return img_sum / counter

    @staticmethod
    def _find_blobs(diff, ishape):
        """Return ``[y, x, r]`` for each plausible component of ``diff``.

        Only rows between one fifth and four fifths of the mask height are
        considered. Components with area <= 10 or larger than a tenth of the
        image are dropped. ``r`` is the distance from the centroid to the
        top-left corner of the component's bounding box.
        """
        rows = diff.shape[0]
        top, bottom = rows // 5, rows // 5 * 4
        mask = np.zeros(diff.shape, np.uint8)
        mask[top:bottom, :] = diff[top:bottom, :]
        _, _, stats, centroids = cv2.connectedComponentsWithStats(mask)
        max_area = ishape[0] * ishape[1] / 10
        blobs = []
        for stat, centroid in zip(stats, centroids):
            area = stat[-1]
            if area <= 10 or area > max_area:
                continue
            cx, cy = centroid.tolist()
            dx = cx - stat[0]
            dy = cy - stat[1]
            blobs.append([cy, cx, sqrt(dx * dx + dy * dy)])
        return blobs

    def _find_drop_track(self, blobs, fid, ishape):
        """Return the first track longer than 10 points, or ``None``."""
        if not blobs or fid - 1 not in self.fblobs:
            return None
        max_dx = ishape[1] // 10
        max_dy = ishape[0] // 10
        seeds = []
        for blob in blobs:
            cur = (int(blob[1]), int(blob[0]))
            for lblob in self.fblobs[fid - 1]:
                prev = (int(lblob[1]), int(lblob[0]))
                fall = cur[1] - prev[1]
                # A seed is a current blob strictly below a previous-frame
                # blob and within a tenth of the image size of it.
                if fall > 0 and abs(cur[0] - prev[0]) < max_dx and fall < max_dy:
                    seeds.append([cur, prev])
        # find_lines extends the seed lists in place.
        find_lines((fid, self.fblobs, seeds, ishape))
        for seed in seeds:
            if len(seed) > 10:
                return seed
        return None

    def GetDiff(self, img, fid):
        """Process one frame and report whether a drop track was found.

        ``back_avg`` must have been set (via ``FeedBackGround`` or by
        assignment) before the first call.

        Args:
            img: Current frame, same shape as ``back_avg``.
            fid: Frame id; tracks are chained through ids ``fid - 1``,
                ``fid - 2``, ... stored in ``fblobs``.

        Returns:
            Tuple ``(diff, blobs, is_drop, drop_seed)``: the thresholded
            difference mask, the ``[y, x, r]`` blobs of this frame, a flag,
            and the track as a list of ``(x, y)`` points (``[]`` when no
            drop was found).
        """
        img_float = img.astype(np.float32)
        diff = GetDiff(img_float, self.back_avg, self.thresh_diff_pix)
        # The previous frame is still remembered, but the unused
        # frame-to-frame difference (guarded by an ndarray-vs-list
        # comparison) is no longer computed.
        self.last_img = img_float

        blobs = self._find_blobs(diff, img.shape)
        self.fblobs[fid] = blobs

        start = time()
        drop_seed = self._find_drop_track(blobs, fid, img.shape)
        if drop_seed is not None:
            return diff, blobs, True, drop_seed
        print(time() - start)
        return diff, blobs, False, []
def testdrop(fn_v="/Users/guoxiaolu/work/Drop/2020_07_03_13_56_IMG_2005.avi",
             max_frames=100):
    """Run the detector on a video, display it and save the visualisation.

    The background is estimated with ``GetBackGound(fn_v, 500)``; then up to
    ``max_frames`` frames are downscaled by 4, analysed, shown in two
    windows, and the annotated difference mask is written to
    ``./<video name>_predict.mp4`` at 25 fps.

    Args:
        fn_v: Path of the input video.
        max_frames: Maximum number of frames to process.
    """
    dd = DropDetect()
    image_back = dd.GetBackGound(fn_v, 500)
    dd.back_avg = image_back

    stem = os.path.splitext(os.path.basename(fn_v))[0]
    vname = os.path.join('./', stem + '_predict.mp4')
    fourcc = cv2.VideoWriter_fourcc(*'mpeg')
    frame_size = (image_back.shape[1], image_back.shape[0])

    vc = cv2.VideoCapture(fn_v)
    out = cv2.VideoWriter(vname, fourcc, 25, frame_size)
    start = time()
    try:
        for fid in range(1, max_frames + 1):
            res, img = vc.read()
            if not res:
                break
            # temp
            img = cv2.resize(img, (img.shape[1] // 4, img.shape[0] // 4))
            diff, blobs, is_drop, drop_seed = dd.GetDiff(img, fid)
            vis = cv2.cvtColor(diff, cv2.COLOR_GRAY2BGR)
            for y, x, r in blobs:
                cv2.circle(vis, (int(x), int(y)), int(r), (255, 0, 0), 2)
            if is_drop:
                cv2.putText(vis, 'Drop!', (50, 50), cv2.FONT_HERSHEY_COMPLEX,
                            1, (0, 0, 255), 5)
                # Connect every consecutive pair of track points, including
                # the final one.
                for pt, nxt in zip(drop_seed, drop_seed[1:]):
                    cv2.line(vis, pt, nxt, (0, 0, 255), 2)
            print(fid)
            cv2.imshow("img", img)
            cv2.imshow("diff", vis)
            cv2.waitKey(1)
            out.write(vis)
        print(time() - start)
    finally:
        # Release the reader and finalise the output file even on error.
        vc.release()
        out.release()
# Script entry point: run the demo only when executed directly, not on import.
if __name__ == "__main__":
    testdrop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment