This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Derive time-based features from the pickup timestamp. Everything is cast
# to double so the columns can feed directly into Spark ML vector assembly.
_pickup = taxi.tpep_pickup_datetime
# Seconds elapsed since the start of 2019 (epoch-diff against a constant).
_year_start = F.unix_timestamp(F.lit(datetime.datetime(2019, 1, 1, 0, 0, 0)))
_time_features = {
    'pickup_weekday': F.dayofweek(_pickup),
    'pickup_weekofyear': F.weekofyear(_pickup),
    'pickup_hour': F.hour(_pickup),
    'pickup_minute': F.minute(_pickup),
    'pickup_year_seconds': F.unix_timestamp(_pickup) - _year_start,
}
for _name, _col in _time_features.items():
    taxi = taxi.withColumn(_name, _col.cast(T.DoubleType()))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.ml.regression import LinearRegression | |
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder | |
from pyspark.ml.evaluation import RegressionEvaluator | |
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler | |
from pyspark.ml.pipeline import Pipeline | |
indexers = [ | |
StringIndexer( | |
inputCol=c, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Fit the cross-validated pipeline over the full parameter grid.
fitted = crossval.fit(taxi)
# BUG FIX: the fit result is bound to `fitted`, but the original read an
# undefined name `results`. CrossValidatorModel.avgMetrics holds one mean
# score per grid candidate; the evaluator metric is RMSE, so the best
# candidate is the minimum.
print(np.min(fitted.avgMetrics))  # min because metric is RMSE
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
System | Runtime | LOC changed
---|---|---
Python (single-node) | 3 hours | -
Dask | 11 minutes | 10
Spark | 47 minutes | 100
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import findspark

# Locate the local Spark installation and put it on sys.path.
findspark.init()

from pyspark.sql import SparkSession

# Start (or attach to) a session with 36 GB of memory per executor.
spark = SparkSession.builder.config('spark.executor.memory', '36g').getOrCreate()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dask.distributed import Client
from dask_saturn import SaturnCluster

# Provision a Saturn-managed Dask cluster, then attach a client so all
# subsequent dask operations are scheduled on it.
cluster = SaturnCluster()
client = Client(cluster)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import s3fs

# Anonymous access — the NYC TLC trip-data bucket is public.
fs = s3fs.S3FileSystem(anon=True)

# Keep only the yellow-cab files for 2017 through 2019.
_years = ('2017', '2018', '2019')
files = [
    f"s3://{path}"
    for path in fs.ls('s3://nyc-tlc/trip data/')
    if 'yellow' in path and any(year in path for year in _years)
]

# Columns to read from each CSV.
cols = [
    'VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
    'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
    'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
    'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
    'total_amount',
]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
from pyspark.sql.types import * | |
import pyspark.sql.functions as F | |
from pyspark.sql import DataFrame | |
# manually specify schema because inferSchema in read.csv is quite slow | |
schema = StructType([ | |
StructField('VendorID', DoubleType()), | |
StructField('tpep_pickup_datetime', TimestampType()), | |
... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dask_cudf

# Load the yellow-cab CSVs from S3 into GPU dataframes.
# assume_missing=True treats integer-looking columns as float64 so missing
# values can be represented; columns 1 and 2 are the pickup/dropoff
# timestamps; anonymous access since the bucket is public.
taxi = dask_cudf.read_csv(
    files,
    assume_missing=True,
    parse_dates=[1, 2],
    usecols=cols,
    storage_options={'anon': True},
)
# Force a full pass over the data (row count) to materialize the load.
len(taxi)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Model input features: engineered time fields plus raw categorical IDs.
features = [
    'pickup_weekday',
    'pickup_hour',
    'pickup_minute',
    'pickup_week_hour',
    'passenger_count',
    'VendorID',
    'RatecodeID',
    'store_and_fwd_flag',
    'PULocationID',
    'DOLocationID',
]