Skip to content

Instantly share code, notes, and snippets.

@samehmohamed88
Last active January 11, 2017 03:46
Show Gist options
  • Save samehmohamed88/f3d831140ead1edb09daa9c10ca847ec to your computer and use it in GitHub Desktop.
Save samehmohamed88/f3d831140ead1edb09daa9c10ca847ec to your computer and use it in GitHub Desktop.
import os
import pickle
import json
import random
import csv
import cv2
import numpy as np
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint
from keras.layers import Convolution2D, MaxPooling2D, ZeroPadding2D
from keras.layers import Activation, Dropout, Flatten, Dense
from keras.layers.core import Lambda
from keras.callbacks import Callback
from keras.utils import np_utils
_index_in_epoch = 0
nb_epoch = 10
batch_size = 32
img_height, img_width = 64, 64
global threshold
global zero_count
global zeros_to_add
threshold = .05
zero_count = 0
zeros_to_add = 0
class LifecycleCallback(Callback):
def on_epoch_begin(self, epoch, logs={}):
pass
def on_epoch_end(self, epoch, logs={}):
global threshold
threshold = threshold + (epoch * .05)
if threshold > 0.8:
threshold = 0.2
def adjust_brightness(img):
image = cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
random_bright = .25+np.random.uniform()
image[:,:,2] = image[:,:,2]*random_bright
image = cv2.cvtColor(image, cv2.COLOR_HSV2RGB)
return image
def shuffle(x, y):
perm = np.arange(len(x))
np.random.shuffle(perm)
x = x[perm]
y = y[perm]
return (x, y)
def train_test_split(X, Y):
count = int(len(X)*.7)
X_train = X[:count]
Y_train = Y[:count]
X_val = X[count:]
Y_val = Y[count:]
return (X_train, Y_train, X_val, Y_val)
def load_training_and_validation():
global zero_count
rows, labels = [], []
with open('data/driving_log.csv', 'r') as _f:
reader = csv.reader(_f, delimiter=',')
next(reader, None)
for row in reader:
rows.append(row[0].strip())
labels.append(float(row[3]))
# left camera
rows.append(row[1].strip())
labels.append(float(row[3])+.25)
# right camera
rows.append(row[2].strip())
labels.append(float(row[3])-0.25)
if float(row[3]) == 0:
zero_count += 1
assert len(rows) == len(labels), 'unbalanced data'
# shuffle the data
X, Y = shuffle(np.array(rows), np.array(labels))
# split into training and validation
return train_test_split(X, Y)
def resize_image(img):
return cv2.resize(img,( 64, 64))
def affine_transform(img, angle, adjust, sub=False):
cols, rows, ch = img.shape
if sub:
angle -= adjust
pts1 = np.float32([[75, 75], [150, 75], [50, 250]])
pts2 = np.float32([[60, 60], [140, 75], [50, 320]])
else:
angle += adjust
pts1 = np.float32([[75, 75], [150, 75], [50,250]])
pts2 = np.float32([[65, 75], [165, 75], [50,250]])
M = cv2.getAffineTransform(pts1, pts2)
dst = cv2.warpAffine(img, M, (rows, cols))
return dst.reshape((cols, rows, ch)), angle
def next_batch(data, labels, batch_size):
"""Return the next `batch_size` examples from this data set."""
global _index_in_epoch
start = _index_in_epoch
_index_in_epoch += batch_size
_num_examples = len(data)
if _index_in_epoch > _num_examples:
# Shuffle the data
data, labels = shuffle(data, labels)
# Start next epoch
start = 0
_index_in_epoch = batch_size
assert batch_size <= _num_examples
end = _index_in_epoch
return data[start:end], labels[start:end]
def transform_generator(x, y, batch_size=32, is_validation=False):
global zeros_to_add
global zero_count
while True:
bad = []
images, labels = list(), list()
_images, _labels = next_batch(x, y, batch_size)
current = os.path.dirname(os.path.realpath(__file__))
for i in range(len(_images)):
img = cv2.imread('{}/data/{}'.format(current, _images[i]), 1)
if img is None: continue
image = adjust_brightness(img)
resized = resize_image(image)
angle = _labels[i]
images.append(resized)
labels.append(angle)
# resized = img.reshape(img_height, img_width, 3)
if angle == 0:
zeros_to_add += 1
if zeros_to_add < int(zero_count*.25):
resized = resize_image(img)
angle = _labels[i]
images.append(resized)
labels.append(angle)
continue
elif angle > 0:
sub = False
else:
sub = True
if abs(angle) > 0:
if is_validation: continue
adjust = .06 * 5 * threshold
# image, angle = affine_transform(img, angle, adjust, sub=sub)
# resized = resize_image(image)
# images.append(resized)
# labels.append(angle)
image = cv2.flip(img, 1)
resized = resize_image(image)
images.append(resized)
labels.append(-1*angle)
X = np.array(images, dtype=np.float64).reshape((-1, img_height, img_width, 3))
Y = np.array(labels, dtype=np.float64)
yield (X, Y)
def gen_model():
model = Sequential()
# (((64 - 5) + 4) / 1.) + 1
model.add(Convolution2D(16, 5, 5, subsample=(1, 1), input_shape=(img_height, img_width, 3)))
model.add(ZeroPadding2D(padding=(2, 2)))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
# (((64 - 4) + 4) / 2.) + 1
model.add(Convolution2D(32, 4, 4, subsample=(1, 1)))
model.add(ZeroPadding2D(padding=(2, 2)))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
# (((33 - 3) + 4) / 2.) + 1
model.add(Convolution2D(64, 3, 3, subsample=(1, 1)))
model.add(ZeroPadding2D(padding=(2, 2)))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
# (((18 - 2) + 4) / 2.) + 1
model.add(Convolution2D(64, 2, 2, subsample=(2, 2)))
model.add(ZeroPadding2D(padding=(0, 0)))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Flatten())
model.add(Dense(1024))
model.add(Dropout(0.5))
model.add(Activation('sigmoid'))
model.add(Dense(1))
adam = Adam(lr=0.0001)
model.compile(loss='mean_squared_error', optimizer=adam)
return model
def main():
X_train, Y_train, X_val, Y_val = load_training_and_validation()
assert len(X_train) == len(Y_train), 'unbalanced training data'
assert len(X_val) == len(Y_val), 'unbalanced validation data'
print(len(X_train), "training images and ", len(X_val), "validation images")
model = gen_model()
filepath = "weights-improvement-{epoch:02d}-{val_loss:.4f}.h5"
checkpoint = ModelCheckpoint(filepath, verbose=1, save_best_only=True)
model.fit_generator(
transform_generator(X_train, Y_train),
samples_per_epoch=(len(X_train)*2),
nb_epoch=nb_epoch,
validation_data=transform_generator(X_val, Y_val, is_validation=True),
nb_val_samples=len(X_val),
callbacks=[checkpoint])
print("Saving model weights and configuration file.")
if not os.path.exists("./outputs/sim"):
os.makedirs("./outputs/sim")
model.save_weights("./outputs/sim/sim.h5", True)
with open('./outputs/sim/sim.json', 'w') as outfile:
json.dump(model.to_json(), outfile)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment