Skip to content

Instantly share code, notes, and snippets.

@iaroslav-ai
Last active January 22, 2018 10:46
Show Gist options
  • Save iaroslav-ai/93aa78385f55a3bcee8b1dd7c46a2215 to your computer and use it in GitHub Desktop.
Save iaroslav-ai/93aa78385f55a3bcee8b1dd7c46a2215 to your computer and use it in GitHub Desktop.
"""
Comparison of parallel vs sequential optimization,
where the constant lie approach is used.
"""
from threading import Thread
from copy import deepcopy
from skopt import Optimizer
from skopt.learning import ExtraTreesRegressor, GaussianProcessRegressor
from skopt.space import Real
import numpy as np
import time
class ParallelOptimizer():
    """Constant-liar parallelization wrapper around a sequential skopt Optimizer.

    Every call to ask() replays all known observations (real and "lie" ones)
    into a fresh copy of the sequential optimizer, obtains a new point, and
    records a "lie" objective for it (the mean of known objectives) so that
    concurrent ask() calls do not suggest the same point twice.
    """

    def __init__(self, optimizer):
        # Instance of the sequential solver; deep-copied on every ask().
        self.optimizer = optimizer
        # Shared random state so successive copies do not repeat random points.
        self.rng = optimizer.rng
        # All observations, keyed by str(point):
        # str(point) -> (point_list, objective_float, is_actual_objective_bool)
        self.data = {}

    def ask(self):
        """Suggest the next point to evaluate, recording a 'lie' for it."""
        opt = deepcopy(self.optimizer)
        opt.rng = self.rng  # share rng state across copies
        X = [v[0] for v in self.data.values()]
        Y = [v[1] for v in self.data.values()]
        if len(X) > 0:
            # Points already known count against the random-start budget.
            opt._n_random_starts = max(0, opt._n_random_starts - len(X))
            opt.tell(X, Y)
        x = opt.ask()
        # Constant lie: pretend the point scored the mean of known objectives.
        # (No need to tell the lie to `opt` — the copy is discarded here.)
        y_lie = np.mean(Y) if len(Y) > 0 else 0.0
        self.data[str(x)] = (x, y_lie, False)
        return x

    def tell(self, x, y):
        """Report the true objective `y` for point `x`.

        y=None marks a failed evaluation (e.g. due to network failure):
        the point is dropped so it can be suggested again later.
        """
        if y is None:
            # pop() with default instead of del: tolerate points that were
            # never asked for (original code raised KeyError here).
            self.data.pop(str(x), None)
            return
        self.data[str(x)] = (x, y, True)

    def clear(self):
        """Drop all pending 'lie' entries, keeping only real observations."""
        self.data = {k: v for k, v in self.data.items() if v[-1]}
# this is run in a thread, thus returning result in a weird fashion via xy array
def obj(xy):
    """Evaluate the toy (sphere) objective for xy[0], storing the result in xy[1].

    Runs in a worker thread, so the result is returned by mutating the
    mutable argument: xy == [point, result]. result is set to None when
    the simulated evaluation fails.
    """
    # Simulate a slow evaluation (up to 3 seconds).
    time.sleep(np.random.rand() * 3.0)
    try:
        # This is where evaluation can fail (~10% of the time, simulating
        # e.g. a network failure). Use RuntimeError, not BaseException:
        # catching BaseException would also swallow KeyboardInterrupt /
        # SystemExit raised anywhere inside the try block.
        if np.random.rand() < 0.1:
            raise RuntimeError("Schroedinger's exception")
        y = np.sum(np.array(xy[0]) ** 2)
    except RuntimeError:
        y = None
    xy[1] = y
# Experiment setup: 5-dimensional search space over [-3, 3]^5.
dimensions = [Real(-3.0, 3.0) for i in range(5)]
optimizer = Optimizer(base_estimator=GaussianProcessRegressor(), dimensions=dimensions, acq_optimizer='sampling')
n_runs = 11   # repetitions per configuration (for averaging)
n_calls = 29  # successful objective evaluations per repetition
results = {}  # str((n_jobs, batch_computation)) -> list of per-run histories
for config in [(8,False), (8,True), ]:
    n_jobs, batch_computation = config # batch_computation: whether to run evaluations in batches of n_jobs
    history = []
    for rep in range(n_runs):
        start_time = time.time()
        parallel_optimizer = ParallelOptimizer(optimizer)
        history.append([])  # one [elapsed_seconds, objective] entry per successful evaluation
        threads_xy = [] # array of elements of the form [thread_obj, [x,y]],
        # where [x,y] is used to return the result of computation
        # approach kinda like https://sigopt.com/docs/overview/parallel
        total_evaluations = n_calls
        while total_evaluations > 0:
            updated_threads_xy = [] # this will be an array of threads_xy that did not finish computing yet
            for t in threads_xy:
                if not t[0].is_alive():
                    # Worker finished: report its (possibly failed) result.
                    x, y = t[1]
                    parallel_optimizer.tell(x,y)
                    # for simplicity total_evaluations is not decreased if experiment failed
                    if y is not None:
                        history[-1].append([time.time() - start_time, y])
                        total_evaluations -= 1
                        print("y=",y,"n_calls=",total_evaluations)
                else:
                    updated_threads_xy.append(t)
            threads_xy = updated_threads_xy
            # In batch mode, wait for the whole batch to finish before asking
            # the optimizer for any new points.
            if batch_computation and len(threads_xy) > 0:
                continue
            # Top up the worker pool with freshly suggested points.
            while len(threads_xy) < min(total_evaluations, n_jobs):
                x = parallel_optimizer.ask()
                args = [x, None]  # obj() writes its result into args[1]
                thread = Thread(target=obj, args=[args])
                thread.daemon = True
                thread.start()
                threads_xy.append([thread, args])
            time.sleep(0.01)  # avoid busy-waiting while workers run
    results[str(config)] = history
"""EVERYTHING BELOW IS ONLY NEEDED TO PLOT NICELY RESULTS"""
def average_run(history):
    """Average several runs over time onto a common time grid.

    Parameters
    ----------
    history : list
        Array-like of shape (n_runs, n_points, 2); each innermost pair is
        [timestamp, objective].

    Returns
    -------
    (x_binned, y_binned) : tuple of 1-D np.ndarray
        Time-grid edges and, per grid step, the running best (minimum)
        objective averaged across runs.
    """
    history = np.asarray(history, dtype=float)
    y = history[:, :, 1]
    x = history[:, :, 0]  # time stamps
    x = (x.T - np.min(x, axis=1)).T  # first measurement has timestamp of 0.0
    resolution = 1000
    t_max = np.max(x)
    time_step = t_max / float(resolution)
    current_time = 0.0
    y_binned = []
    x_binned = []
    # .copy() is essential: without it current_y is a *view* into y, and the
    # in-place update below would corrupt the data used for later bin means.
    current_y = y[:, 0].copy()
    while current_time < t_max:
        y_binned.append(np.copy(current_y))
        # measurements falling into the current time bin, per run
        selection = (current_time <= x) & (x <= current_time + time_step)
        I = np.any(selection, axis=1)
        counts = np.sum(selection, axis=1)
        totals = np.sum(y * selection, axis=1)
        # masked divide: avoid 0/0 RuntimeWarning for runs with empty bins
        # (those entries are never used — I is False for them)
        mean_selection = np.divide(totals, counts,
                                   out=np.zeros_like(totals),
                                   where=counts > 0)
        current_y[I] = mean_selection[I]
        current_time += time_step
        x_binned.append(current_time)
    y_binned = np.column_stack(y_binned)
    # running best-so-far per run, then average across runs
    y_binned = np.minimum.accumulate(y_binned, axis=1)
    y_binned = np.mean(y_binned, axis=0)
    x_binned = np.array(x_binned)
    return x_binned, y_binned
# plot the history of each configuration as best-objective-vs-time
import matplotlib.pyplot as plt
for i,k in enumerate(results):
    # average this configuration's runs onto a common time grid
    x,y = average_run(results[k])
    # color = RGB triple taken from the 3-bit binary representation of the index
    plt.plot(x,y, label=k, c=[float(v) for v in np.binary_repr(i,3)])
plt.xlabel("Time")
plt.ylabel("Best objective found")
plt.title("Results for different (n_jobs, batch_computations)")
plt.legend()
plt.grid()
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment