Skip to content

Instantly share code, notes, and snippets.

@droid666
Created January 29, 2017 19:23
Show Gist options
  • Save droid666/3353c8f1f225a1245fab59f4d1570e89 to your computer and use it in GitHub Desktop.
Save droid666/3353c8f1f225a1245fab59f4d1570e89 to your computer and use it in GitHub Desktop.
Keras thread bug example: tried hacks at line 134-143, no success
import numpy as np
import math
# Fix the NumPy random seed so the generated data is reproducible between runs.
seed = 7
np.random.seed(seed)
import argparse
# Command-line interface: every benchmark dimension (mode, epochs, data size,
# network depth/width, batch size, number of nets) can be overridden.
argparser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
argparser.add_argument('-m', '--mode', type=str, default='batch', help='if to submit single batches, or to run whole epochs', choices=('batch', 'epochs'))
argparser.add_argument('-n', '--numEpochs', type=int, default=5, help='(default: %(default)d)')
argparser.add_argument('-r', '--numRows', type=int, default=10000, help='aka epoch size')
argparser.add_argument('-hn', '--numHiddenLayers', type=int, default=5, help='(default: %(default)d)')
argparser.add_argument('-hs', '--hiddenLayerSize', type=int, default=128, help='(default: %(default)d)')
argparser.add_argument('-b', '--batchSize', type=int, default=256, help='(default: %(default)d)')
argparser.add_argument('-t', '--numNets', type=int, default=3, help='how many different nets to test (also number of threads to use)')
# NOTE: arguments are parsed at module level, i.e. as soon as the file is executed.
args = argparser.parse_args()
print(args)
# Module-level settings derived from the parsed arguments; the functions
# below read these globals directly.
# number of nets/threads to run
num_nets = args.numNets
# number of data rows per net (previously hard-coded: rows = 10000)
rows = args.numRows
# Base input/output widths; createData/createNet add the net index to both.
x_cols = 200
y_cols = 20
hid_layers = args.numHiddenLayers
hid_width = args.hiddenLayerSize
epochs = args.numEpochs
batch_size = args.batchSize
# True  -> feed the model one batch at a time (train_on_batch / predict_on_batch)
# False -> let Keras run whole epochs (fit / predict)
sendBatches = args.mode == 'batch'
def createData(idx):
    """Generate a random data set for the net with index ``idx``.

    Both arrays have ``rows`` rows. The feature array has ``x_cols + idx``
    columns and the target array ``y_cols + idx`` columns, so every net
    index gets differently shaped data.

    Returns:
        A ``(features, targets)`` tuple of arrays drawn from
        ``np.random.rand`` (features are drawn first).
    """
    feature_width = x_cols + idx
    target_width = y_cols + idx
    features = np.random.rand(rows, feature_width)
    targets = np.random.rand(rows, target_width)
    return features, targets
def createNet(idx):
    """Build and compile a dense feed-forward Keras model for net ``idx``.

    The model has ``hid_layers`` hidden stages (Dense of width ``hid_width``,
    ReLU, Dropout 0.5) followed by a linear Dense output. Input and output
    widths are offset by ``idx``. The matching random data set is attached
    to the model as ``model.myX`` / ``model.myY``.

    Returns:
        The compiled model (Adam optimizer, mean squared error loss).
    """
    # Keras is imported lazily so that merely loading this module does not
    # pull in the backend.
    from keras.layers import Input, Dense, Dropout, Activation
    from keras.models import Model

    net_input = Input(shape=(x_cols + idx,))
    tensor = net_input
    for _ in range(hid_layers):
        tensor = Dense(hid_width)(tensor)
        tensor = Activation('relu')(tensor)
        tensor = Dropout(0.5)(tensor)

    # The output width differs per net, so executing on the wrong model
    # would show up as a shape mismatch.
    tensor = Dense(y_cols + idx)(tensor)
    net_output = Activation('linear')(tensor)

    net = Model(input=net_input, output=net_output)
    net.compile(optimizer='adam', loss='mean_squared_error')
    net.myX, net.myY = createData(idx)
    return net
def trainEpochs(model):
    """Train ``model`` on its attached data with a single ``fit`` call.

    Runs ``epochs`` epochs with mini-batches of ``batch_size`` rows over
    ``model.myX`` / ``model.myY``.
    """
    features, targets = model.myX, model.myY
    model.fit(features, targets, nb_epoch=epochs, batch_size=batch_size)
def trainBatches(model):
    """Train ``model`` by submitting one mini-batch at a time.

    For each of ``epochs`` passes, the attached data (``model.myX`` /
    ``model.myY``) is walked front to back in consecutive slices of
    ``batch_size`` rows; the last slice of a pass may be shorter. Every
    slice is handed to ``model.train_on_batch``.

    Raises:
        ValueError: if ``batch_size`` is not positive. The previous
            hand-written loop never terminated in that case.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive, got {}".format(batch_size))
    total_rows = len(model.myX)
    for _ in range(epochs):
        for start in range(0, total_rows, batch_size):
            # Slicing clamps at the end of the data, so the final batch is
            # simply shorter; no explicit bound check is needed.
            stop = start + batch_size
            model.train_on_batch(model.myX[start:stop], model.myY[start:stop])
def evalEpochs(model):
    """Predict on the model's whole attached data set in one ``predict`` call.

    Returns:
        Whatever ``model.predict`` returns for ``model.myX`` when run with
        mini-batches of ``batch_size`` rows.
    """
    predictions = model.predict(model.myX, batch_size=batch_size)
    return predictions
def evalBatches(model):
    """Predict on the model's attached data one mini-batch at a time.

    The full sweep over ``model.myX`` is repeated ``epochs`` times, as in the
    original code (presumably so the workload is comparable to the training
    loop; confirm). Each slice of ``batch_size`` rows is passed to
    ``model.predict_on_batch``; the last slice of a sweep may be shorter.

    Returns:
        The predictions of the final sweep, concatenated along the first
        axis, so the result is usable like the one from ``evalEpochs``.
        ``None`` if no batch was evaluated (no epochs or no rows).
        Assumes ``predict_on_batch`` returns a single array per call, as it
        does for the single-output nets built by ``createNet``.

    Raises:
        ValueError: if ``batch_size`` is not positive. The previous
            hand-written loop never terminated in that case.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive, got {}".format(batch_size))
    total_rows = len(model.myX)
    last_sweep = []
    for _ in range(epochs):
        # Only the most recent sweep is kept; earlier ones are identical work.
        last_sweep = [
            model.predict_on_batch(model.myX[start:start + batch_size])
            for start in range(0, total_rows, batch_size)
        ]
    if not last_sweep:
        return None
    return np.concatenate(last_sweep)
def warmup(nets):
    """Run one tiny train step and one tiny predict step on every model.

    Only the first 16 rows of each model's attached data are used.
    Presumably this forces the backend to build its train/predict functions
    before any timing starts; confirm against the Keras version in use.
    """
    for net in nets:
        head_x = net.myX[0:16]
        head_y = net.myY[0:16]
        net.train_on_batch(head_x, head_y)
        net.predict_on_batch(head_x)
def runNet(net):
    """Train and then evaluate ``net`` in the mode selected by ``sendBatches``.

    Batch mode uses ``trainBatches`` / ``evalBatches``; otherwise the
    epoch-based ``trainEpochs`` / ``evalEpochs`` pair is used.

    Returns:
        The value returned by the evaluation helper that was run.
    """
    # The net is expected to be built by the caller (see createNet).
    if not sendBatches:
        trainEpochs(net)
        return evalEpochs(net)
    trainBatches(net)
    return evalBatches(net)
def createNets():
    """Build ``num_nets`` models, one per index ``0 .. num_nets - 1``.

    Returns:
        A list of the models in index order.
    """
    return [createNet(index) for index in range(num_nets)]
##########################################
#start tests
from timeit import default_timer as timer
def testSeq(nets):
    """Run every net one after another in the calling thread and time it.

    Returns:
        A ``(name, seconds)`` tuple where ``name`` is ``'test_sequential'``
        and ``seconds`` is the wall-clock duration of all runs together.
    """
    name = 'test_sequential'
    print(name + ": starting...")

    began = timer()
    for current in nets:
        runNet(current)
    elapsed = timer() - began

    summary = name + ": Run " + str(len(nets)) + " in " + str(elapsed) + " secs."
    print(summary)
    # NOTE: clearing the Keras session here (keras.backend.clear_session())
    # was tried by the author and left disabled.
    return (name, elapsed)
def testThreads(nets, threadfac=None):
    """Run every net concurrently, one worker per net, and time the batch.

    Args:
        nets: the models to run; each is passed to ``runNet`` in its own
            worker.
        threadfac: callable used to create the workers; it is invoked as
            ``threadfac(target=..., args=...)`` and the result must offer
            ``start()`` and ``join()``. Defaults to ``threading.Thread``.

    Returns:
        A ``(name, seconds)`` tuple; ``name`` is ``'test_'`` plus the
        factory's ``__name__`` and ``seconds`` covers starting and joining
        all workers.
    """
    if threadfac is None:
        from threading import Thread
        threadfac = Thread

    name = 'test_' + threadfac.__name__
    print(name + ": starting...")

    # Workarounds tried for the Keras/TensorFlow threading problem:
    # re-run the variable initializers in the Keras session and pin the
    # current default graph for use inside the workers.
    # NOTE(review): re-running the global initializer presumably resets
    # variables that were already initialized; confirm this is intended.
    from tensorflow import get_default_graph, global_variables_initializer, local_variables_initializer
    from keras.backend.tensorflow_backend import get_session
    get_session().run(global_variables_initializer())
    get_session().run(local_variables_initializer())
    tfGraph = get_default_graph()

    def threadRunFunction(*myargs):
        # Make the graph captured above the default graph in this worker.
        print('DEBUG: forcing default graph usage.')
        with tfGraph.as_default():
            runNet(*myargs)

    workers = [threadfac(target=threadRunFunction, args=(net,)) for net in nets]

    # Only starting and joining the workers is timed, not their creation.
    began = timer()
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    elapsed = timer() - began

    print(name + ": Run " + str(len(nets)) + " in " + str(elapsed) + " secs.")
    # NOTE: clearing the Keras session here (keras.backend.clear_session())
    # was tried by the author and left disabled.
    return (name, elapsed)
#must be called from a process that did not touch tensorflow (so just call it first)
def testProcess(num):
    """Run ``num`` nets in ``num`` separate processes and time the runs.

    Every child process creates its own TensorFlow session (GPU memory
    growth enabled), builds and warms up its net, signals readiness and
    then waits for a common start signal. Timing covers only the span from
    releasing the start signal until every child reports completion.

    Per the author's note, call this from a process that has not touched
    TensorFlow yet (i.e. call it first).

    Args:
        num: number of processes / nets; child ``i`` builds net ``i``.

    Returns:
        A ``(name, seconds)`` tuple; ``name`` is ``'test_Process'``.
    """
    from multiprocessing import Process, Event
    threadfac = Process
    # Reusing testThreads(nets, threadfac=threadfac) was tried and dropped.
    name = 'test_' + threadfac.__name__
    print(name + ": starting...")

    # One shared start signal plus a ready/done event pair per child.
    go = Event()
    ready = [Event() for _ in range(num)]
    done = [Event() for _ in range(num)]

    def otherProcFunc(idx):
        # Runs inside the child process.
        import tensorflow as tf
        from keras.backend.tensorflow_backend import set_session
        # (tf.ConfigProto(log_device_placement=True) can be used for debugging.)
        config = tf.ConfigProto()
        #see https://github.com/fchollet/keras/issues/1538 (we need to know how much of gpu is already blocked, can be hard...)
        #config.gpu_options.per_process_gpu_memory_fraction = 0.9/num
        #see last here: http://stackoverflow.com/questions/34199233/how-to-prevent-tensorflow-from-allocating-the-totality-of-a-gpu-memory
        config.gpu_options.allow_growth = True
        session = tf.Session(config=config)
        set_session(session)

        net = createNet(idx)
        warmup([net])
        # Report readiness, then block until the parent releases all children.
        ready[idx].set()
        go.wait()
        runNet(net)
        done[idx].set()

    children = []
    for index in range(num):
        child = threadfac(target=otherProcFunc, args=(index,))
        child.start()
        children.append(child)

    # Wait until every child has built and warmed up its net.
    for ready_event in ready:
        ready_event.wait()

    began = timer()
    go.set()
    for done_event in done:
        done_event.wait()
    elapsed = timer() - began

    for child in children:
        child.join()

    print(name + ": Run " + str(num) + " in " + str(elapsed) + " secs.")
    return (name, elapsed)
if __name__ == "__main__":
    # Collected (test name, seconds) tuples, printed as a summary at the end.
    res = []
    # The process-based test is disabled; per the note on testProcess it has
    # to run before this process touches TensorFlow, hence its position
    # ahead of createNets().
    #res.append(testProcess(num_nets))
    nets = createNets()
    # Optional steps, currently disabled: warm-up and the sequential baseline.
    #print('warm up run:')
    #warmup(nets)
    #res.append(testSeq(nets))
    res.append(testThreads(nets))
    print("Testing finished.")
    print(args)
    for n, v in res:
        # Fixed-point with three decimals. The former "{:.3}" meant three
        # significant digits in general format, which drops the fraction for
        # long runs and switches to exponent notation from 1000 s upwards.
        print("{:16}: {:.3f} seconds".format(n, v))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment