Gists from Priyan (priyanlc)
ACCESS_KEY = "<ACCESS_KEY>"
SECRET_KEY = "<SECRET_KEY>"
AWS_BUCKET_NAME = "s3a://sample-datasets-for-blogs-pl"
MOUNT_NAME = "blogs_pl"

# The bucket is addressed via s3a, so configure the s3a credential properties
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", ACCESS_KEY)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", SECRET_KEY)

# Remount the bucket under /mnt; ignore the error if it was never mounted
try:
    dbutils.fs.unmount(f"/mnt/{MOUNT_NAME}")
except Exception:
    pass
dbutils.fs.mount(AWS_BUCKET_NAME, f"/mnt/{MOUNT_NAME}")
display(dbutils.fs.ls(f"/mnt/{MOUNT_NAME}"))
df = spark.read.option("header", "true").option("inferSchema", "true") \
    .csv(AWS_BUCKET_NAME + "/taxi_fare")

# Persist the raw CSV as Parquet for faster subsequent reads
df.write.parquet(AWS_BUCKET_NAME + "/taxi_fare_parquet")
# Functions used by the feature-engineering steps (omitted from the gist)
from pyspark.sql.functions import (
    sqrt, hour, year, month, dayofmonth, dayofweek, udf, col)
def clean(spark, df):
    # Remove possible outliers: keep only trips whose coordinates fall
    # inside a bounding box around the New York City area
    df = df.where((df["pickup_longitude"] >= -75) & (df["pickup_longitude"] <= -73)) \
           .where((df["dropoff_longitude"] >= -75) & (df["dropoff_longitude"] <= -73)) \
           .where((df["pickup_latitude"] >= 39) & (df["pickup_latitude"] <= 42)) \
           .where((df["dropoff_latitude"] >= 39) & (df["dropoff_latitude"] <= 42))
    return df
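con_df, sampled below, is never created in the gist; a minimal sketch, assuming it is simply the cleaned DataFrame (any further feature engineering from the original notebook is omitted here):

# Assumption: con_df is the cleaned DataFrame; the notebook's
# feature-engineering steps are not shown in the gist
con_df = clean(spark, df)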
# Work with a 5% sample and split it into train / test / validation
sample_df = con_df.sample(0.05)
train_df, test_df, validate_df = sample_df.randomSplit([0.8, 0.1, 0.1], seed=12345)

# Persist each split as Parquet in the mounted bucket
train_df.write.mode('overwrite').parquet(AWS_BUCKET_NAME + "/taxi_fare_feature_eng_train_sample1")
validate_df.write.mode('overwrite').parquet(AWS_BUCKET_NAME + "/taxi_fare_feature_eng_validate_sample1")
test_df.write.mode('overwrite').parquet(AWS_BUCKET_NAME + "/taxi_fare_feature_eng_test_sample1")
# Notebook utility cells: list the bundled Databricks sample datasets
# and the libraries attached to the cluster
%fs ls /databricks-datasets
dbutils.library.list()
display(dataset)  # 'dataset' refers to a DataFrame loaded elsewhere in the notebook
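read_parquet_folder_as_pandas, used below, is not defined in the gist; a minimal sketch, assuming it concatenates the Parquet part files Spark wrote into a single pandas DataFrame:

import glob
import os
import pandas as pd

def read_parquet_folder_as_pandas(path):
    # Hypothetical helper: read every Parquet part file in the folder
    # and stack them into one pandas DataFrame
    part_files = glob.glob(os.path.join(path, "*.parquet"))
    return pd.concat((pd.read_parquet(f) for f in part_files), ignore_index=True)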
train_pd_df=read_parquet_folder_as_pandas('/dbfs/mnt/blogs_pl/taxi_fare_feature_eng_train_sample1')
validate_pd_df=read_parquet_folder_as_pandas('/dbfs/mnt/blogs_pl/taxi_fare_feature_eng_validate_sample1')
test_pd_df=read_parquet_folder_as_pandas('/dbfs/mnt/blogs_pl/taxi_fare_feature_eng_test_sample1')
# Separate the label (fare_amount) from the features; 'key' is an ID column
train_labels = train_pd_df['fare_amount'].values
validation_labels = validate_pd_df['fare_amount'].values
train_pandas = train_pd_df.drop(['fare_amount', 'key'], axis=1)
validation_pandas = validate_pd_df.drop(['fare_amount', 'key'], axis=1)
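The model below consumes train_df_scaled, validation_df_scaled and test_scaled, none of which are produced in the gist; a minimal sketch, assuming standard scaling with scikit-learn (the choice of scaler is an assumption):

from sklearn.preprocessing import StandardScaler

# Assumption: features are standardised before training; the original
# scaling step is not shown in the gist
scaler = StandardScaler()
train_df_scaled = scaler.fit_transform(train_pandas)
validation_df_scaled = scaler.transform(validation_pandas)
test_scaled = scaler.transform(test_pd_df.drop(['fare_amount', 'key'], axis=1))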
# Create the deep learning layers: a funnel of dense blocks with batch
# normalisation, ending in a single linear unit for fare regression
model = keras.models.Sequential()
model.add(keras.layers.Dense(256, activation='relu', input_shape=(train_df_scaled.shape[1],), name='raw'))
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.Dense(128, activation='relu'))
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.Dense(64, activation='relu'))
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.Dense(32, activation='relu'))
model.add(keras.layers.Dense(1))  # regression output (missing from the gist)
# The model must be compiled before model_to_estimator can wrap it
model.compile(optimizer='adam', loss='mse')
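input_function is also undefined in the gist; a minimal sketch, assuming it wraps the TF 1.x numpy input fn (the 'raw_input' feature key follows from the first layer being named 'raw' once the model is converted to an estimator; the batch size is an assumption):

def input_function(features, labels=None, shuffle=False):
    # Hypothetical helper built on the TF 1.x numpy input fn; returns
    # the input_fn callable that the estimator API expects
    return tf.estimator.inputs.numpy_input_fn(
        x={'raw_input': features},
        y=labels,
        batch_size=128,  # assumed batch size
        num_epochs=None if shuffle else 1,
        shuffle=shuffle)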
# MODEL_DIR and STEPS are defined elsewhere in the original notebook
run_config = tf.estimator.RunConfig(model_dir=MODEL_DIR, save_summary_steps=4000, save_checkpoints_steps=4000)

# Wrap the compiled Keras model as a TensorFlow Estimator and train it
estimator = keras.estimator.model_to_estimator(keras_model=model, config=run_config)
train_spec = tf.estimator.TrainSpec(input_fn=input_function(train_df_scaled, train_labels, True),
                                    max_steps=STEPS)
eval_spec = tf.estimator.EvalSpec(input_fn=input_function(validation_df_scaled, validation_labels, True),
                                  steps=1200, throttle_secs=350)
tf.estimator.train_and_evaluate(estimator, train_spec=train_spec, eval_spec=eval_spec)
# Generate fare predictions for the test set and persist them
prediction = estimator.predict(input_function(test_scaled))
prediction_df = pd.DataFrame(prediction)
save_output(test_pd_df, prediction_df, 'fare_amount', '/dbfs/mnt/blogs_pl/output1')
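save_output is the last undefined helper; a minimal sketch, assuming it pairs each test-set key with its predicted fare and writes a CSV under the given DBFS path (the file name is an assumption):

import os

def save_output(test_df, pred_df, label_col, path):
    # Hypothetical helper: attach predictions to the ride keys and
    # write them as a single CSV under the mounted bucket
    out = test_df[['key']].copy()
    out[label_col] = pred_df.iloc[:, 0].values
    os.makedirs(path, exist_ok=True)
    out.to_csv(os.path.join(path, 'predictions.csv'), index=False)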
# Draw a larger 80% sample of the cleaned data
sample_df = con_df.sample(0.8)