Skip to content

Instantly share code, notes, and snippets.

@rikturr
rikturr / init_spark.py
Created July 30, 2020 20:02
init_spark
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Get the active SparkSession (or create one) with the
# 'spark.executor.memory' setting configured to '36g'.
spark = SparkSession.builder.config('spark.executor.memory', '36g').getOrCreate()
@rikturr
rikturr / hyperparameter_runtimes.csv
Last active December 8, 2020 19:37
hyperparameter runtimes
System Runtime LOC changed
Python (single-node) 3 hours -
Dask 11 minutes 10
Spark 47 minutes 100
@rikturr
rikturr / spark_run_grid.py
Created July 21, 2020 14:38
spark run grid
# Fit the cross-validator on the taxi DataFrame and keep the fitted model.
fitted = crossval.fit(taxi)
# Read the metrics from the fitted model returned above (the original line
# referenced an undefined name, `results`, instead of `fitted`).
print(np.min(fitted.avgMetrics))  # min because metric is RMSE
@rikturr
rikturr / spark_grid_search.py
Created July 21, 2020 14:37
spark grid search
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.pipeline import Pipeline
indexers = [
StringIndexer(
inputCol=c,
@rikturr
rikturr / spark_features.py
Created July 21, 2020 14:37
spark features
import datetime  # needed for the 2019-01-01 reference timestamp below

import pyspark.sql.functions as F
import pyspark.sql.types as T

# Derive time-based features from the pickup timestamp column. Each derived
# value is cast to DoubleType before being attached as a new column.
taxi = taxi.withColumn('pickup_weekday', F.dayofweek(taxi.tpep_pickup_datetime).cast(T.DoubleType()))
taxi = taxi.withColumn('pickup_weekofyear', F.weekofyear(taxi.tpep_pickup_datetime).cast(T.DoubleType()))
taxi = taxi.withColumn('pickup_hour', F.hour(taxi.tpep_pickup_datetime).cast(T.DoubleType()))
taxi = taxi.withColumn('pickup_minute', F.minute(taxi.tpep_pickup_datetime).cast(T.DoubleType()))
# Difference between the pickup time's unix timestamp and that of
# 2019-01-01 00:00:00.
# NOTE(review): the reference datetime is naive; confirm which time zone
# Spark applies when converting it.
taxi = taxi.withColumn(
    'pickup_year_seconds',
    (
        F.unix_timestamp(taxi.tpep_pickup_datetime)
        - F.unix_timestamp(F.lit(datetime.datetime(2019, 1, 1, 0, 0, 0)))
    ).cast(T.DoubleType()),
)
@rikturr
rikturr / init_spark.py
Created July 21, 2020 14:36
init spark
from pyspark.sql import SparkSession
# Get the active SparkSession, or create one with default settings.
spark = SparkSession.builder.getOrCreate()

# Read the January 2019 yellow-taxi CSV (header row, inferred schema), then
# keep a 10% sample drawn without replacement. No seed is passed to sample().
taxi = (
    spark.read
    .csv(
        's3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv',
        header=True,
        inferSchema=True,
        timestampFormat='yyyy-MM-dd HH:mm:ss',
    )
    .sample(withReplacement=False, fraction=0.1)
)
@rikturr
rikturr / dask_grid_search.py
Last active December 8, 2020 19:35
dask grid search
from dask_ml.compose import ColumnTransformer
from dask_ml.preprocessing import StandardScaler, DummyEncoder, Categorizer
from dask_ml.model_selection import GridSearchCV
# Dask has a slightly different way of one-hot encoding
pipeline = Pipeline(steps=[
('categorize', Categorizer(columns=categorical_feat)),
('onehot', DummyEncoder(columns=categorical_feat)),
('scale', ColumnTransformer(
transformers=[('num', StandardScaler(), numeric_feat)],
@rikturr
rikturr / load_dask.py
Created July 21, 2020 14:34
load dask
import dask.dataframe as dd
# Read the January 2019 yellow-taxi CSV as a Dask DataFrame, parsing the
# pickup/dropoff columns as dates, then keep a 10% sample drawn without
# replacement. No random_state is passed to sample().
taxi = (
    dd.read_csv(
        's3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv',
        parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    )
    .sample(replace=False, frac=0.1)
)
@rikturr
rikturr / init_dask.py
Created July 21, 2020 14:34
init dask
from dask.distributed import Client
from dask_saturn import SaturnCluster
# Create a SaturnCluster with 20 workers, then construct a distributed
# Client from that cluster object.
cluster = SaturnCluster(n_workers=20)
client = Client(cluster)
@rikturr
rikturr / run_grid.py
Created July 21, 2020 14:33
run grid search!
# Fit the grid search using the `features` columns of taxi as inputs and the
# `y_col` column as the target, then print the best score it recorded.
grid_search.fit(taxi[features], taxi[y_col])
print(grid_search.best_score_)