experiment with dask and sklearn
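Three variants of the same experiment follow. The first script generates a large, almost linearly separable dataset as a lazy dask array, materializes it to HDF5, and fits a scikit-learn Perceptron out of core with da.learn.fit. The second replaces da.learn with a hand-built dask graph that performs block-wise cross-validation over dask arrays. The third adapts that graph to dask dataframes and plots smoothed validation scores for each fold.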
import os
from time import time

import h5py
import numpy as np
import dask.array as da

from sklearn.linear_model import Perceptron


def generate_data(n_samples=int(1e6), n_features=int(1e3),
                  n_informative=0.1, noise_scale=0.01,
                  blocksize=int(1e3), seed=0):
    """Generate some random, almost linearly separable data."""
    rng = np.random.RandomState(seed)
    if n_informative < 1:
        # interpret n_informative as a fraction of n_features
        n_informative = int(np.ceil(n_informative * n_features))
    if blocksize > n_samples:
        blocksize = n_samples
    true_weights = rng.normal(size=n_features)
    true_weights[n_informative:] = 0.0
    true_intercept = rng.normal(scale=0.1)

    # one seed per block so that each block can be generated
    # independently and reproducibly
    n_blocks = n_samples // blocksize
    block_seeds = rng.random_integers(0, 2 ** 32 - 1, n_blocks)

    def make_input_data(block_idx):
        rng = np.random.RandomState(block_seeds[block_idx])
        return rng.normal(size=(blocksize, n_features))

    def make_target(block_idx, input_block):
        rng = np.random.RandomState(block_seeds[block_idx])
        noise = rng.normal(scale=noise_scale, size=blocksize)
        target_block = np.dot(input_block, true_weights) + noise
        return (target_block + true_intercept > 0).astype(np.int32)

    dsk_features, dsk_target = {}, {}
    for i in range(n_blocks):
        dsk_features[('features', i, 0)] = (make_input_data, i)
        dsk_target[('target', i)] = (make_target, i, ('features', i, 0))
    # input blocks are required to generate target blocks
    dsk_target.update(dsk_features)
    features = da.Array(dsk_features, 'features',
                        shape=(n_samples, n_features),
                        chunks=(blocksize, n_features))
    target = da.Array(dsk_target, 'target',
                      shape=(n_samples,),
                      chunks=(blocksize,))
    return features, target
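
# For example, with n_blocks == 2 the dsk_target graph built above is:
#   {('features', 0, 0): (make_input_data, 0),
#    ('features', 1, 0): (make_input_data, 1),
#    ('target', 0): (make_target, 0, ('features', 0, 0)),
#    ('target', 1): (make_target, 1, ('features', 1, 0))}
# so computing a target block triggers the generation of exactly one
# features block.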
if __name__ == "__main__":
    data_filename = 'data.hdf5'
    if not os.path.exists(data_filename):
        features, target = generate_data(n_samples=int(1e6),
                                         n_features=int(1e3))
        data_size = features.nbytes + target.nbytes
        print("Generating %0.3fGB of training data in %s" %
              (data_size / 1e9, data_filename))
        with h5py.File(data_filename, 'w') as f:
            dset_features = f.require_dataset(
                '/data/features',
                shape=features.shape,
                chunks=tuple(c[0] for c in features.chunks),
                dtype=features.dtype)
            dset_target = f.require_dataset(
                '/data/target',
                shape=target.shape,
                chunks=tuple(c[0] for c in target.chunks),
                dtype=target.dtype)
            da.store([features, target], [dset_features, dset_target])
        # TODO: make it possible to do:
        # da.store({'/data/features': features,
        #           '/data/target': target},
        #          'hdf5://data.hdf5')

    print("Using data from %s" % data_filename)
    # TODO: make it possible to open and read the hdf5 file lazily instead
    # of having to open it globally before calling `da.from_array`
    with h5py.File(data_filename, 'r+') as f:
        features_ds = f['/data/features']
        features = da.from_array(features_ds, chunks=features_ds.chunks)
        target_ds = f['/data/target']
        target = da.from_array(target_ds, chunks=target_ds.chunks)

        print("Fitting a perceptron model")
        t0 = time()
        model = da.learn.fit(Perceptron(), features, target, classes=[0, 1])
        print("%0.3fs" % (time() - t0))

        print("Computing predictions")
        t0 = time()
        predictions = da.learn.predict(model, features)
        dset_predictions = f.require_dataset(
            '/data/predictions',
            shape=predictions.shape,
            chunks=tuple(c[0] for c in predictions.chunks),
            dtype=predictions.dtype)
        da.store(predictions, dset_predictions)
        print("%0.3fs" % (time() - t0))
import os
from time import time

import h5py
import numpy as np
import dask.array as da
from dask import threaded
from dask.base import tokenize
from dask.context import _globals
from dask.diagnostics import Profiler

from sklearn.linear_model import Perceptron
from sklearn.base import clone


def generate_data(n_samples=int(1e6), n_features=int(1e3),
                  n_informative=0.1, noise_scale=0.01,
                  blocksize=int(1e3), seed=0):
    """Generate some random, almost linearly separable data."""
    rng = np.random.RandomState(seed)
    if n_informative < 1:
        n_informative = int(np.ceil(n_informative * n_features))
    if blocksize > n_samples:
        blocksize = n_samples
    true_weights = rng.normal(size=n_features)
    true_weights[n_informative:] = 0.0
    true_intercept = rng.normal(scale=0.1)

    n_blocks = n_samples // blocksize
    block_seeds = rng.random_integers(0, 2 ** 32 - 1, n_blocks)

    def make_input_data(block_idx):
        rng = np.random.RandomState(block_seeds[block_idx])
        return rng.normal(size=(blocksize, n_features))

    def make_target(block_idx, input_block):
        rng = np.random.RandomState(block_seeds[block_idx])
        noise = rng.normal(scale=noise_scale, size=blocksize)
        target_block = np.dot(input_block, true_weights) + noise
        return (target_block + true_intercept > 0).astype(np.int32)

    dsk_features, dsk_target = {}, {}
    for i in range(n_blocks):
        dsk_features[('features', i, 0)] = (make_input_data, i)
        dsk_target[('target', i)] = (make_target, i, ('features', i, 0))
    # input blocks are required to generate target blocks
    dsk_target.update(dsk_features)
    features = da.Array(dsk_features, 'features',
                        shape=(n_samples, n_features),
                        chunks=(blocksize, n_features))
    target = da.Array(dsk_target, 'target',
                      shape=(n_samples,),
                      chunks=(blocksize,))
    return features, target


def _partial_fit(model, x, y, kwargs=None):
    kwargs = kwargs or dict()
    model.partial_fit(x, y, **kwargs)
    return model


def _compute_score(model, x, y):
    return model.score(x, y)


def cross_validate(estimator, input_data, target, n_folds=5,
                   get=None, compute=True, **kwargs):
    # fall back to the globally-configured scheduler, then to threads
    get = get or _globals['get'] or threaded.get
    dsk = {}
    n_blocks = len(input_data.chunks[0])
    if len(input_data.chunks[1]) > 1:
        # make sure each block spans all the columns
        input_data = input_data.reblock(
            chunks=(n_blocks, sum(input_data.chunks[1])))

    model_keys = []
    validation_score_keys = []
    # tokenize gives a deterministic task-name suffix tied to the inputs
    name_template = 'cross_validate_fold_%d' + tokenize(
        estimator, input_data, target, n_folds)
    for fold_id in range(n_folds):
        model = clone(estimator)
        name = name_template % fold_id
        model_id = 0
        validation_id = 0
        dsk[(name, 'model', model_id)] = model
        fold_score_keys = []
        validation_score_keys.append(fold_score_keys)
        for block_id in range(n_blocks):
            if block_id % n_folds == fold_id:
                # Validation block
                if block_id > 0:
                    # We cannot compute scores on a fold for a model that
                    # has not been fitted yet, so skip scoring until the
                    # next validation block.
                    dsk[(name, 'validation', validation_id)] = (
                        _compute_score,
                        (name, 'model', model_id),
                        (input_data.name, block_id, 0),
                        (target.name, block_id)
                    )
                    fold_score_keys.append(
                        (name, 'validation', validation_id))
                    validation_id += 1
            else:
                # Training block
                model_id += 1
                dsk[(name, 'model', model_id)] = (
                    _partial_fit,
                    (name, 'model', model_id - 1),
                    (input_data.name, block_id, 0),
                    (target.name, block_id),
                    kwargs
                )
        model_keys.append((name, 'model', model_id))
    dsk.update(input_data.dask)
    dsk.update(target.dask)
    if compute:
        return get(dsk, [model_keys, validation_score_keys])
    return (dsk, [model_keys, validation_score_keys])
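
# For a given fold the graph chains partial_fit tasks, with score tasks
# branching off the most recent model. E.g. with n_folds == 2, fold_id == 0
# and 4 blocks:
#   model_0 -> partial_fit(block 1) -> model_1 -> score(block 2)
#   model_1 -> partial_fit(block 3) -> model_2
# Folds share no tasks except the data blocks themselves, so the scheduler
# is free to run them in parallel.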
def test_cross_validation():
    input_data, target = generate_data(
        n_samples=100, n_features=30, blocksize=10)
    results = cross_validate(Perceptron(), input_data, target,
                             classes=[0, 1], n_folds=5, compute=False)
    return results


if __name__ == "__main__":
    data_filename = 'data.hdf5'
    if not os.path.exists(data_filename):
        features, target = generate_data(n_samples=int(1e6),
                                         n_features=int(1e2),
                                         blocksize=int(1e4))
        data_size = features.nbytes + target.nbytes
        print("Generating %0.3fGB of training data in %s" %
              (data_size / 1e9, data_filename))
        with h5py.File(data_filename, 'w') as f:
            dset_features = f.require_dataset(
                '/data/features',
                shape=features.shape,
                chunks=tuple(c[0] for c in features.chunks),
                dtype=features.dtype)
            dset_target = f.require_dataset(
                '/data/target',
                shape=target.shape,
                chunks=tuple(c[0] for c in target.chunks),
                dtype=target.dtype)
            da.store([features, target], [dset_features, dset_target])
        # TODO: make it possible to do:
        # da.store({'/data/features': features,
        #           '/data/target': target},
        #          'hdf5://data.hdf5')

    print("Using data from %s" % data_filename)
    # TODO: make it possible to open and read the hdf5 file lazily instead
    # of having to open it globally before calling `da.from_array`
    with h5py.File(data_filename, 'r+') as f:
        features_ds = f['/data/features']
        features = da.from_array(features_ds, chunks=features_ds.chunks)
        target_ds = f['/data/target']
        target = da.from_array(target_ds, chunks=target_ds.chunks)

        n_folds = 10
        print("Performing %d-fold cross validation of a perceptron model"
              % n_folds)
        with Profiler() as prof:
            t0 = time()
            models, validation_scores = cross_validate(
                Perceptron(), features, target, classes=[0, 1],
                n_folds=n_folds)
            print("%0.3fs" % (time() - t0))
        prof.visualize()
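The fold layout is purely positional: block b serves as the validation block for fold b % n_folds and as a training block for every other fold. A few lines of plain Python (illustrative only, not part of the scripts) make the interleaving visible:

n_blocks, n_folds = 10, 5
for fold_id in range(n_folds):
    roles = ['V' if b % n_folds == fold_id else 'T'
             for b in range(n_blocks)]
    print('fold %d: %s' % (fold_id, ' '.join(roles)))

Since each model is fitted with partial_fit in block order and scored on the validation blocks it meets along the way, the scheme is closer to interleaved progressive validation than to scikit-learn's shuffled KFold. The third script ports the same graph construction to dask dataframes, carrying the target as a column of the dataframe.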
import os
from time import time

import h5py
import numpy as np
import dask.array as da
import dask.dataframe as dd
from dask import threaded
from dask.base import tokenize
from dask.context import _globals
from dask.diagnostics import Profiler
from dask.async import get_sync

from sklearn.linear_model import Perceptron
from sklearn.base import clone

import pandas as pd
import matplotlib.pyplot as plt


def generate_data(n_samples=int(1e6), n_features=int(1e3),
                  n_informative=0.1, noise_scale=0.01,
                  blocksize=int(1e3), seed=0):
    """Generate some random, almost linearly separable data."""
    rng = np.random.RandomState(seed)
    if n_informative < 1:
        n_informative = int(np.ceil(n_informative * n_features))
    if blocksize > n_samples:
        blocksize = n_samples
    true_weights = rng.normal(size=n_features)
    true_weights[n_informative:] = 0.0
    true_intercept = rng.normal(scale=0.1)

    n_blocks = n_samples // blocksize
    block_seeds = rng.random_integers(0, 2 ** 32 - 1, n_blocks)

    def make_input_data(block_idx):
        rng = np.random.RandomState(block_seeds[block_idx])
        return rng.normal(size=(blocksize, n_features))

    def make_target(block_idx, input_block):
        rng = np.random.RandomState(block_seeds[block_idx])
        noise = rng.normal(scale=noise_scale, size=blocksize)
        target_block = np.dot(input_block, true_weights) + noise
        return (target_block + true_intercept > 0).astype(np.int32)

    dsk_features, dsk_target = {}, {}
    for i in range(n_blocks):
        dsk_features[('features', i, 0)] = (make_input_data, i)
        dsk_target[('target', i)] = (make_target, i, ('features', i, 0))
    # input blocks are required to generate target blocks
    dsk_target.update(dsk_features)
    features = da.Array(dsk_features, 'features',
                        shape=(n_samples, n_features),
                        chunks=(blocksize, n_features))
    target = da.Array(dsk_target, 'target',
                      shape=(n_samples,),
                      chunks=(blocksize,))
    return features, target


def _partial_fit(model, data, target_column, kwargs=None):
    kwargs = kwargs or dict()
    x = data.drop(target_column, axis=1)
    y = data[target_column]
    model.partial_fit(x, y, **kwargs)
    return model


def _compute_score(model, data, target_column):
    x = data.drop(target_column, axis=1)
    y = data[target_column]
    return model.score(x, y)


def cross_validate(estimator, data, target_column, n_folds=5,
                   get=None, compute=True, **kwargs):
    get = get or _globals['get'] or threaded.get
    dsk = {}
    model_keys = []
    validation_score_keys = []
    name_template = 'cross_validate_fold_%d' + tokenize(
        estimator, data, target_column, n_folds)
    for fold_id in range(n_folds):
        model = clone(estimator)
        name = name_template % fold_id
        model_id = 0
        validation_id = 0
        dsk[(name, 'model', model_id)] = model
        fold_score_keys = []
        validation_score_keys.append(fold_score_keys)
        for partition_id in range(data.npartitions):
            if partition_id % n_folds == fold_id:
                # Validation partition
                if partition_id > 0:
                    # We cannot compute scores on a fold for a model that
                    # has not been fitted yet, so skip scoring until the
                    # next validation partition.
                    dsk[(name, 'validation', validation_id)] = (
                        _compute_score,
                        (name, 'model', model_id),
                        (data._name, partition_id),
                        target_column
                    )
                    fold_score_keys.append(
                        (name, 'validation', validation_id))
                    validation_id += 1
            else:
                # Training partition
                model_id += 1
                dsk[(name, 'model', model_id)] = (
                    _partial_fit,
                    (name, 'model', model_id - 1),
                    (data._name, partition_id),
                    target_column,
                    kwargs
                )
        model_keys.append((name, 'model', model_id))
    dsk.update(data.dask)
    if compute:
        return get(dsk, [model_keys, validation_score_keys])
    return (dsk, [model_keys, validation_score_keys])


def test_cross_validation():
    input_data, target = generate_data(
        n_samples=100, n_features=30, blocksize=10)
    # the dataframe variant expects the target as a column of the data
    data = dd.from_array(input_data)
    data = data.assign(target=dd.from_array(target, columns=['target']))
    results = cross_validate(Perceptron(), data, 'target',
                             classes=[0, 1], n_folds=5, compute=False)
    return results


if __name__ == "__main__":
    data_filename = 'data.hdf5'
    if not os.path.exists(data_filename):
        features, target = generate_data(n_samples=int(1e6),
                                         n_features=int(1e2),
                                         blocksize=int(1e4))
        data_size = features.nbytes + target.nbytes
        print("Generating %0.3fGB of training data in %s" %
              (data_size / 1e9, data_filename))
        with h5py.File(data_filename, 'w') as f:
            dset_features = f.require_dataset(
                '/data/features',
                shape=features.shape,
                chunks=tuple(c[0] for c in features.chunks),
                dtype=features.dtype)
            dset_target = f.require_dataset(
                '/data/target',
                shape=target.shape,
                chunks=tuple(c[0] for c in target.chunks),
                dtype=target.dtype)
            da.store([features, target], [dset_features, dset_target])
        # TODO: make it possible to do:
        # da.store({'/data/features': features,
        #           '/data/target': target},
        #          'hdf5://data.hdf5')

    print("Using data from %s" % data_filename)
    # TODO: make it possible to open and read the hdf5 file lazily instead
    # of having to open it globally before calling `da.from_array`
    with h5py.File(data_filename, 'r+') as f:
        features_ds = f['/data/features']
        features = da.from_array(features_ds, chunks=features_ds.chunks)
        target_ds = f['/data/target']
        target = da.from_array(target_ds, chunks=target_ds.chunks)
        data = dd.from_array(features)
        data = data.assign(target=dd.from_array(target, columns=['target']))
        # data = data.cache()

        get = None  # switch to get_sync to ease debugging
        n_folds = 10
        print("Performing %d-fold cross validation of a perceptron model"
              % n_folds)
        with Profiler() as prof:
            t0 = time()
            models, validation_scores = cross_validate(
                Perceptron(), data, 'target', classes=[0, 1],
                n_folds=n_folds, get=get)
            print("%0.3fs" % (time() - t0))
        prof.visualize()

        for scores in validation_scores:
            plt.plot(pd.ewma(np.asarray(scores), span=10))
        plt.title('(Smoothed) validation scores for each fold')
        plt.show()
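One caveat on the plotting step: pd.ewma was deprecated in pandas 0.18 and later removed, so on current pandas the smoothing has to go through the ewm accessor instead. A minimal equivalent, with hypothetical score values:

import numpy as np
import pandas as pd

scores = np.asarray([0.61, 0.72, 0.70, 0.78, 0.80])  # hypothetical values
smoothed = pd.Series(scores).ewm(span=10).mean()
print(smoothed.values)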