Custom generator function to be used with keras fit_generator()
import math, os
import numpy as np
import cv2
from sklearn.preprocessing import LabelBinarizer
from imutils import paths
import time

"""
File name: keras_batch_generator.py
Author: Kleyson Rios
Email: kleysonr@gmail.com
"""

class KerasBatchGenerator():

    def __init__(self, dataset_path, test_ratio=0.25, batch_size=32, imagesize=(300,300), preprocessors=[]):

        # Dict mapping classes to image paths
        self.data = {}

        # Dict for the train and test datasets
        self.train_test = {}

        # Length of the dataset
        self.datasetsize = 0

        # Number of images sent in each chunk
        self.batch_size = batch_size

        # Number of images in the smallest class
        self.minsize = math.inf

        # Ratio of the full dataset reserved for testing
        self.test_ratio = test_ratio

        # Image size to feed into the NN
        self.imagesize = imagesize

        # List of preprocessors to apply
        self.preprocessors = preprocessors

        # Index to control the number of images per epoch
        self.current_idx = {'train': 0, 'test': 0}

        # Mapping between class name and one-hot encoding
        self.onehotencoding = None
        self.lb = LabelBinarizer()

        # Get a list of all the images under dataset/{class}/*
        fileslist = paths.list_images(dataset_path)

        for file in fileslist:

            # Extract the label from the parent directory name
            label = file.split(os.path.sep)[-2]

            # Populate the dict mapping
            try:
                self.data[label]
            except KeyError:
                self.data[label] = []
            finally:
                self.data[label].append(file)
                self.datasetsize += 1

        # Loop over each class
        for k in self.data.keys():

            # Track the size of the smallest class
            self.minsize = len(self.data[k]) if len(self.data[k]) < self.minsize else self.minsize

        # Calculate the offset where test samples begin, based on the smallest class.
        # This forces balanced classes for training.
        self.offset = int(self.minsize * (1.0 - self.test_ratio))

        # Create the one-hot encoding
        classes_name = list(self.data.keys())
        self.onehotencoding = dict(zip(classes_name, self.lb.fit_transform(classes_name)))

        # Split the full dataset into train and test sets
        self.split_train_test()

    def split_train_test(self):

        _train = []
        _test = []

        # Loop over each class
        for k in self.data.keys():

            # Shuffle the images in each class
            items = self.data[k]
            np.random.shuffle(items)

            _train += items[:self.offset]
            _test += items[self.offset:]

        np.random.shuffle(_train)
        np.random.shuffle(_test)

        self.train_test['train'] = _train
        self.train_test['test'] = _test

    def getNumberOfClasses(self):
        return len(self.data.keys())

    def getDatasetSize(self):
        return self.datasetsize

    def getBatchSize(self):
        return self.batch_size

    def getTrainingSize(self):
        return len(self.data.keys()) * int(self.minsize * (1.0 - self.test_ratio))

    def getTestingSize(self):
        return self.getDatasetSize() - self.getTrainingSize()

    def generate(self, set='train'):

        try:
            assert set == 'train' or set == 'test'
        except AssertionError as e:
            e.args += ('Valid sets: train or test', set)
            raise

        datasets_size = {
            'train': self.getTrainingSize(),
            'test': self.getTestingSize()
        }

        batch = 0
        while True:

            images = []
            labels = []

            for i in range(self.batch_size):

                # Restart the index when the current set is exhausted (end of epoch)
                if self.current_idx[set] >= datasets_size[set]:
                    self.current_idx[set] = 0
                    print('{} --{}-- New epoch'.format(int(time.time()), set))

                    # Yield the short batch, but never an empty one: if the epoch
                    # boundary falls exactly on a batch boundary, keep filling
                    # from the restarted index instead
                    if len(images) > 0:
                        break

                file = self.train_test[set][self.current_idx[set]]
                label = file.split(os.path.sep)[-2]

                image = self._processImage(file)

                images.append(image)
                labels.append(self.onehotencoding[label])

                self.current_idx[set] += 1

                print('Batch: {}-{} <<{}>> {}'.format(batch, i, set, file))

            batch += 1

            yield np.array(images), np.array(labels)

    def _processImage(self, filename):

        # Read the image
        image = cv2.imread(filename)

        # Check whether any preprocessors were supplied
        if len(self.preprocessors) > 0:

            # Loop over the preprocessors and apply each to the image
            for p in self.preprocessors:
                image = p.preprocess(image)

        # Resize, then scale pixel values to [0, 1]
        image = cv2.resize(image, self.imagesize, interpolation=cv2.INTER_AREA)
        return image.astype('float') / 255.0

    def __repr__(self):
        return('\tDataset size: {}\n\tTraining size: {}\n\tTest size: {}\n\tClasses: {}\n\tBatch Size: {}'.format(self.getDatasetSize(), self.getTrainingSize(), self.getTestingSize(), self.getNumberOfClasses(), self.getBatchSize()))
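
The preprocessors argument accepts a list of objects, each exposing a preprocess(image) method that _processImage() applies in order before the final resize. A minimal sketch of such an object (the GrayscalePreprocessor name and behavior are illustrative assumptions, not part of this gist):

import cv2

class GrayscalePreprocessor:

    # Hypothetical example preprocessor: collapse a BGR image to grayscale,
    # then stack it back to 3 channels so the CNN input shape stays (H, W, 3)
    def preprocess(self, image):
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)

It would then be passed to the constructor as preprocessors=[GrayscalePreprocessor()].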

How to use:

import math
from keras_batch_generator import KerasBatchGenerator
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.optimizers import RMSprop
from keras.callbacks import TensorBoard

kbg = KerasBatchGenerator('/home/kleysonr/Downloads/keras-generator/dataset', batch_size=6, imagesize=(100, 100), test_ratio=0.3)

print('Training new model ...\n' + str(kbg))

num_classes = kbg.getNumberOfClasses()

model = Sequential()
model.add(Conv2D(32, (3, 3), padding='same', input_shape=(100, 100, 3)))
model.add(Activation('relu'))
model.add(Conv2D(32, (3, 3)))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Conv2D(64, (3, 3), padding='same'))
model.add(Activation('relu'))
model.add(Conv2D(64, (3, 3)))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Flatten())
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes))
model.add(Activation('softmax'))

# Initialize the RMSprop optimizer
opt = RMSprop(lr=0.0001, decay=1e-6)

model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])

training_steps = math.ceil(kbg.getTrainingSize() / kbg.getBatchSize())
validation_steps = math.ceil(kbg.getTestingSize() / kbg.getBatchSize())

tbCallback = TensorBoard(log_dir='/tmp/tensorboard', histogram_freq=0, batch_size=100, write_graph=True, write_grads=False, write_images=False, embeddings_freq=0, embeddings_layer_names=None, embeddings_metadata=None, embeddings_data=None, update_freq='epoch')

model.fit_generator(kbg.generate(set='train'), 
                    steps_per_epoch=training_steps,
                    epochs=1,
                    verbose=1,
                    callbacks=[tbCallback],
                    validation_data=kbg.generate(set='test'),
                    validation_steps=validation_steps,
                    use_multiprocessing=False,
                    workers=0)
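
Before a long training run it can help to pull a single batch and inspect the array shapes (an illustrative snippet, not part of the original gist; note that fetching a batch advances the generator's internal train index, so rewind it afterwards):

# Sanity-check the generator: fetch one batch and inspect the shapes
batch_images, batch_labels = next(kbg.generate(set='train'))
print(batch_images.shape)  # e.g. (6, 100, 100, 3) for batch_size=6 and imagesize=(100, 100)
print(batch_labels.shape)  # e.g. (6, num_classes) for three or more classes
kbg.current_idx['train'] = 0  # rewind so training still sees the full epoch

One caveat: sklearn's LabelBinarizer emits a single column for a two-class problem, so the Dense(num_classes) plus categorical_crossentropy pairing above assumes three or more classes.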
