rikturr / spark_features.py
Created July 21, 2020 14:37
spark features
import datetime

import pyspark.sql.functions as F
import pyspark.sql.types as T

# time-based features extracted from the pickup timestamp
taxi = taxi.withColumn('pickup_weekday', F.dayofweek(taxi.tpep_pickup_datetime).cast(T.DoubleType()))
taxi = taxi.withColumn('pickup_weekofyear', F.weekofyear(taxi.tpep_pickup_datetime).cast(T.DoubleType()))
taxi = taxi.withColumn('pickup_hour', F.hour(taxi.tpep_pickup_datetime).cast(T.DoubleType()))
taxi = taxi.withColumn('pickup_minute', F.minute(taxi.tpep_pickup_datetime).cast(T.DoubleType()))
# seconds elapsed since the start of 2019
taxi = taxi.withColumn('pickup_year_seconds',
                       (F.unix_timestamp(taxi.tpep_pickup_datetime) -
                        F.unix_timestamp(F.lit(datetime.datetime(2019, 1, 1, 0, 0, 0)))).cast(T.DoubleType()))
rikturr / spark_grid_search.py
Created July 21, 2020 14:37
spark grid search
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.pipeline import Pipeline
# one StringIndexer per categorical column; the gist is truncated here, so the
# output column name and the categorical_feats list are assumptions
indexers = [
    StringIndexer(inputCol=c, outputCol=f'{c}_idx')
    for c in categorical_feats
]
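The gist breaks off at the indexers, but the imports show the rest of the pipeline: one-hot encoding, vector assembly, scaling, a linear regression, and a cross-validated grid. A minimal sketch of how those pieces plausibly fit together (the column lists, grid values, and fold count are assumptions, not the post's exact settings):

# sketch: wire the imported stages into a Pipeline and CrossValidator;
# categorical_feats, numeric_feats, and the grid values are assumptions
encoder = OneHotEncoder(
    inputCols=[f'{c}_idx' for c in categorical_feats],
    outputCols=[f'{c}_onehot' for c in categorical_feats])
assembler = VectorAssembler(
    inputCols=[f'{c}_onehot' for c in categorical_feats] + numeric_feats,
    outputCol='features_raw')
scaler = StandardScaler(inputCol='features_raw', outputCol='features')
lr = LinearRegression(featuresCol='features', labelCol='total_amount')

pipeline = Pipeline(stages=indexers + [encoder, assembler, scaler, lr])

grid = (ParamGridBuilder()
        .addGrid(lr.regParam, [0.0, 0.1, 1.0])
        .build())

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=grid,
                          evaluator=RegressionEvaluator(labelCol='total_amount',
                                                        metricName='rmse'),
                          numFolds=3)

This crossval object is what the next gist fits.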
rikturr / spark_run_grid.py
Created July 21, 2020 14:38
spark run grid
import numpy as np

fitted = crossval.fit(taxi)
print(np.min(fitted.avgMetrics))  # min because the metric is RMSE
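avgMetrics lines up index-for-index with the parameter grid, so the winning combination can be recovered with argmin; a small sketch, assuming the fitted model above:

# avgMetrics[i] is the cross-validated score for param map i
best_idx = int(np.argmin(fitted.avgMetrics))
print(fitted.getEstimatorParamMaps()[best_idx])  # the winning hyperparameters
best_model = fitted.bestModel  # the pipeline refit on the full dataset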
rikturr / hyperparameter_runtimes.csv
Last active December 8, 2020 19:37
hyperparameter runtimes
System,Runtime,LOC changed
Python (single-node),3 hours,-
Dask,11 minutes,10
Spark,47 minutes,100
rikturr / init_spark.py
Created July 30, 2020 20:02
init_spark
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .config('spark.executor.memory', '36g')
         .getOrCreate())
rikturr / init_dask.py
Created July 30, 2020 20:03
init_dask
from dask.distributed import Client
from dask_saturn import SaturnCluster
cluster = SaturnCluster()
client = Client(cluster)
rikturr / list_files.py
Created July 30, 2020 20:04
list_files
import s3fs

fs = s3fs.S3FileSystem(anon=True)
files = [f"s3://{x}" for x in fs.ls('s3://nyc-tlc/trip data/')
         if 'yellow' in x and ('2019' in x or '2018' in x or '2017' in x)]
cols = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance',
        'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount',
        'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']
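This matches the monthly yellow-cab files for 2017-2019 (paths like s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv, 36 files in total), and cols trims the read down to just the columns the features use.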
rikturr / spark_csv.py
Created July 30, 2020 20:04
spark_csv
import functools
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

# manually specify the schema because inferSchema in read.csv is quite slow
schema = StructType([
    StructField('VendorID', DoubleType()),
    StructField('tpep_pickup_datetime', TimestampType()),
    ...  # remaining StructFields truncated in the original gist
])
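The functools and DataFrame imports point at the usual read-then-union pattern for a list of files; a minimal sketch, assuming the files list from list_files.py and the schema above (the read options are assumptions):

# hypothetical reconstruction: read each monthly file with the explicit
# schema, then fold the DataFrames together with unionAll
dfs = [spark.read.csv(f, schema=schema, header=True) for f in files]
taxi = functools.reduce(DataFrame.unionAll, dfs)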
rikturr / dask_csv.py
Created July 30, 2020 20:05
dask_csv
import dask_cudf

taxi = dask_cudf.read_csv(files,
                          assume_missing=True,
                          parse_dates=[1, 2],
                          usecols=cols,
                          storage_options={'anon': True})
len(taxi)
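Dask evaluates lazily, so the len(taxi) call is what actually triggers reading every CSV across the cluster (into GPU memory here, since this uses dask_cudf rather than plain dask.dataframe).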
rikturr / features.py
Created July 30, 2020 20:10
features
features = ['pickup_weekday', 'pickup_hour', 'pickup_minute',
            'pickup_week_hour', 'passenger_count', 'VendorID',
            'RatecodeID', 'store_and_fwd_flag', 'PULocationID',
            'DOLocationID']
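pickup_week_hour appears here but is never created in the spark_features.py gist above; a plausible construction (the exact formula is an assumption) that folds weekday and hour into a single hour-of-week feature:

import pyspark.sql.functions as F
import pyspark.sql.types as T

# hypothetical: hour-of-week index in [0, 167]; Spark's dayofweek is 1-based
taxi = taxi.withColumn(
    'pickup_week_hour',
    ((F.col('pickup_weekday') - 1) * 24 + F.col('pickup_hour')).cast(T.DoubleType()))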