betatim/gpu.py (last active September 1, 2023)
import argparse
import os
import timeit
from pathlib import Path

import joblib
import pandas as pd

import dask_cudf
import dask.dataframe as dd
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# logistic regression
from cuml.dask.linear_model import LogisticRegression

def load_data(path_customers, path_transactions):
    customer_data = dask_cudf.read_csv(path_customers)
    transaction_data = dask_cudf.read_csv(path_transactions)
    customer_data["senderID"] = customer_data["fa_customer_sk"]
    data = dd.merge(transaction_data, customer_data, on="senderID")
    return data

def pre_process(data):
    data_pre = data
    data_pre["time"] = dd.to_datetime(data_pre["time"])
    data_pre["business_hour"] = data_pre["time"].dt.hour
    data_pre["amount_norm"] = data_pre["amount"] / data_pre["transaction_limit"]
    data_pre["business_hour_norm"] = data_pre["business_hour"] / 23
    # The `persist()` makes sure the results of the pre-processing
    # aren't recomputed multiple times
    if "isFraud" in data_pre.columns:
        return data_pre[
            ["transactionID", "amount_norm", "business_hour_norm", "isFraud"]
        ].persist()
    else:
        return data_pre[
            ["transactionID", "amount_norm", "business_hour_norm"]
        ].persist()

def train(data):
    # The default solver in dask_ml is 'admm'; the benchmark
    # explicitly selects lbfgs to keep results comparable
    # between the different implementations.
    lrn = LogisticRegression(C=1.0)
    X_train = data[["business_hour_norm", "amount_norm"]]
    # Without casting to float, an exception is raised saying the
    # expected dtype is float, not int.
    y_train = data["isFraud"].astype(float)
    return lrn.fit(X_train, y_train)

def serve(model, data):
    data_serve = data[["business_hour_norm", "amount_norm"]]
    data["isFraud"] = model.predict(data_serve)
    # Make sure things are computed inside the `serve` function
    # and not only when we save the CSV later.
    # This also produces a pandas DataFrame instead of a Dask one,
    # which is nice because otherwise the scoring machinery of the
    # benchmark suite stumbles (writing a Dask DataFrame creates
    # several files, not just one).
    return data[["transactionID", "isFraud"]].compute()

def main():
    model_file_name = "uc10.python.model"

    parser = argparse.ArgumentParser()
    parser.add_argument("--debug", action="store_true", required=False)
    parser.add_argument(
        "--stage", choices=["training", "serving"], metavar="stage", required=True
    )
    parser.add_argument("--workdir", metavar="workdir", required=True)
    parser.add_argument("--output", metavar="output", required=False)
    parser.add_argument("customers")
    parser.add_argument("transactions")
    args = parser.parse_args()

    path_customers = args.customers
    path_transactions = args.transactions
    stage = args.stage
    work_dir = Path(args.workdir)
    if args.output:
        output = Path(args.output)
    else:
        output = work_dir

    if not os.path.exists(work_dir):
        os.makedirs(work_dir)
    if not os.path.exists(output):
        os.makedirs(output)

    start = timeit.default_timer()
    cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0,1")
    client = Client(cluster)
    raw_data = load_data(path_customers, path_transactions)
    end = timeit.default_timer()
    load_time = end - start
    print("load time:\t", load_time)

    start = timeit.default_timer()
    preprocessed_data = pre_process(raw_data)
    end = timeit.default_timer()
    pre_process_time = end - start
    print("pre-process time:\t", pre_process_time)

    if stage == "training":
        start = timeit.default_timer()
        model = train(preprocessed_data)
        end = timeit.default_timer()
        train_time = end - start
        print("train time:\t", train_time)
        # Without this, pickling/unpickling doesn't work
        model = model.get_combined_model()
        # joblib.dump(model, work_dir / model_file_name)
        # The dump is turned off for now so the script runs in one go;
        # in reality the idea is that the training and serving stages
        # run independently, with the model dumped in between.

    if True or stage == "serving":
        # model = joblib.load(work_dir / model_file_name)
        start = timeit.default_timer()
        predictions = serve(model, preprocessed_data)
        print(type(predictions))
        end = timeit.default_timer()
        serve_time = end - start
        print("serve time:\t", serve_time)
        predictions.to_csv(output / "predictions.csv", index=False)


if __name__ == "__main__":
    main()
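
For reference, given the argparse setup above, an invocation of roughly this shape (the CSV file names are hypothetical, not part of the gist) produced the timings and traceback below:

    python gpu.py --stage training --workdir ./work customers.csv transactions.csv
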
load time: 5.883073884993792
pre-process time: 0.03996838256716728
train time: 8.333389542996883
Traceback (most recent call last):
File "/home/nfs/thead/git/gpu-xb-ai/workload/python/workload/UseCase10/dask_gpu.py", line 140, in <module>
main()
File "/home/nfs/thead/git/gpu-xb-ai/workload/python/workload/UseCase10/dask_gpu.py", line 130, in main
predictions = serve(model, preprocessed_data)
File "/home/nfs/thead/git/gpu-xb-ai/workload/python/workload/UseCase10/dask_gpu.py", line 62, in serve
data["isFraud"] = model.predict(data_serve)
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/dask/dataframe/core.py", line 4915, in __setitem__
df = self.assign(**{key: value})
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/dask/dataframe/core.py", line 5361, in assign
df2 = data._meta_nonempty.assign(
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/nvtx/nvtx.py", line 101, in inner
result = func(*args, **kwargs)
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/cudf/core/dataframe.py", line 1375, in assign
new_df[name] = kwargs.pop(name)
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/nvtx/nvtx.py", line 101, in inner
result = func(*args, **kwargs)
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/cudf/core/dataframe.py", line 1250, in __setitem__
self.insert(len(self._data), arg, value)
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/nvtx/nvtx.py", line 101, in inner
result = func(*args, **kwargs)
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/cudf/core/dataframe.py", line 2893, in insert
return self._insert(
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/nvtx/nvtx.py", line 101, in inner
result = func(*args, **kwargs)
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/cudf/core/dataframe.py", line 2963, in _insert
value = value._align_to_index(
File "/nvme/0/thead/miniconda/envs/gpu-xb-py311/lib/python3.10/site-packages/cudf/core/indexed_frame.py", line 2455, in _align_to_index
raise ValueError("Cannot align indices with non-unique values")
ValueError: Cannot align indices with non-unique values
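
The traceback points at the assignment `data["isFraud"] = model.predict(data_serve)` in `serve`: while building the non-empty meta for the assignment (`data._meta_nonempty.assign(...)` in the trace), cuDF is asked to align the predicted series against an index with non-unique values (the merge in `load_data` does not guarantee a unique index), which it refuses to do. One possible workaround, purely an untested sketch, is to reset the index before assigning so no alignment across non-unique values is needed:

def serve(model, data):
    # Untested sketch: dropping the index before the assignment means
    # cuDF never has to align predictions against non-unique index values.
    data = data.reset_index(drop=True)
    data_serve = data[["business_hour_norm", "amount_norm"]]
    data["isFraud"] = model.predict(data_serve)
    return data[["transactionID", "isFraud"]].compute()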