@ogrisel
Last active March 8, 2017 14:40
experiment with dask and sklearn
import os
from time import time
import h5py
import numpy as np
import dask.array as da
from sklearn.linear_model import Perceptron

def generate_data(n_samples=int(1e6), n_features=int(1e3),
                  n_informative=0.1, noise_scale=0.01,
                  blocksize=int(1e3), seed=0):
    """Generates some random, almost linearly separable data."""
    rng = np.random.RandomState(seed)
    if n_informative < 1:
        n_informative = int(np.ceil(n_informative * n_features))
    if blocksize > n_samples:
        blocksize = n_samples

    true_weights = rng.normal(size=n_features)
    true_weights[n_informative:] = 0.0
    true_intercept = rng.normal(scale=0.1)
    n_blocks = n_samples // blocksize
    block_seeds = rng.random_integers(0, 2 ** 32 - 1, n_blocks)

    def make_input_data(block_idx):
        rng = np.random.RandomState(block_seeds[block_idx])
        return rng.normal(size=(blocksize, n_features))

    def make_target(block_idx, input_block):
        rng = np.random.RandomState(block_seeds[block_idx])
        noise = rng.normal(scale=noise_scale, size=blocksize)
        target_block = np.dot(input_block, true_weights.T) + noise
        return (target_block + true_intercept > 0).astype(np.int32)

    dsk_features, dsk_target = {}, {}
    for i in range(n_blocks):
        dsk_features[('features', i, 0)] = (make_input_data, i)
        dsk_target[('target', i)] = (make_target, i, ('features', i, 0))
    # input blocks are required to generate output blocks
    dsk_target.update(dsk_features)

    features = da.Array(dsk_features, 'features',
                        shape=(n_samples, n_features),
                        chunks=(blocksize, n_features))
    target = da.Array(dsk_target, 'target',
                      shape=(n_samples,),
                      chunks=(blocksize,))
    return features, target
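

# Hedged usage sketch (not part of the original gist): the arrays returned by
# generate_data are lazy dask arrays, so a single block can be materialized in
# memory on demand, for instance:
#     features, target = generate_data(n_samples=int(1e4), blocksize=int(1e3))
#     first_block = features[:1000].compute()
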

if __name__ == "__main__":
    data_filename = 'data.hdf5'
    if not os.path.exists(data_filename):
        features, target = generate_data(n_samples=int(1e6),
                                         n_features=int(1e3))
        data_size = features.nbytes + target.nbytes
        print("Generating %0.3fGB of training data in %s" %
              (data_size / 1e9, data_filename))
        with h5py.File(data_filename, 'w') as f:
            dset_features = f.require_dataset(
                '/data/features',
                shape=features.shape,
                chunks=tuple(c[0] for c in features.chunks),
                dtype=features.dtype)
            dset_target = f.require_dataset(
                '/data/target',
                shape=target.shape,
                chunks=tuple(c[0] for c in target.chunks),
                dtype=target.dtype)
            da.store([features, target], [dset_features, dset_target])
            # TODO: make it possible to do:
            # da.store({'/data/features': features,
            #           '/data/target': target},
            #          'hdf5://data.hdf5')
print("Using data from %s" % data_filename)
# TODO: make it possible do read an open the hdf5 lazily instead of having
# to open it globally before calling `da.from_array`
with h5py.File(data_filename, 'r+') as f:
features_ds = f['/data/features']
features = da.from_array(features_ds, chunks=features_ds.chunks)
target_ds = f['/data/target']
target = da.from_array(target_ds, chunks=target_ds.chunks)
print("Fitting a perceptron model")
t0 = time()
model = da.learn.fit(Perceptron(), features, target, classes=[0, 1])
print("%0.3fs" % (time() - t0))
print("Computing predictions")
t0 = time()
predictions = da.learn.predict(model, features)
dset_target = f.require_dataset(
'/data/predictions',
shape=predictions.shape,
chunks=tuple(c[0] for c in predictions.chunks),
dtype=predictions.dtype)
da.store(predictions, dset_target)
print("%0.3fs" % (time() - t0))
import os
from time import time
import h5py
import numpy as np
import dask.array as da
from dask import threaded
from dask.base import tokenize
from dask.context import _globals
from dask.diagnostics import Profiler
from sklearn.linear_model import Perceptron
from sklearn.base import clone

def generate_data(n_samples=int(1e6), n_features=int(1e3),
                  n_informative=0.1, noise_scale=0.01,
                  blocksize=int(1e3), seed=0):
    """Generates some random, almost linearly separable data."""
    rng = np.random.RandomState(seed)
    if n_informative < 1:
        n_informative = int(np.ceil(n_informative * n_features))
    if blocksize > n_samples:
        blocksize = n_samples

    true_weights = rng.normal(size=n_features)
    true_weights[n_informative:] = 0.0
    true_intercept = rng.normal(scale=0.1)
    n_blocks = n_samples // blocksize
    block_seeds = rng.random_integers(0, 2 ** 32 - 1, n_blocks)

    def make_input_data(block_idx):
        rng = np.random.RandomState(block_seeds[block_idx])
        return rng.normal(size=(blocksize, n_features))

    def make_target(block_idx, input_block):
        rng = np.random.RandomState(block_seeds[block_idx])
        noise = rng.normal(scale=noise_scale, size=blocksize)
        target_block = np.dot(input_block, true_weights.T) + noise
        return (target_block + true_intercept > 0).astype(np.int32)

    dsk_features, dsk_target = {}, {}
    for i in range(n_blocks):
        dsk_features[('features', i, 0)] = (make_input_data, i)
        dsk_target[('target', i)] = (make_target, i, ('features', i, 0))
    # input blocks are required to generate output blocks
    dsk_target.update(dsk_features)

    features = da.Array(dsk_features, 'features',
                        shape=(n_samples, n_features),
                        chunks=(blocksize, n_features))
    target = da.Array(dsk_target, 'target',
                      shape=(n_samples,),
                      chunks=(blocksize,))
    return features, target


def _partial_fit(model, x, y, kwargs=None):
    kwargs = kwargs or dict()
    model.partial_fit(x, y, **kwargs)
    return model


def _compute_score(model, x, y):
    return model.score(x, y)


def cross_validate(estimator, input_data, target, n_folds=5,
                   get=None, compute=True, **kwargs):
    get = get or _globals['get'] or threaded.get
    dsk = {}
    n_blocks = len(input_data.chunks[0])
    if len(input_data.chunks[1]) > 1:
        input_data = input_data.reblock(
            chunks=(n_blocks, sum(input_data.chunks[1])))

    model_keys = []
    validation_score_keys = []
    name_template = 'cross_validate_fold_%d' + tokenize(
        estimator, input_data, target, n_folds)
    for fold_id in range(n_folds):
        model = clone(estimator)
        name = name_template % fold_id
        model_id = 0
        validation_id = 0
        dsk[(name, 'model', model_id)] = model
        fold_score_keys = []
        validation_score_keys.append(fold_score_keys)

        for block_id in range(n_blocks):
            if block_id % n_folds == fold_id:
                # Validation block
                if block_id > 0:
                    # We cannot compute a validation score with a model that
                    # has not been fitted on any block yet: skip this block
                    # and start collecting validation scores from the next
                    # validation block of this fold.
                    dsk[(name, 'validation', validation_id)] = (
                        _compute_score,
                        (name, 'model', model_id),
                        (input_data.name, block_id, 0),
                        (target.name, block_id)
                    )
                    fold_score_keys.append(
                        (name, 'validation', validation_id))
                    validation_id += 1
            else:
                # Training block
                model_id += 1
                dsk[(name, 'model', model_id)] = (
                    _partial_fit,
                    (name, 'model', model_id - 1),
                    (input_data.name, block_id, 0),
                    (target.name, block_id),
                    kwargs
                )
        model_keys.append((name, 'model', model_id))

    dsk.update(input_data.dask)
    dsk.update(target.dask)
    if compute:
        return get(dsk, [model_keys, validation_score_keys])
    return (dsk, [model_keys, validation_score_keys])


def test_cross_validation():
    input_data, target = generate_data(
        n_samples=100, n_features=30, blocksize=10)
    results = cross_validate(Perceptron(), input_data, target,
                             classes=[0, 1], n_folds=5, compute=False)
    return results
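

# Hedged usage sketch (not part of the original gist): the lazy graph and keys
# returned when compute=False can be executed explicitly with any dask
# scheduler, e.g. the threaded scheduler that cross_validate uses by default:
#     dsk, (model_keys, score_keys) = test_cross_validation()
#     models, scores = threaded.get(dsk, [model_keys, score_keys])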


if __name__ == "__main__":
    data_filename = 'data.hdf5'
    if not os.path.exists(data_filename):
        features, target = generate_data(n_samples=int(1e6),
                                         n_features=int(1e2),
                                         blocksize=int(1e4))
        data_size = features.nbytes + target.nbytes
        print("Generating %0.3fGB of training data in %s" %
              (data_size / 1e9, data_filename))
        with h5py.File(data_filename, 'w') as f:
            dset_features = f.require_dataset(
                '/data/features',
                shape=features.shape,
                chunks=tuple(c[0] for c in features.chunks),
                dtype=features.dtype)
            dset_target = f.require_dataset(
                '/data/target',
                shape=target.shape,
                chunks=tuple(c[0] for c in target.chunks),
                dtype=target.dtype)
            da.store([features, target], [dset_features, dset_target])
            # TODO: make it possible to do:
            # da.store({'/data/features': features,
            #           '/data/target': target},
            #          'hdf5://data.hdf5')

    print("Using data from %s" % data_filename)

    # TODO: make it possible to read and open the hdf5 file lazily instead of
    # having to open it globally before calling `da.from_array`
    with h5py.File(data_filename, 'r+') as f:
        features_ds = f['/data/features']
        features = da.from_array(features_ds, chunks=features_ds.chunks)
        target_ds = f['/data/target']
        target = da.from_array(target_ds, chunks=target_ds.chunks)

        n_folds = 10
        print("Performing %d-fold cross validation of a perceptron model"
              % n_folds)
        with Profiler() as prof:
            t0 = time()
            models, validation_scores = cross_validate(
                Perceptron(), features, target, classes=[0, 1],
                n_folds=n_folds)
            print("%0.3fs" % (time() - t0))
        prof.visualize()
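
        # Hedged follow-up (not in the original gist): each entry of
        # validation_scores holds the per-block accuracies of one fold, so a
        # simple aggregate could be reported with:
        #     print("mean validation accuracy: %0.3f"
        #           % np.mean([np.mean(s) for s in validation_scores]))
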
import os
from time import time
import h5py
import numpy as np
import dask.array as da
import dask.dataframe as dd
from dask import threaded
from dask.base import tokenize
from dask.context import _globals
from dask.diagnostics import Profiler
from dask.async import get_sync
from sklearn.linear_model import Perceptron
from sklearn.base import clone
import pandas as pd
import matplotlib.pyplot as plt

def generate_data(n_samples=int(1e6), n_features=int(1e3),
                  n_informative=0.1, noise_scale=0.01,
                  blocksize=int(1e3), seed=0):
    """Generates some random, almost linearly separable data."""
    rng = np.random.RandomState(seed)
    if n_informative < 1:
        n_informative = int(np.ceil(n_informative * n_features))
    if blocksize > n_samples:
        blocksize = n_samples

    true_weights = rng.normal(size=n_features)
    true_weights[n_informative:] = 0.0
    true_intercept = rng.normal(scale=0.1)
    n_blocks = n_samples // blocksize
    block_seeds = rng.random_integers(0, 2 ** 32 - 1, n_blocks)

    def make_input_data(block_idx):
        rng = np.random.RandomState(block_seeds[block_idx])
        return rng.normal(size=(blocksize, n_features))

    def make_target(block_idx, input_block):
        rng = np.random.RandomState(block_seeds[block_idx])
        noise = rng.normal(scale=noise_scale, size=blocksize)
        target_block = np.dot(input_block, true_weights.T) + noise
        return (target_block + true_intercept > 0).astype(np.int32)

    dsk_features, dsk_target = {}, {}
    for i in range(n_blocks):
        dsk_features[('features', i, 0)] = (make_input_data, i)
        dsk_target[('target', i)] = (make_target, i, ('features', i, 0))
    # input blocks are required to generate output blocks
    dsk_target.update(dsk_features)

    features = da.Array(dsk_features, 'features',
                        shape=(n_samples, n_features),
                        chunks=(blocksize, n_features))
    target = da.Array(dsk_target, 'target',
                      shape=(n_samples,),
                      chunks=(blocksize,))
    return features, target


def _partial_fit(model, data, target_column, kwargs=None):
    kwargs = kwargs or dict()
    x = data.drop(target_column, axis=1)
    y = data[target_column]
    model.partial_fit(x, y, **kwargs)
    return model


def _compute_score(model, data, target_column):
    x = data.drop(target_column, axis=1)
    y = data[target_column]
    return model.score(x, y)


def cross_validate(estimator, data, target_column, n_folds=5,
                   get=None, compute=True, **kwargs):
    get = get or _globals['get'] or threaded.get
    dsk = {}
    model_keys = []
    validation_score_keys = []
    name_template = 'cross_validate_fold_%d' + tokenize(
        estimator, data, target_column, n_folds)
    for fold_id in range(n_folds):
        model = clone(estimator)
        name = name_template % fold_id
        model_id = 0
        validation_id = 0
        dsk[(name, 'model', model_id)] = model
        fold_score_keys = []
        validation_score_keys.append(fold_score_keys)

        for partition_id in range(data.npartitions):
            if partition_id % n_folds == fold_id:
                # Validation partition
                if partition_id > 0:
                    # We cannot compute a validation score with a model that
                    # has not been fitted on any partition yet: skip this
                    # partition and start collecting validation scores from
                    # the next validation partition of this fold.
                    dsk[(name, 'validation', validation_id)] = (
                        _compute_score,
                        (name, 'model', model_id),
                        (data._name, partition_id),
                        target_column
                    )
                    fold_score_keys.append(
                        (name, 'validation', validation_id))
                    validation_id += 1
            else:
                # Training partition
                model_id += 1
                dsk[(name, 'model', model_id)] = (
                    _partial_fit,
                    (name, 'model', model_id - 1),
                    (data._name, partition_id),
                    target_column,
                    kwargs
                )
        model_keys.append((name, 'model', model_id))

    dsk.update(data.dask)
    if compute:
        return get(dsk, [model_keys, validation_score_keys])
    return (dsk, [model_keys, validation_score_keys])


def test_cross_validation():
    input_data, target = generate_data(
        n_samples=100, n_features=30, blocksize=10)
    # The dataframe variant of cross_validate expects a dask dataframe and the
    # name of the target column rather than two dask arrays.
    data = dd.from_array(input_data)
    data = data.assign(target=dd.from_array(target, columns=['target']))
    results = cross_validate(Perceptron(), data, 'target',
                             classes=[0, 1], n_folds=5, compute=False)
    return results


if __name__ == "__main__":
    data_filename = 'data.hdf5'
    if not os.path.exists(data_filename):
        features, target = generate_data(n_samples=int(1e6),
                                         n_features=int(1e2),
                                         blocksize=int(1e4))
        data_size = features.nbytes + target.nbytes
        print("Generating %0.3fGB of training data in %s" %
              (data_size / 1e9, data_filename))
        with h5py.File(data_filename, 'w') as f:
            dset_features = f.require_dataset(
                '/data/features',
                shape=features.shape,
                chunks=tuple(c[0] for c in features.chunks),
                dtype=features.dtype)
            dset_target = f.require_dataset(
                '/data/target',
                shape=target.shape,
                chunks=tuple(c[0] for c in target.chunks),
                dtype=target.dtype)
            da.store([features, target], [dset_features, dset_target])
            # TODO: make it possible to do:
            # da.store({'/data/features': features,
            #           '/data/target': target},
            #          'hdf5://data.hdf5')

    print("Using data from %s" % data_filename)

    # TODO: make it possible to read and open the hdf5 file lazily instead of
    # having to open it globally before calling `da.from_array`
    with h5py.File(data_filename, 'r+') as f:
        features_ds = f['/data/features']
        features = da.from_array(features_ds, chunks=features_ds.chunks)
        target_ds = f['/data/target']
        target = da.from_array(target_ds, chunks=target_ds.chunks)
        data = dd.from_array(features)
        data = data.assign(target=dd.from_array(target, columns=['target']))
        # data = data.cache()

        get = None  # get_sync
        n_folds = 10
        print("Performing %d-fold cross validation of a perceptron model"
              % n_folds)
        t0 = time()
        with Profiler() as prof:
            t0 = time()
            models, validation_scores = cross_validate(
                Perceptron(), data, 'target', classes=[0, 1],
                n_folds=n_folds, get=get)
            print("%0.3fs" % (time() - t0))
        prof.visualize()

        for scores in validation_scores:
            plt.plot(pd.ewma(np.asarray(scores), span=10))
        plt.title('(Smoothed) validation scores for each fold')
        plt.show()
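
        # Hedged follow-up (not in the original gist): the raw per-fold score
        # lists could also be dumped for later inspection, e.g. as a CSV file
        # built with pandas (the filename 'validation_scores.csv' is made up):
        #     pd.DataFrame(validation_scores).T.to_csv(
        #         'validation_scores.csv', index=False)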