This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Task | Spark | RAPIDS
---|---|---
Load/rowcount | 20.6 seconds | 25.5 seconds
Feature engineering | 54.3 seconds | 23.1 seconds
Random forest | 36.9 minutes | 1.02 seconds
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from cuml.dask.ensemble import RandomForestRegressor

# GPU random forest via cuML's Dask ensemble API.
# Hyperparameters mirror the Spark run: 100 trees, depth 10, fixed seed.
hyperparams = {"n_estimators": 100, "max_depth": 10, "seed": 42}
rf = RandomForestRegressor(**hyperparams)
_ = rf.fit(X, y)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.ml.regression import RandomForestRegressor

# Spark MLlib random forest with the same hyperparameters as the cuML run:
# 100 trees, max depth 10, fixed seed.
hyperparams = {"numTrees": 100, "maxDepth": 10, "seed": 42}
rf = RandomForestRegressor(**hyperparams)
fitted = rf.fit(X)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dask import persist
from dask.distributed import wait

# Derive time-based features from the pickup timestamp.
pickup = taxi.tpep_pickup_datetime.dt
taxi['pickup_weekday'] = pickup.weekday
taxi['pickup_hour'] = pickup.hour
taxi['pickup_minute'] = pickup.minute
taxi['pickup_week_hour'] = taxi.pickup_weekday * 24 + taxi.pickup_hour
# Binary flag column: 'Y' -> 1.0, anything else -> 0.0.
taxi['store_and_fwd_flag'] = (taxi.store_and_fwd_flag == 'Y').astype(float)
# Sentinel-fill missing values so the model sees no NaNs.
taxi = taxi.fillna(-1)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline

# Derive time-based features from the pickup timestamp, cast to doubles
# so they can feed straight into a VectorAssembler.
pickup_ts = taxi.tpep_pickup_datetime
taxi = (
    taxi
    .withColumn('pickup_weekday', F.dayofweek(pickup_ts).cast(DoubleType()))
    .withColumn('pickup_hour', F.hour(pickup_ts).cast(DoubleType()))
    .withColumn('pickup_minute', F.minute(pickup_ts).cast(DoubleType()))
)
taxi = taxi.withColumn('pickup_week_hour', ((taxi.pickup_weekday * 24) + taxi.pickup_hour).cast(DoubleType()))
# Binary flag column: 'Y' -> 1, anything else -> 0.
taxi = taxi.withColumn('store_and_fwd_flag', F.when(taxi.store_and_fwd_flag == 'Y', 1).otherwise(0))
# MLlib expects the target in a column named 'label'.
taxi = taxi.withColumn('label', taxi.total_amount)
# Sentinel-fill missing values so the model sees no nulls.
taxi = taxi.fillna(-1)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Model input columns: engineered time features plus raw trip attributes.
features = [
    'pickup_weekday',
    'pickup_hour',
    'pickup_minute',
    'pickup_week_hour',
    'passenger_count',
    'VendorID',
    'RatecodeID',
    'store_and_fwd_flag',
    'PULocationID',
    'DOLocationID',
]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dask_cudf

# Read the CSVs from the public S3 bucket directly into GPU dataframes.
taxi = dask_cudf.read_csv(
    files,
    assume_missing=True,        # treat un-annotated int columns as floats (may hold NaN)
    parse_dates=[1, 2],         # pickup / dropoff timestamp columns
    usecols=cols,
    storage_options={'anon': True},  # public bucket, no credentials needed
)
# Materializes the row count (the "load/rowcount" benchmark step).
len(taxi)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
from pyspark.sql.types import * | |
import pyspark.sql.functions as F | |
from pyspark.sql import DataFrame | |
# manually specify schema because inferSchema in read.csv is quite slow | |
schema = StructType([ | |
StructField('VendorID', DoubleType()), | |
StructField('tpep_pickup_datetime', TimestampType()), | |
... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import s3fs

# Anonymously list the public NYC TLC bucket and keep the yellow-cab
# files for 2017-2019.
fs = s3fs.S3FileSystem(anon=True)
files = [
    f"s3://{path}"
    for path in fs.ls('s3://nyc-tlc/trip data/')
    if 'yellow' in path and any(year in path for year in ('2017', '2018', '2019'))
]

# Columns to load from each CSV.
cols = [
    'VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
    'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
    'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
    'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
    'total_amount',
]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dask.distributed import Client
from dask_saturn import SaturnCluster

# Spin up a Saturn-managed Dask cluster and attach a client so that
# subsequent dask/dask_cudf calls are scheduled on it.
cluster = SaturnCluster()
client = Client(cluster)