@MLnick · Created May 31, 2017 08:57
Dask Parameter Server - Initial WIP
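# Parameter-server-style asynchronous training on a Dask distributed cluster:
# one worker acts as the parameter server, publishing the latest beta on a
# 'betas' channel and applying Adagrad-scaled updates received on an
# 'updates' channel, while the remaining workers repeatedly pull beta,
# compute logistic-loss gradients on their data chunk, and push them back.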
# ==== dask-ps
import dask
import dask.array as da
from dask import delayed
from dask_glm import families
from dask_glm.algorithms import lbfgs
from distributed import LocalCluster, Client, worker_client
import numpy as np
import time
from sklearn import datasets
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
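
# Local cluster with a dedicated parameter-server worker ('ps') and two
# compute workers ('w1', 'w2'), so tasks can be pinned to workers by name.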
cluster = LocalCluster(n_workers=0)
cluster.start_worker(1, name="ps")
cluster.start_worker(1, name="w1")
cluster.start_worker(1, name="w2")
client = Client(cluster)
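
# Step size, problem size, and number of update rounds per worker.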
STEP_SIZE = 1.0
N = 10000
D = 10
ITER = 10
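
# Synthetic binary classification data, chunked into blocks of 1000 rows.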
X_local, y_local = datasets.make_classification(n_classes=2, n_samples=N, n_features=D)
X = da.from_array(X_local, 1000)
y = da.from_array(y_local, 1000)
XD = X.to_delayed().flatten().tolist()  # a list of delayed objects, one per chunk (each resolves to a numpy array)
yD = y.to_delayed().flatten().tolist()
STEP_SIZE /= len(XD)  # need to adjust based on parallelism for convergence?
family = families.Logistic()
pointwise_gradient = family.pointwise_gradient
pointwise_loss = family.pointwise_loss
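
# Gradient of the logistic loss for one data chunk, evaluated at the current beta.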
def local_update(X, y, beta):
    return pointwise_gradient(beta, X, y)
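
# Parameter server: keeps the global beta and the Adagrad accumulator gti,
# publishes beta on the 'betas' channel, and for every gradient received on
# the 'updates' channel applies an Adagrad-scaled step and republishes beta.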
def parameter_server():
    beta = np.zeros(D)
    gti = np.zeros(D)
    with worker_client() as c:
        betas = c.channel('betas', maxlen=1)
        [future_beta] = c.scatter([beta])
        betas.append(future_beta)
        betas.flush()
        updates = c.channel('updates')
        for update in updates:
            print("received update: %s" % update)
            update = update.result()
            gti += update ** 2
            adj_grad = update / (1e-6 + np.sqrt(gti))
            beta = beta - STEP_SIZE * adj_grad
            [future_beta] = c.scatter([beta])
            betas.append(future_beta)
            betas.flush()
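
# Worker: for ITER rounds, pull the latest beta from the 'betas' channel,
# compute the gradient on this chunk, scatter it, and push the resulting
# future onto the 'updates' channel for the parameter server to consume.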
def worker(X, y):
    with worker_client(separate_thread=False) as c:
        for i in range(ITER):
            betas = c.channel('betas', maxlen=1)
            time.sleep(0.01)
            last_beta = betas.data[-1]
            # subset_beta = c.submit(operator.getitem, last_beta, idx).result()
            # params = subset_beta.result()
            beta = last_beta.result()
            print("Computing update with latest beta: %s" % beta)
            update = local_update(X, y, beta)  # .compute()
            [update_future] = c.scatter([update])
            updates = c.channel('updates')
            updates.append(update_future)
            updates.flush()
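
# One delayed worker task per (X, y) chunk pair.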
res = [delayed(worker)(xx, yy) for xx, yy in zip(XD, yD)]
# start PS
res_ps = client.submit(parameter_server, workers=['ps'], pure=False)
# start workers computing
res2 = [d.compute(workers=['w1', 'w2']) for d in res]
# collect beta from PS
beta_chan = client.channel('betas', maxlen=1)
time.sleep(0.1)
beta_sgd = beta_chan.data[-1].result()
# compare to L-BFGS solution
beta_lbfgs = lbfgs(X, y, lamduh=1e-7)
# also compare to sklearn LR:
lr = LogisticRegression(fit_intercept=False, C=1000000, solver='lbfgs')
lr.fit(X, y)
print("sklearn LR (lbfgs):\nBeta: %s" % lr.coef_)
print("Score: %s" % lr.score(X, y))
print("AUC: %s" % roc_auc_score(y, lr.decision_function(X)))
print()
print("Beta DaskGLM lbfgs:\nBeta: %s" % beta_lbfgs)
lr.coef_ = beta_lbfgs.reshape(1, D)
print("Score: %s" % lr.score(X, y))
print("AUC: %s" % roc_auc_score(y, lr.decision_function(X)))
print()
print("Beta DaskPS Adagrad:\n%s" % beta_sgd)
lr.coef_ = beta_sgd.reshape(1, D)
print("Score: %s" % lr.score(X, y))
print("AUC: %s" % roc_auc_score(y, lr.decision_function(X)))