@droid666
Last active January 29, 2017
Check Keras model performance with multiple threads, multiple processes, or sequentially (uncomment the lines in the main block to run all tests; most are commented out to demonstrate a Keras bug)
import numpy as np
# fix the random seed for determinism
seed = 7
np.random.seed(seed)
import argparse
argparser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
argparser.add_argument('-m', '--mode', type=str, default='batch', help='whether to submit single batches or run whole epochs', choices=('batch', 'epochs'))
argparser.add_argument('-n', '--numEpochs', type=int, default=5, help='(default: %(default)d)')
argparser.add_argument('-r', '--numRows', type=int, default=10000, help='number of data rows, i.e. the epoch size (default: %(default)d)')
argparser.add_argument('-hn', '--numHiddenLayers', type=int, default=5, help='(default: %(default)d)')
argparser.add_argument('-hs', '--hiddenLayerSize', type=int, default=128, help='(default: %(default)d)')
argparser.add_argument('-b', '--batchSize', type=int, default=256, help='(default: %(default)d)')
argparser.add_argument('-t', '--numNets', type=int, default=3, help='how many different nets to test (also number of threads to use)')
args = argparser.parse_args()
print(args)
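# Example invocations (the script filename is assumed; adjust to your own):
#   python keras_perf_test.py --mode batch --numNets 3
#   python keras_perf_test.py --mode epochs --numEpochs 10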
# number of nets/threads to run
num_nets = args.numNets
rows = args.numRows
x_cols = 200
y_cols = 20
hid_layers = args.numHiddenLayers
hid_width = args.hiddenLayerSize
epochs = args.numEpochs
batch_size = args.batchSize
sendBatches = args.mode == 'batch'
def createData(idx):
    x = np.random.rand(rows, x_cols + idx)
    y = np.random.rand(rows, y_cols + idx)
    return (x, y)
def createNet(idx):
    from keras.layers import Input, Dense, Dropout, Activation
    from keras.models import Model
    inputs = Input(shape=(x_cols + idx,))
    x = inputs
    for i in range(0, hid_layers):
        x = Dense(hid_width)(x)
        x = Activation('relu')(x)
        x = Dropout(0.5)(x)
    # give each net a different output width so executing on the wrong model fails loudly
    x = Dense(y_cols + idx)(x)
    predictions = Activation('linear')(x)
    model = Model(input=inputs, output=predictions)
    model.compile(optimizer='adam', loss='mean_squared_error')
    # attach the data to the model so the worker functions only need the model object
    model.myX, model.myY = createData(idx)
    return model
def trainEpochs(model):
    model.fit(model.myX, model.myY, nb_epoch=epochs, batch_size=batch_size)
def trainBatches(model):
    r = len(model.myX)
    for e in range(0, epochs):
        start = 0
        while start < r:
            end = min(start + batch_size, r)
            model.train_on_batch(model.myX[start:end], model.myY[start:end])
            start = end
def evalEpochs(model):
    return model.predict(model.myX, batch_size=batch_size)
def evalBatches(model):
    # runs the same number of passes as trainBatches, so the workloads are comparable
    r = len(model.myX)
    for e in range(0, epochs):
        start = 0
        while start < r:
            end = min(start + batch_size, r)
            model.predict_on_batch(model.myX[start:end])
            start = end
def warmup(nets):
    # run one tiny train/predict step per net so one-time graph setup
    # does not distort the timed runs
    for model in nets:
        model.train_on_batch(model.myX[0:16], model.myY[0:16])
        model.predict_on_batch(model.myX[0:16])
def runNet(net):
    if sendBatches:
        trainBatches(net)
        y = evalBatches(net)
    else:
        trainEpochs(net)
        y = evalEpochs(net)
    return y
def createNets():
    nets = []
    for i in range(0, num_nets):
        nets.append(createNet(i))
    return nets
##########################################
#start tests
from timeit import default_timer as timer
def testSeq(nets):
    name = 'test_sequential'
    print(name + ": starting...")
    start = timer()
    for net in nets:
        runNet(net)
    diff = timer() - start
    print(name + ": ran " + str(len(nets)) + " nets in " + str(diff) + " secs.")
    #from keras import backend as K
    #K.clear_session()
    return (name, diff)
def testThreads(nets, threadfac=None):
    if threadfac is None:
        from threading import Thread
        threadfac = Thread
    name = 'test_' + threadfac.__name__
    print(name + ": starting...")
    threads = []
    for net in nets:
        t = threadfac(target=runNet, args=(net,))
        threads.append(t)
    start = timer()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    diff = timer() - start
    print(name + ": ran " + str(len(nets)) + " nets in " + str(diff) + " secs.")
    #from keras import backend as K
    #K.clear_session()
    return (name, diff)
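# Note: passing multiprocessing.Process as threadfac to testThreads would not
# work in the normal flow, because the parent process has already initialized
# TensorFlow by the time the nets exist; testProcess below therefore builds
# each net inside a fresh worker process instead.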
# must be called from a process that has not touched TensorFlow yet (so call it before anything else)
def testProcess(num):
    from multiprocessing import Process, Event
    threadfac = Process
    name = 'test_' + threadfac.__name__
    print(name + ": starting...")
    go = Event()
    done = []
    ready = []
    for i in range(0, num):
        ready.append(Event())
        done.append(Event())
    def otherProcFunc(idx):
        import tensorflow as tf
        from keras.backend.tensorflow_backend import set_session
        #config = tf.ConfigProto(log_device_placement=True)
        config = tf.ConfigProto()
        # see https://github.com/fchollet/keras/issues/1538 (we would need to know how much of the GPU is already taken, which can be hard...)
        #config.gpu_options.per_process_gpu_memory_fraction = 0.9/num
        # see the last answer here: http://stackoverflow.com/questions/34199233/how-to-prevent-tensorflow-from-allocating-the-totality-of-a-gpu-memory
        config.gpu_options.allow_growth = True
        sess = tf.Session(config=config)
        set_session(sess)
        net = createNet(idx)
        warmup([net])
        # signal readiness, then wait until all workers are released so the timed runs start together
        ready[idx].set()
        go.wait()
        runNet(net)
        done[idx].set()
    threads = []
    for i in range(0, num):
        t = threadfac(target=otherProcFunc, args=(i,))
        t.start()
        threads.append(t)
    # wait until every worker has built and warmed up its net before timing
    for e in ready:
        e.wait()
    start = timer()
    go.set()
    for e in done:
        e.wait()
    diff = timer() - start
    for t in threads:
        t.join()
    print(name + ": ran " + str(num) + " nets in " + str(diff) + " secs.")
    return (name, diff)
if __name__ == "__main__":
    res = []
    # uncomment the lines below to run all three tests; only the threaded
    # test is active here, to demonstrate the Keras bug
    #res.append(testProcess(num_nets))
    nets = createNets()
    #print('warm up run:')
    #warmup(nets)
    #res.append(testSeq(nets))
    res.append(testThreads(nets))
    print("Testing finished.")
    print(args)
    for n, v in res:
        print("{:16}: {:.3f} seconds".format(n, v))