-
-
Save betatim/712dc3bd19237e928c200e2ab954c0e3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import os | |
import timeit | |
from pathlib import Path | |
import joblib | |
import pandas as pd | |
import dask_cudf | |
import dask.dataframe as dd | |
from dask.distributed import Client | |
from dask_cuda import LocalCUDACluster | |
# logistic regression | |
from cuml.dask.linear_model import LogisticRegression | |
def load_data(path_customers, path_transactions): | |
customer_data = dask_cudf.read_csv(path_customers) | |
transaction_data = dask_cudf.read_csv(path_transactions) | |
customer_data["senderID"] = customer_data["fa_customer_sk"] | |
data = dd.merge(transaction_data, customer_data, on="senderID") | |
return data | |
def pre_process(data): | |
data_pre = data | |
data_pre["time"] = dd.to_datetime(data_pre["time"]) | |
data_pre["business_hour"] = data_pre["time"].dt.hour | |
data_pre["amount_norm"] = data_pre["amount"] / data_pre["transaction_limit"] | |
data_pre["business_hour_norm"] = data_pre["business_hour"] / 23 | |
# The `persist()` makes sure the results of the pre-processing | |
# aren't recomputed multiple times | |
if "isFraud" in data_pre.columns: | |
return data_pre[ | |
["transactionID", "amount_norm", "business_hour_norm", "isFraud"] | |
].persist() | |
else: | |
return data_pre[ | |
["transactionID", "amount_norm", "business_hour_norm"] | |
].persist() | |
def train(data): | |
# The default solver in dask_ml is 'admm', the benchmark | |
# explicitly selects lbfgs. This keeps results comparable | |
# between the different implementations. | |
lrn = LogisticRegression(C=1.0) | |
X_train = data[["business_hour_norm", "amount_norm"]] | |
# Without changing the type to float an exception is raised that tells you | |
# it was expecting the type to be float, not int. | |
y_train = data["isFraud"].astype(float) | |
return lrn.fit(X_train, y_train) | |
def serve(model, data): | |
data_serve = data[["business_hour_norm", "amount_norm"]] | |
data["isFraud"] = model.predict(data_serve) | |
# Make sure things are computed inside the `serve` function | |
# and not only when we save the CSV later | |
# This also outputs a pandas DF, instead of a Dask DF | |
# which is nice because otherwise the scoring machinery of the | |
# benchmark suit stumbles (writing a dask DF creates several files, not | |
# just one). | |
return data[["transactionID", "isFraud"]].compute() | |
def main(): | |
model_file_name = "uc10.python.model" | |
parser = argparse.ArgumentParser() | |
parser.add_argument("--debug", action="store_true", required=False) | |
parser.add_argument( | |
"--stage", choices=["training", "serving"], metavar="stage", required=True | |
) | |
parser.add_argument("--workdir", metavar="workdir", required=True) | |
parser.add_argument("--output", metavar="output", required=False) | |
parser.add_argument("customers") | |
parser.add_argument("transactions") | |
args = parser.parse_args() | |
path_customers = args.customers | |
path_transactions = args.transactions | |
stage = args.stage | |
work_dir = Path(args.workdir) | |
if args.output: | |
output = Path(args.output) | |
else: | |
output = work_dir | |
if not os.path.exists(work_dir): | |
os.makedirs(work_dir) | |
if not os.path.exists(output): | |
os.makedirs(output) | |
start = timeit.default_timer() | |
cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0,1") | |
client = Client(cluster) | |
(raw_data) = load_data(path_customers, path_transactions) | |
end = timeit.default_timer() | |
load_time = end - start | |
print("load time:\t", load_time) | |
start = timeit.default_timer() | |
preprocessed_data = pre_process(raw_data) | |
end = timeit.default_timer() | |
pre_process_time = end - start | |
print("pre-process time:\t", pre_process_time) | |
if stage == "training": | |
start = timeit.default_timer() | |
model = train(preprocessed_data) | |
end = timeit.default_timer() | |
train_time = end - start | |
print("train time:\t", train_time) | |
# Without this pickling/unpickling doesn't work | |
model = model.get_combined_model() | |
#joblib.dump(model, work_dir / model_file_name) | |
# turning this off for now so it can be run in one go, | |
# in reality the idea is that you can run training and | |
# serving phases independently by dumping the model | |
if True or stage == "serving": | |
#model = joblib.load(work_dir / model_file_name) | |
start = timeit.default_timer() | |
predictions = serve(model, preprocessed_data) | |
print(type(predictions)) | |
end = timeit.default_timer() | |
serve_time = end - start | |
print("serve time:\t", serve_time) | |
predictions.to_csv(output / "predictions.csv", index=False) | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
load time: 5.883073884993792 | |
pre-process time: 0.03996838256716728 | |
train time: 8.333389542996883 | |
Traceback (most recent call last): | |
File "/home/nfs/thead/git/gpu-xb-ai/workload/python/workload/UseCase10/dask_gpu.py", line 140, in <module> | |
main() | |
File "/home/nfs/thead/git/gpu-xb-ai/workload/python/workload/UseCase10/dask_gpu.py", line 130, in main | |
predictions = serve(model, preprocessed_data) | |
File "/home/nfs/thead/git/gpu-xb-ai/workload/python/workload/UseCase10/dask_gpu.py", line 62, in serve | |
data["isFraud"] = model.predict(data_serve) | |
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/dask/dataframe/core.py", line 4915, in __setitem__ | |
df = self.assign(**{key: value}) | |
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/dask/dataframe/core.py", line 5361, in assign | |
df2 = data._meta_nonempty.assign( | |
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/nvtx/nvtx.py", line 101, in inner | |
result = func(*args, **kwargs) | |
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/cudf/core/dataframe.py", line 1375, in assign | |
new_df[name] = kwargs.pop(name) | |
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/nvtx/nvtx.py", line 101, in inner | |
result = func(*args, **kwargs) | |
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/cudf/core/dataframe.py", line 1250, in __setitem__ | |
self.insert(len(self._data), arg, value) | |
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/nvtx/nvtx.py", line 101, in inner | |
result = func(*args, **kwargs) | |
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/cudf/core/dataframe.py", line 2893, in insert | |
return self._insert( | |
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/nvtx/nvtx.py", line 101, in inner | |
result = func(*args, **kwargs) | |
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/cudf/core/dataframe.py", line 2963, in _insert | |
value = value._align_to_index( | |
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/cudf/core/indexed_frame.py", line 2455, in _align_to_index | |
raise ValueError("Cannot align indices with non-unique values") | |
ValueError: Cannot align indices with non-unique values |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment