Skip to content

Instantly share code, notes, and snippets.

@RedBlaze42
Created August 7, 2021 21:55
Show Gist options
  • Save RedBlaze42/1ef971d1b39ef98f3a4f97d67967155e to your computer and use it in GitHub Desktop.
hls_motion
import cv2, imutils
from imutils.video import VideoStream
from os import path
import os
from shutil import copy
import time, tqdm
import pickle
def digit(input, nb):
    """Left-pad ``input`` with zeros until it is at least ``nb`` characters.

    Replaces the original manual while-loop with ``str.rjust``, which
    produces identical output for every input.  The parameter name
    ``input`` shadows the builtin but is kept for backward compatibility.
    """
    return str(input).rjust(nb, "0")
class HLSMotion():
    """Detect motion across the segments of an HLS playlist.

    The first frame of each segment is diffed against the first frame of
    the following segment; a segment is flagged as containing motion when
    the largest changed contour area exceeds ``treshold``.
    """

    def __init__(self, video_path, treshold=400, concatenate_treshold=1, debug=False):
        """Parse the .m3u8 playlist at ``video_path``.

        treshold -- minimum contour area (on the 500px-wide resized frame)
                    counted as motion.  (Spelling kept for compatibility.)
        concatenate_treshold -- maximum gap, in segments, bridged when
                                merging consecutive motion clips.
        debug -- when True, dump diff frames into debug_0/ and debug_1/.
        """
        self.video_path = video_path
        self.treshold = treshold
        self.concatenate_treshold = concatenate_treshold
        self.dir_path = path.dirname(self.video_path)
        self.debug = debug
        self.results_data = None  # cache for the results property
        with open(video_path, "r") as hls_file:
            data = hls_file.read().split("\n")
        # Keep only the media lines of the playlist (skip #EXT tags and blanks).
        self.segments = [path.join(self.dir_path, line) for line in data
                         if not (line.startswith("#EXT") or line == "")]

    @property
    def results(self):
        """Indices of segments where motion was detected (computed once, cached)."""
        if self.results_data is not None:
            return self.results_data
        positive_ids = list()
        self.oldFrame = self.preprocess_frame(self.get_first_frame(self.segments[0]))
        for i, segment in enumerate(tqdm.tqdm(self.segments[:-1])):
            # Compare segment i's first frame against segment i+1's.
            if self.detect_motion(self.get_first_frame(self.segments[i + 1])):
                positive_ids.append(i)
        print("\n")
        self.results_data = positive_ids
        return positive_ids

    def get_motion_clips_path(self):
        """Return the file paths of the segments flagged as motion."""
        return [self.segments[i] for i in self.results]

    def get_first_frame(self, segment):
        """Return the first frame of ``segment`` (None if it cannot be read)."""
        capture = cv2.VideoCapture(segment)
        _, frame = capture.read()
        # Fix: release the capture handle -- the original leaked one
        # VideoCapture per segment.
        capture.release()
        return frame

    def concatenate(self):
        """Group motion segment ids into clips.

        Gaps of up to ``concatenate_treshold`` segments between two motion
        ids are filled so they belong to the same clip.
        """
        clips = list()
        for clip_id in self.results:
            if clips and clip_id - clips[-1][-1] <= self.concatenate_treshold:
                # Fill the gap so the clip covers every intermediate segment.
                while clips[-1][-1] != clip_id:
                    clips[-1].append(clips[-1][-1] + 1)
            else:
                clips.append([clip_id])
        return clips

    def concatenate_not_motion(self):
        """Group every segment id NOT covered by concatenate() into contiguous clips."""
        # Set membership instead of the original O(n*m) list scan; same result.
        motion_ids = set()
        for clip in self.concatenate():
            motion_ids.update(clip)
        clips = list()
        for clip_id in range(len(self.segments)):
            if clip_id in motion_ids:
                continue
            if clips and clips[-1][-1] == clip_id - 1:
                clips[-1].append(clip_id)
            else:
                clips.append([clip_id])
        return clips

    def clips_to_paths(self, clips):
        """Map clips of segment indices to segment file paths.

        Accepts either plain index lists or ``(index_list, flag)`` tuples
        as produced by merge_clips(); the flag is preserved.
        """
        if isinstance(clips[0], tuple):
            return [([self.segments[i] for i in clip], value) for clip, value in clips]
        return [[self.segments[i] for i in clip] for clip in clips]

    def merge_clips(self):
        """Interleave motion and no-motion clips in playlist order.

        Returns a list of (clip, is_motion) tuples.
        """
        motion_clips = self.concatenate()
        no_motion_clips = self.concatenate_not_motion()
        clips = list()
        while motion_clips or no_motion_clips:
            # Fix: guard the empty case -- the original indexed
            # no_motion_clips[0] even when that list was already exhausted,
            # raising IndexError whenever a motion clip came last.
            if motion_clips and (not no_motion_clips
                                 or motion_clips[0][0] < no_motion_clips[0][0]):
                clips.append((motion_clips.pop(0), True))
            else:
                clips.append((no_motion_clips.pop(0), False))
        return clips

    def preprocess_frame(self, newFrame):
        """Resize to width 500, convert to grayscale and blur (noise reduction)."""
        newFrame = imutils.resize(newFrame, width=500)
        gray = cv2.cvtColor(newFrame, cv2.COLOR_BGR2GRAY)
        gray = cv2.GaussianBlur(gray, (21, 21), 0)
        return gray

    def detect_motion(self, newFrame):
        """Return True if ``newFrame`` differs enough from the previous frame.

        Updates self.oldFrame so the next call compares against this frame.
        """
        gray = self.preprocess_frame(newFrame)
        # Diff against the previous frame, binarize and thicken the blobs.
        frameDelta = cv2.absdiff(self.oldFrame, gray)
        thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]
        thresh = cv2.dilate(thresh, None, iterations=2)
        contours = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        # Fix: grab_contours handles both the OpenCV 3 (3-tuple) and
        # OpenCV 4 (2-tuple) return shapes; the original contours[0] only
        # worked on OpenCV 4.
        contours = imutils.grab_contours(contours)
        if self.debug:
            old_frame = self.oldFrame
        self.oldFrame = gray  # baseline for the next comparison
        if not contours:
            if self.debug:
                self.test(old_frame, gray, thresh, False)  # DEBUG
            return False
        # Hoisted: the original computed this max twice.
        max_area = max(cv2.contourArea(c) for c in contours)
        if self.debug:
            self.test(old_frame, gray, thresh, max_area > self.treshold,
                      motion_value=max_area)  # DEBUG
        return max_area > self.treshold

    def test(self, frame1, frame2, frame3, value, motion_value=0, name=None):  # DEBUG
        """Dump the three diff stages to debug_1/ (motion) or debug_0/ (no motion)."""
        if name is None:
            name = str(time.time())
        # Renamed from 'path' -- the original local shadowed the os.path import.
        out_dir = "debug_1//" if value else "debug_0//"
        motion_value = int(motion_value)
        os.makedirs("debug_1", exist_ok=True)
        os.makedirs("debug_0", exist_ok=True)
        cv2.imwrite(f"{out_dir}{name}_{motion_value}_0.jpg", frame1)
        cv2.imwrite(f"{out_dir}{name}_{motion_value}_1.jpg", frame2)
        cv2.imwrite(f"{out_dir}{name}_{motion_value}_2.jpg", frame3)

    def export(self, clips, export_path):
        """Concatenate ``clips`` (a list of file paths) into ``export_path`` via ffmpeg.

        Raises OSError if ffmpeg exits non-zero.
        """
        with open("concat.txt", "w") as file:
            file.write("\n".join("file '{}'".format(clip) for clip in clips))
        try:
            if os.system("ffmpeg -y -f concat -safe 0 -i concat.txt -c copy {}".format(export_path)) != 0:
                raise OSError("ffmpeg concat failed")
        finally:
            # Fix: the original left concat.txt behind when ffmpeg failed.
            os.remove("concat.txt")

    def export_speed(self, clips, export_path, speed):
        """Like export(), then re-encode sped up by ``speed`` (NVENC, 30 fps, no audio)."""
        with open("concat.txt", "w") as file:
            file.write("\n".join("file '{}'".format(clip) for clip in clips))
        try:
            if os.system("ffmpeg -y -f concat -safe 0 -i concat.txt -c copy temp.mp4") != 0:
                raise OSError("ffmpeg concat failed")
            if os.system("ffmpeg -y -hwaccel cuvid -c:v h264_cuvid -i temp.mp4 -r 30 -filter:v \"setpts={}*PTS\" -c:v h264_nvenc -preset slow -an -b:v 4M {}".format(round(1 / speed, 8), export_path)) != 0:
                raise OSError("ffmpeg re-encode failed")
        finally:
            # Fix: clean up intermediates even when one of the commands fails.
            os.remove("concat.txt")
            if path.exists("temp.mp4"):
                os.remove("temp.mp4")

    def save_results(self, export_path):
        """Pickle the cached motion results to ``export_path``."""
        with open(export_path, "wb") as file:
            pickle.dump(self.results, file)

    def load_results(self, load_path):
        """Load pickled results, bypassing detection.

        NOTE(review): pickle.load executes arbitrary code -- only load
        files this tool produced itself.
        """
        with open(load_path, "rb") as file:
            self.results_data = pickle.load(file)
        return self.results
if __name__ == "__main__":
    motion = HLSMotion("hls/video.m3u8", treshold=1000, concatenate_treshold=1)
    # Fix: ensure the output directory exists before ffmpeg writes into it.
    os.makedirs("output", exist_ok=True)
    # Fix: concatenate() yields lists of segment *indices*, but export()
    # writes its argument verbatim into ffmpeg's concat list -- the
    # original emitted "file '0'" instead of file paths.  Map indices to
    # paths first.
    for i, clip_paths in enumerate(motion.clips_to_paths(motion.concatenate())):
        motion.export(clip_paths, path.join("output", "{}.mp4".format(digit(i + 1, 5))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment