Skip to content

Instantly share code, notes, and snippets.

@rikturr
rikturr / rf_spark_rapids.csv
Created July 30, 2020 20:20
rf spark vs rapids
Task Spark RAPIDS
Load/rowcount 20.6 seconds 25.5 seconds
Feature engineering 54.3 seconds 23.1 seconds
Random forest 36.9 minutes 1.02 seconds
@rikturr
rikturr / rapids_rf.py
Created July 30, 2020 20:19
rapids_rf
from cuml.dask.ensemble import RandomForestRegressor

# Distributed GPU random forest (cuML on Dask): 100 trees of depth 10,
# seed fixed for reproducibility.
rf = RandomForestRegressor(max_depth=10, n_estimators=100, seed=42)
_ = rf.fit(X, y)
@rikturr
rikturr / spark_rf.py
Created July 30, 2020 20:19
spark_rf
from pyspark.ml.regression import RandomForestRegressor

# Spark ML random forest: 100 trees of depth 10, seed fixed for
# reproducibility.
rf = RandomForestRegressor(maxDepth=10, numTrees=100, seed=42)
fitted = rf.fit(X)
@rikturr
rikturr / dask_features.py
Created July 30, 2020 20:13
dask_features
from dask import persist
from dask.distributed import wait

# Derive time-of-pickup features from the pickup timestamp.
pickup_ts = taxi.tpep_pickup_datetime
taxi['pickup_weekday'] = pickup_ts.dt.weekday
taxi['pickup_hour'] = pickup_ts.dt.hour
taxi['pickup_minute'] = pickup_ts.dt.minute
# Hour-of-week index combining weekday and hour.
taxi['pickup_week_hour'] = taxi.pickup_weekday * 24 + taxi.pickup_hour
# Encode the Y/N flag as 1.0/0.0.
taxi['store_and_fwd_flag'] = (taxi.store_and_fwd_flag == 'Y').astype(float)
# Use -1 as the sentinel for missing values.
taxi = taxi.fillna(-1)
@rikturr
rikturr / spark_features.py
Created July 30, 2020 20:11
spark_features
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline

# Engineer time-of-pickup features as DoubleType columns for Spark ML.
# FIX: F.dayofweek() returns 1-7 with Sunday=1, while the companion Dask
# pipeline uses `dt.weekday`, which is 0-6 with Monday=0. Shift Spark's
# value onto the 0-6 Monday-based convention so both pipelines compute
# identical pickup_weekday / pickup_week_hour features.
taxi = taxi.withColumn(
    'pickup_weekday',
    ((F.dayofweek(taxi.tpep_pickup_datetime) + 5) % 7).cast(DoubleType()))
taxi = taxi.withColumn(
    'pickup_hour', F.hour(taxi.tpep_pickup_datetime).cast(DoubleType()))
taxi = taxi.withColumn(
    'pickup_minute', F.minute(taxi.tpep_pickup_datetime).cast(DoubleType()))
# Hour-of-week index combining weekday and hour.
taxi = taxi.withColumn(
    'pickup_week_hour',
    ((taxi.pickup_weekday * 24) + taxi.pickup_hour).cast(DoubleType()))
# Encode the Y/N flag as 1/0.
taxi = taxi.withColumn(
    'store_and_fwd_flag',
    F.when(taxi.store_and_fwd_flag == 'Y', 1).otherwise(0))
# 'label' is the default target column name expected by Spark ML estimators.
taxi = taxi.withColumn('label', taxi.total_amount)
# Use -1 as the sentinel for missing values.
taxi = taxi.fillna(-1)
@rikturr
rikturr / features.py
Created July 30, 2020 20:10
features
# Model input columns: engineered time features plus raw trip attributes.
features = [
    'pickup_weekday',
    'pickup_hour',
    'pickup_minute',
    'pickup_week_hour',
    'passenger_count',
    'VendorID',
    'RatecodeID',
    'store_and_fwd_flag',
    'PULocationID',
    'DOLocationID',
]
@rikturr
rikturr / dask_csv.py
Created July 30, 2020 20:05
dask_csv
import dask_cudf

# Read the trip CSVs into a GPU-backed Dask dataframe.
taxi = dask_cudf.read_csv(
    files,
    assume_missing=True,              # treat un-typed columns as nullable floats
    parse_dates=[1, 2],               # pickup / dropoff timestamp columns
    usecols=cols,
    storage_options={'anon': True},   # anonymous access to the public bucket
)
# Row count over the full dataset.
len(taxi)
@rikturr
rikturr / spark_csv.py
Created July 30, 2020 20:04
spark_csv
import functools
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
# manually specify schema because inferSchema in read.csv is quite slow
schema = StructType([
StructField('VendorID', DoubleType()),
StructField('tpep_pickup_datetime', TimestampType()),
...
@rikturr
rikturr / list_files.py
Created July 30, 2020 20:04
list_files
import s3fs

# Anonymous client for the public NYC TLC S3 bucket.
fs = s3fs.S3FileSystem(anon=True)
# Yellow-cab trip files for 2017-2019.
files = [
    f"s3://{path}"
    for path in fs.ls('s3://nyc-tlc/trip data/')
    if 'yellow' in path and any(year in path for year in ('2017', '2018', '2019'))
]
# Columns to load from the trip CSVs: identifiers, timestamps, trip and
# fare attributes.
cols = [
    'VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
    'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
    'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
    'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
    'total_amount',
]
@rikturr
rikturr / init_dask.py
Created July 30, 2020 20:03
init_dask
# Connect a Dask client to a Saturn Cloud cluster.
from dask.distributed import Client
from dask_saturn import SaturnCluster
# NOTE(review): SaturnCluster semantics are not shown here — presumably it
# provisions/attaches to a hosted Dask cluster; confirm against dask-saturn docs.
cluster = SaturnCluster()
# Entry point for submitting work to the cluster.
client = Client(cluster)