python scripts/training/gpu/train_parallel_implementation.py \
    --n-workers 1 --threads-per-worker 1 \
    --dataset-size '10 GB' --worker-memory-limit '12 GB' \
    --chunk-size 5950000
#
# ARGUMENT PARSING
#
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--n-samples', type=int, dest='n_samples', help='number of samples in the dataset', default=5950)
parser.add_argument('--n-workers', type=int, dest='n_workers', help='number of workers to launch', default=1)
parser.add_argument('--threads-per-worker', type=int, dest='threads_per_worker', help='threads per worker', default=4)
parser.add_argument('--dataset-size', type=str, dest='dataset_size', help='size of the dataset', default='')
parser.add_argument('--worker-memory-limit', type=str, dest='worker_memory_limit', help='worker memory limit', default='1 GB')
parser.add_argument('--chunk-size', type=int, dest='chunk_size', help='number of rows to a chunk', default=5950)
args = parser.parse_args()

# Used for testing.
# class TestArgs:
#     def __init__(self):
#         pass
# args = TestArgs()
# args.n_workers = 1
# args.threads_per_worker = 72
# args.dataset_size = '10 GB'
# args.worker_memory_limit = '120 GB'
# args.chunk_size = 5950000
if __name__ == "__main__":
    # manually discovered n_samples values that generate arrays of approximately the given size
    MAGIC_N_SAMPLES = {
        '1 MB': 5950,
        '10 MB': 59500,
        '100 MB': 595000,
        '1 GB': 5950000,
        '10 GB': 59500000,
        '100 GB': 595000000
    }
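    # (Rough sanity check, not from the original gist: assuming float64, the 1 MB entry
    # gives an X of 5950 rows x 20 features x 8 bytes ~= 0.95 MB, and each subsequent
    # entry scales that by 10x.)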
    # I don't know what the optimal number of workers actually is; this is something you need to
    # discover on your own as you scale. I suppose we'll figure this out as part of this process.
    if args.dataset_size != '':
        args.n_samples = MAGIC_N_SAMPLES[args.dataset_size]
    #
    # CREATE CLUSTER AND DATASET
    #
    from dask.distributed import Client, LocalCluster
    from dask_ml.datasets import make_regression
    from dask_ml.wrappers import ParallelPostFit

    cluster = LocalCluster(
        n_workers=args.n_workers, threads_per_worker=args.threads_per_worker,
        memory_limit=args.worker_memory_limit
    )
    client = Client(cluster)

    X, y = make_regression(
        n_samples=args.n_samples, n_features=20, n_informative=2, n_targets=1,
        chunks=(args.chunk_size, 20)
    )
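    # (Note, not from the original gist: assuming float64, each chunk of X is
    # chunk_size x 20 x 8 bytes, e.g. --chunk-size 5950000 gives ~0.95 GB chunks,
    # so the 10 GB dataset is split into 10 of them.)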
    #
    # FIT AND PRINT HOW LONG IT TOOK
    #
    print(f"Fitting LinearRegression on {args.n_samples} samples with {args.n_workers} cluster nodes...")
    from dask_ml.linear_model import LinearRegression
    clf = LinearRegression()

    import time
    start_time = time.time()
    clf.fit(X, y)  # where the work happens
    print(f"Fitting done in {time.time() - start_time} seconds.")

    import gc
    gc.collect()
    #
    # PREDICT AND PRINT HOW LONG IT TOOK
    #
    print("Scoring LinearRegression...")
    start_time = time.time()
    clf = ParallelPostFit(estimator=clf)
    # ParallelPostFit.predict on a dask array is lazy, so call .compute() to make the
    # timing reflect the actual prediction work rather than just graph construction.
    clf.predict(X).compute()
    print(f"Scoring done in {time.time() - start_time} seconds.")