Last active
January 29, 2017 19:05
-
-
Save droid666/eebf14dc8e92f3c4bccb92a1c0fd4279 to your computer and use it in GitHub Desktop.
Check Keras model performance when running with multiple threads, multiple processes, or sequentially (uncomment the calls in the final block to run all tests; they are commented out to demonstrate a Keras bug)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import math | |
# Fix the random seed so the randomly generated data is the same on every run.
seed = 7
np.random.seed(seed)
import argparse
# Command-line interface; ArgumentDefaultsHelpFormatter adds default values to the help output.
argparser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
argparser.add_argument('-m', '--mode', type=str, default='batch', help='if to submit single batches, or to run whole epochs', choices=('batch', 'epochs'))
argparser.add_argument('-n', '--numEpochs', type=int, default=5, help='(default: %(default)d)')
argparser.add_argument('-r', '--numRows', type=int, default=10000, help='aka epoch size')
argparser.add_argument('-hn', '--numHiddenLayers', type=int, default=5, help='(default: %(default)d)')
argparser.add_argument('-hs', '--hiddenLayerSize', type=int, default=128, help='(default: %(default)d)')
argparser.add_argument('-b', '--batchSize', type=int, default=256, help='(default: %(default)d)')
argparser.add_argument('-t', '--numNets', type=int, default=3, help='how many different nets to test (also number of threads to use)')
# NOTE: arguments are parsed at import time, so every importer of this module parses sys.argv.
args = argparser.parse_args()
print(args)
# Number of nets to build; each net is run in its own thread/process by the tests.
num_nets = args.numNets
# Number of rows in each net's data set (one epoch).
rows = args.numRows
# Base input/output widths; net number idx gets x_cols + idx inputs and y_cols + idx outputs.
x_cols = 200
y_cols = 20
# Shape of the hidden part of every net.
hid_layers = args.numHiddenLayers
hid_width = args.hiddenLayerSize
# Training/evaluation schedule.
epochs = args.numEpochs
batch_size = args.batchSize
# True: feed the model batch by batch; False: use whole-epoch fit/predict calls.
sendBatches = args.mode == 'batch'
def createData(idx):
    """Create a random (inputs, targets) pair for net number ``idx``.

    Both arrays have ``rows`` rows. The inputs are ``x_cols + idx`` columns
    wide and the targets ``y_cols + idx`` columns wide, so each net gets data
    of a distinct shape.
    """
    features = np.random.rand(rows, x_cols + idx)
    targets = np.random.rand(rows, y_cols + idx)
    return features, targets
def createNet(idx):
    """Build and compile a dense net for index ``idx`` and attach its data.

    The net has ``hid_layers`` hidden stages (Dense -> relu -> Dropout(0.5))
    followed by a linear output layer. Input and output widths grow with
    ``idx`` so that running data through the wrong model would be noticed.
    The matching random data is stored on the model as ``myX`` / ``myY``.
    """
    # Keras is imported lazily so that merely importing this module does not load it.
    from keras.layers import Input, Dense, Dropout, Activation
    from keras.models import Model

    in_width = x_cols + idx
    out_width = y_cols + idx

    inputs = Input(shape=(in_width,))
    hidden = inputs
    for _ in range(hid_layers):
        hidden = Dense(hid_width)(hidden)
        hidden = Activation('relu')(hidden)
        hidden = Dropout(0.5)(hidden)

    # Each net gets a differently sized output layer (see docstring).
    output = Dense(out_width)(hidden)
    predictions = Activation('linear')(output)

    model = Model(input=inputs, output=predictions)
    model.compile(optimizer='adam', loss='mean_squared_error')
    model.myX, model.myY = createData(idx)
    return model
def trainEpochs(model):
    """Train ``model`` on its attached data with one ``fit`` call covering all epochs."""
    fit_options = {'nb_epoch': epochs, 'batch_size': batch_size}
    model.fit(model.myX, model.myY, **fit_options)
def trainBatches(model):
    """Train ``model`` for ``epochs`` passes, submitting one batch at a time.

    Each pass walks the attached data in consecutive slices of at most
    ``batch_size`` rows; the final slice of a pass may be shorter.
    """
    total = len(model.myX)
    for _ in range(epochs):
        lo = 0
        while lo < total:
            # Clamp the upper bound so the last batch stops at the data end.
            hi = min(lo + batch_size, total)
            model.train_on_batch(model.myX[lo:hi], model.myY[lo:hi])
            lo = hi
def evalEpochs(model):
    """Predict on the model's whole attached input set in one call and return the result."""
    predictions = model.predict(model.myX, batch_size=batch_size)
    return predictions
def evalBatches(model):
    """Predict on the model's attached inputs batch by batch, ``epochs`` times.

    The data is walked in consecutive slices of at most ``batch_size`` rows
    and the whole walk is repeated ``epochs`` times.

    Returns:
        The predictions of the last pass concatenated along the first axis,
        so the result is comparable to what ``evalEpochs`` returns, or
        ``None`` when nothing was predicted (no rows or no passes).
    """
    total = len(model.myX)
    last_pass = []
    for _ in range(epochs):
        # Only the most recent pass is kept; earlier passes are repeated work.
        last_pass = []
        lo = 0
        while lo < total:
            # Clamp the upper bound so the last batch stops at the data end.
            hi = min(lo + batch_size, total)
            last_pass.append(model.predict_on_batch(model.myX[lo:hi]))
            lo = hi
    if not last_pass:
        return None
    return np.concatenate(last_pass)
def warmup(nets):
    """Run one small training step and one small prediction on every net.

    Only the first 16 rows of each net's attached data are used.
    """
    head = slice(0, 16)
    for net in nets:
        net.train_on_batch(net.myX[head], net.myY[head])
        net.predict_on_batch(net.myX[head])
def runNet(net):
    """Train and then evaluate ``net`` in the mode selected by ``sendBatches``.

    Returns whatever the evaluation function of the selected mode returns.
    """
    if not sendBatches:
        # Epoch mode: one fit call and one predict call.
        trainEpochs(net)
        return evalEpochs(net)
    # Batch mode: every batch is submitted individually.
    trainBatches(net)
    return evalBatches(net)
def createNets():
    """Return a list of ``num_nets`` freshly built nets, one per index 0..num_nets-1."""
    return [createNet(index) for index in range(num_nets)]
########################################## | |
#start tests | |
from timeit import default_timer as timer | |
def testSeq(nets): | |
name = 'test_sequential' | |
print(name + ": starting...") | |
start = timer() | |
for net in nets: | |
runNet(net) | |
diff = timer() - start | |
print(name + ": Run " + str(len(nets)) + " in " + str(diff) + " secs.") | |
#from keras import backend as K | |
#K.clear_session() | |
return (name, diff) | |
def testThreads(nets, threadfac=None): | |
if threadfac is None: | |
from threading import Thread | |
threadfac = Thread | |
name = 'test_' + threadfac.__name__ | |
print(name + ": starting...") | |
threads = [] | |
for net in nets: | |
t = threadfac(target=runNet, args=(net,)) | |
threads.append(t) | |
start = timer() | |
for t in threads: | |
t.start() | |
for t in threads: | |
t.join() | |
diff = timer() - start | |
print(name + ": Run " + str(len(nets)) + " in " + str(diff) + " secs.") | |
#from keras import backend as K | |
#K.clear_session() | |
return (name, diff) | |
#must be called from a process that did not touch tensorflow (so just call it first) | |
def testProcess(num): | |
from multiprocessing import Process, Event | |
threadfac=Process | |
#res = testThreads(nets, threadfac=threadfac) | |
#return res | |
name = 'test_' + threadfac.__name__ | |
print(name + ": starting...") | |
go = Event() | |
done = [] | |
ready = [] | |
for i in range(0, num): | |
ready.append(Event()) | |
done.append(Event()) | |
#K.clear_session() | |
def otherProcFunc(idx): | |
import tensorflow as tf | |
from keras.backend.tensorflow_backend import set_session | |
#config = tf.ConfigProto(log_device_placement=True) | |
config = tf.ConfigProto() | |
#see https://github.com/fchollet/keras/issues/1538 (we need to know how much of gpu is already blocked, can be hard...) | |
#config.gpu_options.per_process_gpu_memory_fraction = 0.9/num | |
#see last here: http://stackoverflow.com/questions/34199233/how-to-prevent-tensorflow-from-allocating-the-totality-of-a-gpu-memory | |
config.gpu_options.allow_growth=True | |
sess = tf.Session(config=config) | |
set_session(sess) | |
#with sess: | |
net = createNet(idx) | |
warmup([net]) | |
ready[idx].set() | |
go.wait() | |
runNet(net) | |
done[idx].set() | |
threads = [] | |
for i in range(0, num): | |
t = threadfac(target=otherProcFunc, args=(i,)) | |
t.start() | |
threads.append(t) | |
for e in ready: | |
e.wait() | |
start = timer() | |
go.set() | |
for e in done: | |
e.wait() | |
diff = timer() - start | |
for t in threads: | |
t.join() | |
print(name + ": Run " + str(num) + " in " + str(diff) + " secs.") | |
return (name, diff) | |
if __name__ == "__main__":
    # Collected (test name, seconds) results, summarised at the end.
    res = []
    # The process test must run from a process that has not touched tensorflow yet,
    # so when enabled it has to stay ahead of createNets().
    #res.append(testProcess(num_nets))
    nets = createNets()
    # The warm-up and the sequential test are left disabled; the file header says the
    # calls are commented out to show a keras bug. Uncomment to run all tests.
    #print('warm up run:')
    #warmup(nets)
    #res.append(testSeq(nets))
    res.append(testThreads(nets))
    print("Testing finished.")
    print(args)
    # One summary line per executed test.
    for n, v in res:
        print("{:16}: {:.3} seconds".format(n, v))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment