Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
A video frame generator implementing the keras.utils.Sequence interface, with data augmentation capabilities
# DEPRECATED - please use my complete package instead:
# PyPI package
# GitHub sources
import keras
import cv2 as cv
import glob
import numpy as np
import os
import random
import math
# author: Patrice Ferlet <>
# licence: MIT
class VideoFrameGenerator(keras.utils.Sequence):
    """Generate batches of video frames from a directory of classified videos.

    Videos must be stored in one sub-directory per class; the name of the
    sub-directory is used as the label. Illustrative layout::

        videos/class1/file1.avi
        videos/class1/file2.avi
        videos/class2/file3.avi

    Frames are decoded lazily (the first time a video is requested) and the
    selected frames are then kept in memory for the following epochs.
    """

    def __init__(self,
                 from_dir,
                 batch_size=8,
                 target_shape=(224, 224, 3),
                 nbframe=5,
                 shuffle=True,
                 move_detection=0.0,
                 transform=None,
                 val_split=None):
        """Create a video frame generator with optional data augmentation.

        Usage example::

            gen = VideoFrameGenerator(
                './out/videos/',
                transform=keras.preprocessing.image.ImageDataGenerator(
                    rotation_range=5, horizontal_flip=True))

        NOTE(review): the defaults of ``batch_size`` and ``nbframe`` could
        not be recovered from the available copy of this file and are
        assumptions -- confirm against callers.

        Args:
            from_dir: path to the data directory containing the videos,
                split into directories named after their label. ``None``
                creates an empty generator (used for the validation split).
            batch_size: number of videos per batch.
            target_shape: image shape to generate; only the first two
                entries are used to resize the frames.
            nbframe: number of frames to keep per video.
            shuffle: shuffle the videos at start and after each epoch.
            move_detection: threshold used to drop frames in which no
                movement is detected. The value is relative to your
                dataset; ``0.0`` disables the detection.
            transform: a Keras ``ImageDataGenerator`` configured with random
                transformations. All frames of one video receive the same
                transformation so that the sequence stays consistent.
            val_split: fraction (strictly between 0 and 1) of the videos to
                reserve for validation, or ``None`` to disable splitting.
        """
        if val_split is not None:
            assert val_split > 0 and val_split < 1
        assert len(target_shape) >= 2

        self.from_dir = from_dir
        self.nbframe = nbframe
        self.batch_size = batch_size
        self.target_shape = target_shape
        self.shuffle = shuffle
        self.move_detection = move_detection
        self.transform = transform
        self.val_split = val_split

        # the list of classes, built in __list_all_files
        self.classes = []
        # list of (classname, video path) pairs served by this generator
        self.files = []
        # generator holding the validation part, when val_split is set
        self.validation = None

        # number of usable videos; decremented when a video is rejected
        self.__filecount = 0
        # cache: video path -> list of frames, or None for a rejected video
        self.__frames = {}

        if from_dir is not None:
            self.__list_all_files()

    def __len__(self):
        """Return the number of batches per epoch.

        Warning: this is the number of loops to do, not the number of files
        or frames. The result is ``number_of_videos // batch_size``. You can
        use it as ``steps_per_epoch`` or ``validation_steps`` for
        ``model.fit_generator``.
        """
        return self.__filecount // self.batch_size

    def __getitem__(self, index):
        """Return batch number ``index`` as an ``(X, Y)`` pair.

        ``X`` holds up to ``batch_size`` blocks of ``nbframe`` frames and
        ``Y`` the matching one-hot encoded labels.
        """
        start = index * self.batch_size
        batch = self.files[start:start + self.batch_size]
        return self.__data_aug(batch)

    def on_epoch_end(self):
        """Shuffle the video list when an epoch has finished, if enabled."""
        if self.shuffle:
            random.shuffle(self.files)

    def get_validation(self):
        """Return the validation generator, or ``None`` without a split."""
        return self.validation

    def __list_all_files(self):
        """List the classes and videos, then split off the validation part."""
        class_dirs = glob.glob(os.path.join(self.from_dir, '*'))
        self.classes = [os.path.basename(c) for c in class_dirs]
        total = len(glob.glob(os.path.join(self.from_dir, '*/*')))

        self.files = []
        seen = 0
        print("Inject frames in memory, could take a while...")
        for classname in self.classes:
            pattern = os.path.join(self.from_dir, classname, '*')
            for file in glob.glob(pattern):
                seen += 1
                print('\rProcessing file %d/%d' % (seen, total), end='')
                self.files.append((classname, file))

        if self.shuffle:
            random.shuffle(self.files)

        if self.val_split is not None:
            # Compute the split size once, so that the counters always
            # agree with the number of files each generator really holds.
            valcount = math.ceil(len(self.files) * self.val_split)

            # NOTE(review): the validation generator is created without
            # augmentation (transform=None); the original arguments of this
            # call were not recoverable -- confirm this is intended.
            self.validation = self.__class__(
                None,
                batch_size=self.batch_size,
                target_shape=self.target_shape,
                nbframe=self.nbframe,
                shuffle=self.shuffle,
                move_detection=self.move_detection,
                transform=None,
                val_split=None)
            self.validation.classes = self.classes
            self.validation.files = self.files[:valcount]
            self.validation.__filecount = valcount
            self.files = self.files[valcount:]

        self.__filecount = len(self.files)

    def __moving_frames(self, frames):
        """Return the frames that differ enough from the last kept frame."""
        def blurred_gray(image):
            gray = cv.cvtColor(image, cv.COLOR_RGB2GRAY)
            return cv.GaussianBlur(gray, (3, 3), 7)

        last = blurred_gray(frames[0])
        important = []
        for frame in frames[1:]:
            current = blurred_gray(frame)
            delta = cv.absdiff(current, last)
            thresh = cv.threshold(delta, 25, 255, cv.THRESH_BINARY)[1]
            thresh = cv.dilate(thresh, None, iterations=2)
            if np.mean(thresh) > self.move_detection:
                important.append(frame)
                last = current
        return important

    def __openframes(self, classname, file):
        """Decode ``file`` and cache ``nbframe`` ORIGINAL frames in memory.

        Transformations are applied on the fly in ``__data_aug``, not here.

        Returns:
            The list of ``nbframe`` selected frames, or ``None`` when the
            video does not provide enough frames. A rejected video is
            remembered so that it is reported and counted only once.
        """
        frames = []
        vid = cv.VideoCapture(file)
        try:
            while True:
                grabbed, frame = vid.read()
                if not grabbed:
                    break
                frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)
                frame = cv.resize(frame, self.target_shape[:2])
                frames.append(frame)
        finally:
            # always free the decoder, even if a frame conversion fails
            vid.release()

        # an unreadable video yields no frame at all: nothing to compare
        if self.move_detection > 0. and frames:
            frames = self.__moving_frames(frames)

        if len(frames) >= self.nbframe:
            # sample evenly over the video, then keep the last nbframe
            step = len(frames) // self.nbframe
            frames = frames[::step][-self.nbframe:]

        if len(frames) == self.nbframe:
            self.__frames[file] = frames
            return frames

        self.__frames[file] = None  # mark as rejected
        self.__filecount -= 1  # remove one file from the counter
        print('\n%s/%s has not enought frames ==> %d' % (classname, file, len(frames)))
        return None

    def __data_aug(self, batch):
        """Build one batch, applying a random transformation per video.

        Args:
            batch: iterable of ``(classname, video path)`` pairs.

        Returns:
            ``(X, Y)`` where ``X`` is an array with one block of frames per
            usable video and ``Y`` the one-hot encoded labels. Videos
            without enough frames are left out of the batch.
        """
        X, Y = [], []
        for y, file in batch:
            if file in self.__frames:
                images = self.__frames[file]
            else:
                images = self.__openframes(y, file)  # fills self.__frames
            if images is None:
                continue

            T = None
            if self.transform is not None:
                # the same transformation is applied to all these frames
                T = self.transform.get_random_transform(self.target_shape[:2])

            if T is None:
                x = list(images)
            else:
                x = [self.transform.apply_transform(img, T) for img in images]

            X.append(x)
            Y.append(self.classes.index(y))  # label
        return np.array(X), keras.utils.to_categorical(Y, num_classes=len(self.classes))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment