Sanjay M (sanjurm16)
sanjurm16 / foreachBatch_function
Last active April 25, 2020 23:51
A simple foreachBatch Spark Structured Streaming sink function
from pyspark.sql.functions import input_file_name, regexp_extract, col

def foreach_batch_function(df, epoch_id):
    """
    df: the micro-batch DataFrame read from the files at the input source
    epoch_id: the batch id, a numerical value indicating the trigger number of the stream
    """
    # Tag each record with the directory portion of the file it came from
    df_with_file_name = df.withColumn("filename", regexp_extract(input_file_name(), r"^(.*[\/])", 1))
    # Cache because the micro-batch is reused after the collect below
    df_with_file_name.cache()
    # Distinct source directories seen in this micro-batch
    all_paths = df_with_file_name.select("filename").distinct().collect()
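To use this function as the sink, it is handed to DataStreamWriter.foreachBatch. A minimal sketch, assuming file_stream_df is a streaming DataFrame such as the one in the next gist, with an illustrative checkpoint path:

query = file_stream_df.writeStream \
    .foreachBatch(foreach_batch_function) \
    .option("checkpointLocation", "dbfs:/mnt/test_data/streaming_db/checkpoint") \
    .start()  # checkpointLocation here is illustrative, not from the gist
query.awaitTermination()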
sanjurm16 / csv_streaming_dataframe
Last active April 24, 2020 22:19
How to create a streaming DataFrame for atomically written CSV files
from pyspark.sql.types import StructType, StructField, StringType

# With "\n" as the separator, each line of a file lands in the single record_str column
file_schema = StructType([StructField("record_str", StringType())])
file_stream_df = spark.readStream.option("sep", "\n")\
    .option("header", "false").schema(file_schema)\
    .csv("dbfs:/dbfs/mnt/test_data/streaming_db/streaming_data/*")
from pyspark.ml.feature import StringIndexer, VectorAssembler, SQLTransformer

# column_to_index is assumed to hold the categorical column names, e.g. ["Pclass", "Sex", "Embarked"]
indexer = [StringIndexer(inputCol=column_name, outputCol=column_name + "Index", handleInvalid="keep") for column_name in column_to_index]
assembler = VectorAssembler(inputCols=["PclassIndex", "SexIndex", "Age", "Fare", "WithFamily", "EmbarkedIndex"], outputCol="features")
labelindexer = StringIndexer(inputCol="Survived", outputCol="label")
sql_transformer = SQLTransformer(statement="SELECT features, label FROM __THIS__")
# Gather every stage into a single list so it can feed a Pipeline
indexer.extend([assembler, labelindexer, sql_transformer])
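These stages can be chained with a Pipeline; a minimal sketch, assuming train_df already contains the engineered columns referenced above:

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=indexer)
prepared_df = pipeline.fit(train_df).transform(train_df)
prepared_df.show(5)  # only the features and label columns, per the SQLTransformer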
from pyspark.sql.functions import isnan, when, count, col

# WithFamily = 1 when the passenger travelled with any sibling/spouse/parent/child, else 0
new_feature = train_df.withColumn("WithFamily", when(train_df.SibSp + train_df.Parch > 0, 1).otherwise(0))
train_df = new_feature.drop('SibSp', 'Parch')
# Count NaN entries per column; null checks follow separately below
train_df.select([count(when(isnan(c), c)).alias(c) for c in train_df.columns]).show()
train_df.where("cabin is null").count()/train_df.count() *100
#77% of values is missing for cabin column. So ignoring this column for the model
wo_cabin_train_df = train_df.drop("cabin")
train_df.select("age").describe().show()
train_df.where("age is null").count()
#177 values out of the 714 values are null.
#replacing the null values with the mean age value
train_avg_age_df = train_df.na.fill({'age': 29})
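Instead of hard-coding 29, the fill value could be computed from the data; a small sketch that first casts Age to a numeric type:

from pyspark.sql.functions import avg, col

train_df = train_df.withColumn("Age", col("Age").cast("double"))
avg_age = train_df.select(avg("Age")).first()[0]
train_avg_age_df = train_df.na.fill({'Age': avg_age})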
train_df = spark.read.csv("/FileStore/tables/train.csv", header=True)
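Since inferSchema is not set, every column of this read comes back as a string; enabling it gives numeric types for columns like Age and Fare, which the aggregations above rely on. A small sketch:

# Same file, but let Spark infer numeric types for columns such as Age, Fare, SibSp, Parch
train_df = spark.read.csv("/FileStore/tables/train.csv", header=True, inferSchema=True)
train_df.printSchema()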