Created
August 7, 2021 21:55
-
-
Save RedBlaze42/1ef971d1b39ef98f3a4f97d67967155e to your computer and use it in GitHub Desktop.
hls_motion
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cv2, imutils | |
from imutils.video import VideoStream | |
from os import path | |
import os | |
from shutil import copy | |
import time, tqdm | |
import pickle | |
def digit(input, nb):
    """Return ``str(input)`` left-padded with ``"0"`` to at least ``nb`` characters.

    Values whose string form is already ``nb`` characters or longer are
    returned unchanged. Padding is purely textual, so a leading minus sign
    is not treated specially (``digit(-5, 3)`` gives ``"0-5"``).
    """
    # str.rjust does the padding in one call instead of rebuilding the
    # string once per missing character.
    return str(input).rjust(nb, "0")
class HLSMotion():
    """Detect motion between the segments of an HLS playlist.

    The playlist is read once at construction time. Motion is estimated by
    comparing the first frame of each segment with the first frame of the
    following segment; segment ``i`` is flagged when the largest changed
    region between those two frames exceeds ``treshold``.

    Attributes:
        video_path: Path of the ``.m3u8`` playlist.
        treshold: Minimum contour area for a frame pair to count as motion.
        concatenate_treshold: Maximum index gap between two flagged
            segments for them to be merged into the same clip.
        dir_path: Directory of the playlist; segment paths are joined to it.
        debug: When true, every comparison dumps its frames to
            ``debug_0``/``debug_1``.
        results_data: Cached list of flagged segment indices, or ``None``
            while not computed/loaded yet.
        segments: Segment paths in playlist order.
    """

    def __init__(self, video_path, treshold=400, concatenate_treshold=1, debug=False):
        """Read the playlist at ``video_path`` and collect its segment paths."""
        self.video_path = video_path
        self.treshold = treshold
        self.concatenate_treshold = concatenate_treshold
        self.dir_path = path.dirname(self.video_path)
        self.debug = debug
        self.results_data = None
        with open(video_path, "r") as hls_file:
            lines = [line.strip() for line in hls_file]
        # In an HLS playlist every line starting with "#" is a tag or a
        # comment; only the remaining non-blank lines name media segments.
        self.segments = [
            path.join(self.dir_path, line)
            for line in lines
            if line and not line.startswith("#")
        ]

    @property
    def results(self):
        """Indices of the segments in which motion was detected.

        Computed on first access and cached in ``results_data``. The last
        segment is never flagged because it has no following segment to
        compare against. An empty playlist yields an empty list.
        """
        if self.results_data is not None:
            return self.results_data
        if not self.segments:
            self.results_data = []
            return self.results_data
        positive_ids = []
        self.oldFrame = self.preprocess_frame(self.get_first_frame(self.segments[0]))
        # detect_motion() compares against self.oldFrame and then replaces
        # it, so the segments must be visited strictly in order.
        for i, next_segment in enumerate(tqdm.tqdm(self.segments[1:])):
            if self.detect_motion(self.get_first_frame(next_segment)):
                positive_ids.append(i)
        print("\n")
        self.results_data = positive_ids
        return positive_ids

    def get_motion_clips_path(self):
        """Return the paths of all segments flagged in ``results``."""
        return [self.segments[i] for i in self.results]

    def get_first_frame(self, segment):
        """Return the first decoded frame of ``segment``.

        Raises:
            OSError: If no frame could be read from the segment.
        """
        capture = cv2.VideoCapture(segment)
        try:
            success, frame = capture.read()
        finally:
            # Release the decoder right away; one capture is opened per segment.
            capture.release()
        if not success or frame is None:
            raise OSError("Could not read a frame from segment {!r}".format(segment))
        return frame

    def concatenate(self):
        """Group flagged segment indices into clips of consecutive indices.

        A flagged index joins the previous clip when it is at most
        ``concatenate_treshold`` after that clip's last index; the skipped
        indices in between are filled in so each clip is contiguous.

        Returns:
            A list of clips, each a list of segment indices.
        """
        clips = []
        for clip in self.results:
            if clips and clip - clips[-1][-1] <= self.concatenate_treshold:
                # Extend up to and including `clip`; the range is empty when
                # `clip` does not lie after the current end of the clip.
                clips[-1].extend(range(clips[-1][-1] + 1, clip + 1))
            else:
                clips.append([clip])
        return clips

    def concatenate_not_motion(self):
        """Group the segment indices not covered by ``concatenate()``.

        Returns:
            A list of clips, each a list of consecutive segment indices.
        """
        motion_ids = {clip_id for clip in self.concatenate() for clip_id in clip}
        clips = []
        for clip_id in range(len(self.segments)):
            if clip_id in motion_ids:
                continue
            if clips and clips[-1][-1] == clip_id - 1:
                clips[-1].append(clip_id)
            else:
                clips.append([clip_id])
        return clips

    def clips_to_paths(self, clips):
        """Translate clips of segment indices into clips of segment paths.

        Args:
            clips: Either a list of index lists (as returned by
                ``concatenate()``) or a list of ``(index_list, value)``
                tuples (as returned by ``merge_clips()``). The form is
                decided from the first element.

        Returns:
            The same structure with every index replaced by its path; an
            empty input gives an empty list.
        """
        if not clips:
            return []
        if isinstance(clips[0], tuple):
            return [
                ([self.segments[segment] for segment in clip], value)
                for clip, value in clips
            ]
        return [[self.segments[segment] for segment in clip] for clip in clips]

    def merge_clips(self):
        """Return motion and non-motion clips interleaved in playlist order.

        Returns:
            A list of ``(clip, is_motion)`` tuples ordered by the first
            segment index of each clip.
        """
        tagged = [(clip, True) for clip in self.concatenate()]
        tagged += [(clip, False) for clip in self.concatenate_not_motion()]
        # Ordering by first index works even when one of the two lists is
        # empty, and avoids repeated pop(0) calls.
        tagged.sort(key=lambda item: item[0][0])
        return tagged

    def preprocess_frame(self, newFrame):
        """Resize to 500 px wide, convert to grayscale and blur the frame."""
        newFrame = imutils.resize(newFrame, width=500)
        gray = cv2.cvtColor(newFrame, cv2.COLOR_BGR2GRAY)
        gray = cv2.GaussianBlur(gray, (21, 21), 0)
        return gray

    def detect_motion(self, newFrame):
        """Compare ``newFrame`` with the previously seen frame.

        ``self.oldFrame`` must already hold a preprocessed frame; it is
        replaced by the preprocessed ``newFrame`` before returning.

        Returns:
            True when the largest changed region has an area greater than
            ``treshold``, False otherwise (including when nothing changed).
        """
        gray = self.preprocess_frame(newFrame)
        previous = self.oldFrame

        # Difference mask between the two frames
        frameDelta = cv2.absdiff(previous, gray)
        thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]
        thresh = cv2.dilate(thresh, None, iterations=2)

        found = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        # The contour list is the second-to-last item of the returned tuple
        # in every OpenCV release: (contours, hierarchy) or
        # (image, contours, hierarchy).
        contours = found[-2]

        self.oldFrame = gray

        # Compute the largest area once; 0 when there is no contour at all.
        motion_value = max((cv2.contourArea(c) for c in contours), default=0)
        has_motion = len(contours) > 0 and motion_value > self.treshold
        if self.debug:
            self.test(previous, gray, thresh, has_motion, motion_value=motion_value)
        return has_motion

    def test(self, frame1, frame2, frame3, value, motion_value=0, name=None):
        """Debug helper: write the three frames as JPEG files.

        Files go to ``debug_1`` when ``value`` is true and ``debug_0``
        otherwise, and are named ``<name>_<motion_value>_<0|1|2>.jpg``.
        ``name`` defaults to the current timestamp.
        """
        if name is None:
            name = str(time.time())
        # Named out_dir so the module-level `path` import is not shadowed.
        out_dir = "debug_1//" if value else "debug_0//"
        motion_value = int(motion_value)
        os.makedirs("debug_1", exist_ok=True)
        os.makedirs("debug_0", exist_ok=True)
        for suffix, frame in enumerate((frame1, frame2, frame3)):
            cv2.imwrite(f"{out_dir}{name}_{motion_value}_{suffix}.jpg", frame)

    @staticmethod
    def _write_concat_list(clips):
        """Write ``concat.txt`` listing ``clips`` for ffmpeg's concat demuxer."""
        # Inside a single-quoted concat entry a literal quote is written '\''.
        entries = ["file '{}'".format(str(clip).replace("'", "'\\''")) for clip in clips]
        with open("concat.txt", "w") as file:
            file.write("\n".join(entries))

    @staticmethod
    def _run_ffmpeg(arguments):
        """Run ``ffmpeg -y`` with ``arguments``; raise OSError on failure."""
        import subprocess

        # An argument list avoids the shell, so paths containing spaces or
        # shell metacharacters are passed through untouched.
        if subprocess.run(["ffmpeg", "-y"] + list(arguments)).returncode != 0:
            raise OSError("ffmpeg exited with a non-zero status")

    def export(self, clips, export_path):
        """Concatenate the segment files ``clips`` into ``export_path``.

        Args:
            clips: Iterable of segment file paths, in output order.
            export_path: Destination file; overwritten if it exists.

        Raises:
            OSError: If ffmpeg cannot be run or exits with an error.
        """
        self._write_concat_list(clips)
        try:
            self._run_ffmpeg(
                ["-f", "concat", "-safe", "0", "-i", "concat.txt", "-c", "copy", export_path]
            )
        finally:
            # Remove the temporary list even when ffmpeg failed.
            os.remove("concat.txt")

    def export_speed(self, clips, export_path, speed):
        """Concatenate ``clips`` and re-encode them at ``speed`` times real time.

        The segments are first stream-copied into ``temp.mp4``, which is
        then re-encoded without audio using the NVIDIA cuvid/nvenc codecs
        with ``setpts`` scaled by ``1/speed``.

        Raises:
            OSError: If ffmpeg cannot be run or exits with an error.
            ZeroDivisionError: If ``speed`` is 0.
        """
        self._write_concat_list(clips)
        try:
            self._run_ffmpeg(
                ["-f", "concat", "-safe", "0", "-i", "concat.txt", "-c", "copy", "temp.mp4"]
            )
            self._run_ffmpeg(
                [
                    "-hwaccel", "cuvid", "-c:v", "h264_cuvid", "-i", "temp.mp4",
                    "-r", "30",
                    "-filter:v", "setpts={}*PTS".format(round(1 / speed, 8)),
                    "-c:v", "h264_nvenc", "-preset", "slow", "-an", "-b:v", "4M",
                    export_path,
                ]
            )
        finally:
            # temp.mp4 may not exist if the first ffmpeg run failed.
            for leftover in ("concat.txt", "temp.mp4"):
                if os.path.exists(leftover):
                    os.remove(leftover)

    def save_results(self, export_path):
        """Pickle ``results`` (computing them if necessary) to ``export_path``."""
        with open(export_path, "wb") as file:
            pickle.dump(self.results, file)

    def load_results(self, load_path):
        """Load previously saved results from ``load_path`` and return them.

        The loaded value replaces any cached results.
        """
        # SECURITY: pickle.load can execute arbitrary code; only load files
        # produced by save_results() from a trusted location.
        with open(load_path, "rb") as file:
            self.results_data = pickle.load(file)
        return self.results
if __name__ == "__main__":
    # Script entry point: export every detected motion clip of the sample
    # playlist to output/00001.mp4, output/00002.mp4, ...
    motion = HLSMotion("hls/video.m3u8", treshold=1000, concatenate_treshold=1)
    # ffmpeg does not create missing output directories.
    os.makedirs("output", exist_ok=True)
    for i, clip in enumerate(motion.concatenate()):
        # concatenate() yields lists of segment indices, while export()
        # writes its arguments as file paths into the ffmpeg concat list.
        clip_paths = [motion.segments[segment_id] for segment_id in clip]
        motion.export(clip_paths, os.path.join("output", "{}.mp4".format(digit(i + 1, 5))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment