@bhtucker
Last active April 23, 2019 22:51
Use a Hogwild-inspired algorithm to learn logistic regression over a sample dataset in parallel
"""
Demo of using Hogwild algorthim for parallel learning with shared memory
Uses sklearn's LogisticRegression for accuracy comparison
Output
('initial accuracy:', 0.45333333333333331)
worker 25974 score 0.93
worker 25975 score 0.92
worker 25976 score 0.88
worker 25974 score 0.94
worker 25975 score 0.92
worker 25976 score 0.88
worker 25974 score 0.94
worker 25975 score 0.92
worker 25976 score 0.88
worker 25974 score 0.94
worker 25975 score 0.92
worker 25976 score 0.88
worker 25974 score 0.94
worker 25976 score 0.88
worker 25975 score 0.92
worker 25974 score 0.94
worker 25976 score 0.88
worker 25975 score 0.92
('final accuracy:', 0.91333333333333333)
('sklearn accuracy:', 0.91333333333333333)
"""
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import os
from multiprocessing import Pool
from multiprocessing.sharedctypes import Array
from ctypes import c_double
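
# NOTE (runtime assumption): workers get access to the shared weights by
# inheriting the module-level `shared_array_base` when the Pool forks. This
# relies on the 'fork' start method (the default on Linux); under 'spawn'
# (the default on Windows and on macOS since Python 3.8) the global would not
# be inherited and the array would have to be handed to workers differently,
# e.g. via a Pool initializer.
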
base_learning_rate = .1 # in addition to normalization relative to batch size
seed = 338 # for reproducibility (though the process-unsafe code is nondeterministic)
proc_count = 3 # how many processes?
n_features = 4 # dimensionality of generated data


def sigmoid(z):
    # returns activation for dot product
    return 1 / (1 + np.exp(-z))


def sigmoid_prime(a):
    # returns activation derivative + 1e-5 to avoid vanishing gradients
    return (a * (1 - a)) + 1e-5


def train_step(x, w, y):
    activation = sigmoid(np.dot(x, w))
    # how much did we miss?
    error = y - activation
    # multiply how much we missed by the
    # slope of the sigmoid at the current activation
    delta = error * sigmoid_prime(activation)
    # return gradient update
    return x * delta * base_learning_rate
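
# Note: error * sigmoid_prime(activation) is the squared-error ("delta rule")
# gradient for a logistic unit; the usual cross-entropy gradient for logistic
# regression would drop the sigmoid_prime factor and use error * x directly.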


def mse(data, w):
    # mean squared error of the current weights (available as an alternative score_func)
    x, y = data
    y_hat = np.apply_along_axis(sigmoid, arr=np.dot(x, w), axis=0)
    return np.mean((y_hat - y) ** 2)


def accuracy(data, w):
    x, y = data
    y_hat = np.apply_along_axis(lambda v: np.round(sigmoid(v)), arr=np.dot(x, w), axis=0)
    return accuracy_score(y_hat, y)


def worker_func(data, batch_size=10, passes=300, score_func=accuracy):
    # wrap the inherited shared buffer as a numpy view; writes go straight
    # to shared memory, so every worker sees (and races on) the same weights
    w = np.ctypeslib.as_array(shared_array_base)
    pid = os.getpid()
    learning_rate = 1. / batch_size
    update_buffer = []
    for p in range(passes):
        for x, y in zip(*data):
            update_buffer.append(train_step(x, w, y))
            if len(update_buffer) >= batch_size:
                # apply the accumulated mini-batch update in place, without a lock
                w += (sum(update_buffer) * learning_rate)
                update_buffer = []
        if p % 50 == 0:
            print('worker %s score %s' % (pid, score_func(data, w)))
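
# Hogwild aspect: each forked worker applies "+=" to the same shared w with no
# locking, so updates from different processes can interleave or overwrite one
# another; the demo relies on small, frequent updates keeping the weights moving
# toward a good solution anyway.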


def setup_shared_array():
    # builds the shared-memory weight array that forked workers will inherit
    shared_array_base = Array(c_double, n_features, lock=False)
    np.random.seed(seed)
    for ix, val in enumerate(np.random.rand(n_features)):
        shared_array_base[ix] = val / 4.  # divide by 4 to get closer to zero -> stronger gradients
    return shared_array_base
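
# Array(c_double, n_features, lock=False) allocates a raw ctypes array backed by
# shared memory, and np.ctypeslib.as_array wraps that buffer without copying, so
# in-place numpy operations on the view are visible to every process holding the
# same underlying buffer.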


if __name__ == '__main__':
    data_size = proc_count * 100
    data = make_classification(
        n_samples=data_size, n_classes=2, random_state=seed, n_features=n_features
    )

    # create the shared array before the pool forks so workers inherit it
    shared_array_base = setup_shared_array()

    # split the data into one contiguous chunk per worker
    worker_chunk_size = data_size // proc_count
    data_chunks = [
        (data[0][i: i + worker_chunk_size], data[1][i: i + worker_chunk_size])
        for i in range(0, data_size, worker_chunk_size)
    ]

    pool = Pool(proc_count)
    print("initial accuracy:", accuracy(data, np.ctypeslib.as_array(shared_array_base)))
    pool.map(worker_func, data_chunks)
    print("final accuracy:", accuracy(data, np.ctypeslib.as_array(shared_array_base)))

    # fit sklearn's LogisticRegression on the same data for comparison
    lr = LogisticRegression()
    lr.fit(*data)
    print("sklearn accuracy:", lr.score(*data))