Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#
# ARGUMENT PARSING
#
import argparse
import gc
import time

# Manually discovered n_samples values that generate arrays of approximately
# the given size.
MAGIC_N_SAMPLES = {
    '1 MB': 5950,
    '10 MB': 59500,
    '100 MB': 595000,
    '1 GB': 5950000,
    '10 GB': 59500000,
    '100 GB': 595000000,
}


def parse_cpu_args(argv=None):
    """Parse the command-line options of the CPU (dask-ml) benchmark.

    Parsing happens only when this function is called, so merely importing
    the module never touches ``sys.argv``.

    Args:
        argv: List of argument strings. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``. For ad-hoc testing pass an explicit list
            instead of editing the script, e.g.
            ``['--n-workers', '1', '--threads-per-worker', '72',
            '--dataset-size', '10 GB', '--worker-memory-limit', '120 GB',
            '--chunk-size', '5950000']``.

    Returns:
        The parsed ``argparse.Namespace``. When ``--dataset-size`` is given,
        ``n_samples`` is overwritten with the matching ``MAGIC_N_SAMPLES``
        entry.

    Raises:
        SystemExit: On invalid options, including a ``--dataset-size`` that
            is not a key of ``MAGIC_N_SAMPLES``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--n-samples', type=int, dest='n_samples',
                        help='number of samples in the dataset', default=5950)
    parser.add_argument('--n-workers', type=int, dest='n_workers',
                        help='number of workers to launch', default=1)
    parser.add_argument('--threads-per-worker', type=int, dest='threads_per_worker',
                        help='threads per worker', default=4)
    parser.add_argument('--dataset-size', type=str, dest='dataset_size',
                        help='size of the dataset', default='')
    parser.add_argument('--worker-memory-limit', type=str, dest='worker_memory_limit',
                        help='worker memory limit', default='1 GB')
    parser.add_argument('--chunk-size', type=int, dest='chunk_size',
                        help='number of rows to a chunk', default=5950)
    args = parser.parse_args(argv)

    if args.dataset_size != '':
        # Report an unknown size as a normal usage error rather than letting
        # the dictionary lookup fail with a bare KeyError traceback.
        if args.dataset_size not in MAGIC_N_SAMPLES:
            parser.error(
                f"unknown --dataset-size {args.dataset_size!r}; "
                f"expected one of: {', '.join(MAGIC_N_SAMPLES)}"
            )
        args.n_samples = MAGIC_N_SAMPLES[args.dataset_size]
    return args


def run_cpu_benchmark(args):
    """Fit and predict a dask-ml LinearRegression on a local cluster, timing both.

    Args:
        args: Namespace as returned by ``parse_cpu_args``; reads ``n_samples``,
            ``n_workers``, ``threads_per_worker``, ``worker_memory_limit`` and
            ``chunk_size``.
    """
    # Third-party imports stay local so the module can be imported (e.g. to
    # reuse parse_cpu_args) on a machine without dask installed.
    from dask.distributed import Client, LocalCluster, wait
    from dask_ml.datasets import make_regression
    from dask_ml.linear_model import LinearRegression
    from dask_ml.wrappers import ParallelPostFit

    #
    # CREATE CLUSTER AND DATASET
    #
    # The optimal number of workers is not known up front; it has to be
    # discovered empirically as the benchmark is scaled.
    # The context managers shut the client and the cluster down again, even
    # when the benchmark raises.
    with LocalCluster(
        n_workers=args.n_workers,
        threads_per_worker=args.threads_per_worker,
        memory_limit=args.worker_memory_limit,
    ) as cluster, Client(cluster):
        # NOTE(review): X and y are presumably lazy dask arrays, in which case
        # the timings below include generating the data - confirm.
        X, y = make_regression(
            n_samples=args.n_samples, n_features=20, n_informative=2, n_targets=1,
            chunks=(args.chunk_size, 20)
        )

        #
        # FIT AND PRINT HOW LONG IT TOOK
        #
        print(f"Fitting LinearRegression on {args.n_samples} samples with {args.n_workers} cluster nodes...")
        clf = LinearRegression()
        # perf_counter is monotonic, so the elapsed time cannot be distorted
        # by wall-clock adjustments.
        start_time = time.perf_counter()
        clf.fit(X, y)  # where the work happens
        elapsed = time.perf_counter() - start_time
        print(f"Fitting done in {elapsed} seconds.")

        gc.collect()

        #
        # PREDICT AND PRINT HOW LONG IT TOOK
        #
        print("Scoring LinearRegression...")
        start_time = time.perf_counter()
        clf = ParallelPostFit(estimator=clf)
        # ParallelPostFit.predict returns a lazy dask array for dask input
        # (see the dask-ml documentation). Persist it and wait so the timer
        # covers the actual prediction work, while the result stays on the
        # workers instead of being pulled to the client.
        wait(clf.predict(X).persist())
        elapsed = time.perf_counter() - start_time
        print(f"Scoring done in {elapsed} seconds.")


if __name__ == "__main__":
    run_cpu_benchmark(parse_cpu_args())
#
# ARGUMENT PARSING
#
import argparse
import gc
import time

# n_samples values used for each named dataset size (manually discovered to
# generate arrays of approximately that size).
MAGIC_N_SAMPLES = {
    '1 MB': 5950,
    '10 MB': 59500,
    '100 MB': 595000,
    '1 GB': 5950000,
    '10 GB': 59500000,
    '100 GB': 595000000,
}


def parse_gpu_args(argv=None):
    """Parse the command-line options of the GPU (cuML) benchmark.

    Parsing happens only when this function is called, so merely importing
    the module never touches ``sys.argv``.

    Args:
        argv: List of argument strings. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``. For ad-hoc testing pass an explicit list
            instead of editing the script, e.g.
            ``['--n-workers', '1', '--threads-per-worker', '1',
            '--dataset-size', '10 GB', '--worker-memory-limit', '12 GB',
            '--chunk-size', '5950000']``.

    Returns:
        The parsed ``argparse.Namespace``. When ``--dataset-size`` is given,
        ``n_samples`` is overwritten with the matching ``MAGIC_N_SAMPLES``
        entry.

    Raises:
        SystemExit: On invalid options, including a ``--dataset-size`` that
            is not a key of ``MAGIC_N_SAMPLES``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--n-samples', type=int, dest='n_samples',
                        help='number of samples in the dataset', default=5950)
    parser.add_argument('--n-workers', type=int, dest='n_workers',
                        help='number of workers to launch', default=1)
    parser.add_argument('--threads-per-worker', type=int, dest='threads_per_worker',
                        help='threads per worker', default=4)
    parser.add_argument('--dataset-size', type=str, dest='dataset_size',
                        help='size of the dataset', default='')
    parser.add_argument('--worker-memory-limit', type=str, dest='worker_memory_limit',
                        help='worker memory limit', default='1 GB')
    parser.add_argument('--chunk-size', type=int, dest='chunk_size',
                        help='number of rows to a chunk', default=5950)
    args = parser.parse_args(argv)

    if args.dataset_size != '':
        # Report an unknown size as a normal usage error rather than letting
        # the dictionary lookup fail with a bare KeyError traceback.
        if args.dataset_size not in MAGIC_N_SAMPLES:
            parser.error(
                f"unknown --dataset-size {args.dataset_size!r}; "
                f"expected one of: {', '.join(MAGIC_N_SAMPLES)}"
            )
        args.n_samples = MAGIC_N_SAMPLES[args.dataset_size]
    return args


def compute_n_parts(n_samples, chunk_size):
    """Return how many partitions to request for ``n_samples`` rows.

    Uses the same floor division as before, but never returns less than one
    partition: a chunk size larger than the dataset used to yield
    ``n_parts=0``.

    Args:
        n_samples: Total number of rows in the dataset.
        chunk_size: Desired number of rows per partition; must be positive.

    Returns:
        ``n_samples // chunk_size``, clamped to a minimum of 1.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
    return max(1, n_samples // chunk_size)


def run_gpu_benchmark(args):
    """Fit and predict a cuML LinearRegression on a local CUDA cluster, timing both.

    Args:
        args: Namespace as returned by ``parse_gpu_args``; reads ``n_samples``
            and ``chunk_size``. ``n_workers``, ``threads_per_worker`` and
            ``worker_memory_limit`` are parsed but not forwarded to
            ``LocalCUDACluster``, which is created with its own defaults.
    """
    # Third-party imports stay local so the module can be imported (e.g. to
    # reuse parse_gpu_args) on a machine without the GPU stack installed.
    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    n_parts = compute_n_parts(args.n_samples, args.chunk_size)

    # The context managers shut the client and the cluster down again, even
    # when the benchmark raises.
    with LocalCUDACluster() as cluster, Client(cluster):
        # The cuML / dask-ml imports are kept after cluster start-up to
        # preserve the original script's import order.
        from cuml.dask.datasets.regression import make_regression
        from cuml.dask.linear_model import LinearRegression
        from dask_ml.wrappers import ParallelPostFit

        print(f"Calling make_regression(n_samples={args.n_samples}, n_parts={n_parts})")
        X, y = make_regression(
            n_samples=args.n_samples, n_features=20, n_informative=2, n_targets=1,
            n_parts=n_parts
        )

        #
        # FIT AND PRINT HOW LONG IT TOOK
        #
        clf = LinearRegression()
        # perf_counter is monotonic, so the elapsed time cannot be distorted
        # by wall-clock adjustments.
        start_time = time.perf_counter()
        print("Fitting LinearRegression...")
        clf.fit(X, y)
        elapsed = time.perf_counter() - start_time
        print(f"Fitting done in {elapsed} seconds.")

        gc.collect()

        #
        # PREDICT AND PRINT HOW LONG IT TOOK
        #
        print("Scoring LinearRegression...")
        start_time = time.perf_counter()
        clf = ParallelPostFit(estimator=clf)
        # NOTE(review): predict() on a dask collection presumably returns a
        # lazy result, in which case this timer only covers graph construction.
        # Forcing evaluation is not added here because it is unconfirmed that
        # ParallelPostFit can evaluate a cuml.dask estimator block-wise -
        # TODO confirm and, if needed, call the cuML model's predict directly.
        clf.predict(X)
        elapsed = time.perf_counter() - start_time
        print(f"Scoring done in {elapsed} seconds.")


if __name__ == "__main__":
    run_gpu_benchmark(parse_gpu_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.