A video frame generator implementing the keras.utils.Sequence interface, with data augmentation capabilities
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# DEPRECATED - please use my complete package instead:
# PyPI package: https://pypi.org/project/keras-video-generators/
# GitHub sources: https://github.com/metal3d/keras-video-generators
import keras | |
import cv2 as cv | |
import glob | |
import numpy as np | |
import os | |
import random | |
import math | |
# author: Patrice Ferlet <patrice.ferlet@smile.fr>
# licence: MIT
class VideoFrameGenerator(keras.utils.Sequence):
    """Generate batches of video frames from a directory of classified videos.

    Videos must be stored in one sub-directory per class, the directory name
    being the label. E.g.:

        videos/class1/file1.avi
        videos/class1/file2.avi
        videos/class2/file3.avi

    Frames are decoded lazily, the first time a video is requested, and are
    then kept in memory for the following epochs.
    """

    def __init__(self,
                 from_dir,
                 batch_size=8,
                 target_shape=(224, 224, 3),
                 nbframe=5,
                 shuffle=True,
                 move_detection=8.,
                 transform: 'keras.preprocessing.image.ImageDataGenerator' = None,
                 val_split=.2
                 ):
        """Create a Video Frame Generator with data augmentation.

        Usage example:

            gen = VideoFrameGenerator(
                './out/videos/',
                batch_size=5,
                nbframe=3,
                transform=keras.preprocessing.image.ImageDataGenerator(
                    rotation_range=5, horizontal_flip=True))

        Arguments:
        - from_dir: path to the data directory containing the videos; videos
            must be split into directories named after their label. When
            None, no file is listed (used internally to build the validation
            generator).
        - batch_size: number of videos per batch
        - target_shape: shape of the generated images, as
            (height, width[, channels])
        - nbframe: number of frames to extract from each video
        - shuffle: boolean, shuffle data at start and after each epoch
        - move_detection: try to drop frames where no movement is detected;
            this float threshold is relative to your dataset, 0.0 disables
            the detection
        - transform: a keras ImageDataGenerator configured with random
            transformations. All the frames of one video receive the same
            transformation so the sequence stays consistent.
        - val_split: fraction of the videos reserved for validation, None to
            disable the split

        Raises:
        - ValueError: if val_split is not in ]0, 1[, if target_shape has
            fewer than 2 dimensions, or if batch_size / nbframe is not
            strictly positive.
        """
        super().__init__()

        # Explicit exceptions instead of `assert`: asserts are stripped when
        # Python runs with -O, which would silently disable the checks.
        if val_split is not None and not 0 < val_split < 1:
            raise ValueError('val_split must be None or in the ]0, 1[ range')
        if len(target_shape) < 2:
            raise ValueError('target_shape needs at least (height, width)')
        if batch_size < 1:
            raise ValueError('batch_size must be strictly positive')
        if nbframe < 1:
            raise ValueError('nbframe must be strictly positive')

        self.from_dir = from_dir
        self.nbframe = nbframe
        self.batch_size = batch_size
        self.target_shape = target_shape
        self.shuffle = shuffle
        self.move_detection = move_detection
        self.transform = transform
        self.val_split = val_split

        # the list of classes, built in __list_all_files
        self.classes = []
        self.files = []
        # list of (classname, video path) tuples served by this generator
        self.data = []
        self.validation = None

        # decoded frames, keyed by video path
        self.__frames = {}
        # videos already found to be too short, so they are not decoded again
        self.__rejected = set()

        if from_dir is not None:
            self.__list_all_files()

    def __len__(self):
        """Return the number of batches per epoch.

        Warning: this is the number of loops to do, not the number of files
        or frames, i.e. number_of_videos // batch_size. It can be used as
        `steps_per_epoch` or `validation_steps` for `model.fit_generator`.
        """
        # Derived from the data actually held so it can never exceed what
        # __getitem__ is able to serve.
        return len(self.data) // self.batch_size

    def __getitem__(self, index):
        """Return batch number `index` as an (X, Y) tuple.

        X holds up to `batch_size` videos of `nbframe` frames each, Y the
        matching one-hot encoded labels. Videos that do not provide enough
        frames are skipped, so a batch may be smaller than `batch_size`.
        """
        start = index * self.batch_size
        return self.__data_aug(self.data[start:start + self.batch_size])

    def on_epoch_end(self):
        """Shuffle the video list after each epoch when `shuffle` is set."""
        if self.shuffle:
            random.shuffle(self.data)

    def get_validation(self):
        """Return the validation generator, or None if `val_split` is None."""
        return self.validation

    def __list_all_files(self):
        """List the videos of each class and split off the validation set.

        Only the file paths are collected here; frames are decoded lazily
        by __openframes.
        """
        # Keep directories only and sort them: glob order is arbitrary, and
        # label indices must be stable from one run to the next.
        self.classes = sorted(
            os.path.basename(path)
            for path in glob.glob(os.path.join(self.from_dir, '*'))
            if os.path.isdir(path)
        )
        files_by_class = [
            (name, sorted(glob.glob(os.path.join(self.from_dir, name, '*'))))
            for name in self.classes
        ]
        total = sum(len(files) for _, files in files_by_class)

        print("Inject frames in memory, could take a while...")
        i = 1
        for classname, files in files_by_class:
            for file in files:
                print('\rProcessing file %d/%d' % (i, total), end='')
                i += 1
                self.data.append((classname, file))  # keep file id

        if self.shuffle:
            random.shuffle(self.data)

        if self.val_split is not None:
            # NOTE: without shuffle the validation set is taken from the head
            # of the class-ordered list, so it may not cover every class.
            valcount = math.ceil(len(self.data) * self.val_split)
            # from_dir=None / val_split=None: the validation generator must
            # neither list the files again nor split itself.
            self.validation = self.__class__(
                None,
                batch_size=self.batch_size,
                nbframe=self.nbframe,
                target_shape=self.target_shape,
                shuffle=self.shuffle,
                move_detection=self.move_detection,
                transform=None,
                val_split=None
            )
            self.validation.classes = self.classes
            self.validation.data = self.data[:valcount]
            self.data = self.data[valcount:]

    @staticmethod
    def __blurred_gray(frame):
        """Return a blurred grayscale copy of an RGB frame."""
        gray = cv.cvtColor(frame, cv.COLOR_RGB2GRAY)
        return cv.GaussianBlur(gray, (3, 3), 7)

    def __moving_frames(self, frames):
        """Keep the frames that differ enough from the one preceding them.

        The first frame only serves as the initial reference and is never
        kept.
        """
        kept = []
        last = self.__blurred_gray(frames[0])
        for frame in frames[1:]:
            current = self.__blurred_gray(frame)
            delta = cv.absdiff(current, last)
            thresh = cv.threshold(delta, 25, 255, cv.THRESH_BINARY)[1]
            thresh = cv.dilate(thresh, None, iterations=2)
            if np.mean(thresh) > self.move_detection:
                kept.append(frame)
            last = current
        return kept

    def __openframes(self, classname, file):
        """Decode a video and return `nbframe` ORIGINAL frames from it.

        Frames are converted to RGB and resized to `target_shape`;
        transformations are applied later, on the fly. The result is cached
        in memory. Returns None when the video cannot provide `nbframe`
        frames (after movement detection, if enabled).
        """
        # cv.resize expects dsize as (width, height) while target_shape is
        # (height, width[, channels]).
        height, width = self.target_shape[:2]

        frames = []
        vid = cv.VideoCapture(file)
        try:
            while True:
                grabbed, frame = vid.read()
                if not grabbed:
                    break
                frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)
                frames.append(cv.resize(frame, (width, height)))
        finally:
            # always free the decoder, even if a frame fails to convert
            vid.release()

        # `frames` is empty when the file is unreadable: nothing to compare
        if frames and self.move_detection and self.move_detection > 0.:
            frames = self.__moving_frames(frames)

        if len(frames) >= self.nbframe:
            # spread the selection over the whole video, then keep the last
            # `nbframe` picks
            step = len(frames) // self.nbframe
            frames = frames[::step][-self.nbframe:]

        if len(frames) == self.nbframe:
            self.__frames[file] = frames
            return frames

        self.__rejected.add(file)
        print('\n%s/%s has not enought frames ==> %d' % (classname, file, len(frames)))
        return None

    def __data_aug(self, batch):
        """Build (X, Y) for a batch of (classname, video path) tuples.

        When a transform is configured, one random transformation is drawn
        per video and applied to all of its frames. Videos without enough
        frames are left out of both X and Y.
        """
        X, Y = [], []
        for label, file in batch:
            if file in self.__rejected:
                continue

            images = self.__frames.get(file)
            if images is None:
                images = self.__openframes(label, file)  # fills the cache
            if images is None:
                continue

            if self.transform:
                # same transformation for every frame of this video
                params = self.transform.get_random_transform(self.target_shape[:2])
                images = [self.transform.apply_transform(img, params) for img in images]
            else:
                images = list(images)

            X.append(images)
            # label is added only once the frames are known to be usable, so
            # X and Y always have the same length
            Y.append(self.classes.index(label))

        return np.array(X), keras.utils.to_categorical(Y, num_classes=len(self.classes))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment