from pyspark.sql.functions import input_file_name, regexp_extract, col

def foreach_batch_function(df, epoch_id):
    """
    df: the micro-batch DataFrame read from the files at the input source
    epoch_id: the batch id, a numeric value indicating the trigger number of the stream
    """
    # extract the directory portion of each record's source file path
    df_with_file_name = df.withColumn("filename", regexp_extract(input_file_name(), r"^(.*[\/])", 1))
    df_with_file_name.cache()
    all_paths = df_with_file_name.select("filename").distinct().collect()
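The `regexp_extract(input_file_name(), r"^(.*[\/])", 1)` call keeps everything up to and including the last `/`, i.e. the directory each record came from. A quick pure-Python check of that pattern (the file path below is made up for illustration):

```python
import re

# the same pattern the snippet passes to regexp_extract:
# greedy match from the start of the string up to the last "/"
pattern = r"^(.*[\/])"

# hypothetical file path of the kind input_file_name() would return
path = "dbfs:/mnt/test_data/streaming_db/streaming_data/part-0000.csv"

directory = re.match(pattern, path).group(1)
print(directory)  # dbfs:/mnt/test_data/streaming_db/streaming_data/
```

Because `.*` is greedy, the captured group always ends at the final slash, so every file in the same directory maps to the same `filename` value.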
from pyspark.sql.types import StructType, StructField, StringType

# read each line of the incoming files as a single string column
file_schema = StructType([StructField("record_str", StringType())])
file_stream_df = spark.readStream.option("sep", "\n") \
    .option("header", "false").schema(file_schema) \
    .csv("dbfs:/dbfs/mnt/test_data/streaming_db/streaming_data/*")
from pyspark.ml.feature import StringIndexer, VectorAssembler, SQLTransformer

# index each categorical column, keeping unseen labels in an extra bucket
indexer = [StringIndexer(inputCol=column_name, outputCol=column_name + "Index", handleInvalid="keep")
           for column_name in column_to_index]
assembler = VectorAssembler(inputCols=["PclassIndex", "SexIndex", "Age", "Fare", "WithFamily", "EmbarkedIndex"],
                            outputCol="features")
labelindexer = StringIndexer(inputCol="Survived", outputCol="label")
sql_transformer = SQLTransformer(statement="SELECT features, label FROM __THIS__")
indexer.extend([assembler, labelindexer, sql_transformer])
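As a rough pure-Python sketch of what each `StringIndexer` stage does (by default Spark indexes labels by descending frequency, so the most frequent label gets 0.0; `string_indexer` below is a hypothetical stand-in, not a Spark API):

```python
from collections import Counter

def string_indexer(values):
    # hypothetical stand-in for pyspark.ml.feature.StringIndexer:
    # order distinct labels by descending frequency, map each to a float index
    freq = Counter(values)
    ordered = sorted(freq, key=lambda label: -freq[label])
    mapping = {label: float(i) for i, label in enumerate(ordered)}
    return [mapping[v] for v in values]

print(string_indexer(["male", "female", "male"]))  # [0.0, 1.0, 0.0]
```

The real `StringIndexer` additionally handles the `handleInvalid="keep"` case by reserving one extra index for labels it never saw during fitting.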
from pyspark.sql.functions import when

# passengers travelling with siblings/spouses or parents/children get WithFamily = 1
new_feature = train_df.withColumn("WithFamily", when(train_df.SibSp + train_df.Parch > 0, 1).otherwise(0))
train_df = new_feature.drop('SibSp', 'Parch')
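The `when(...).otherwise(...)` expression collapses the two family-size columns into one binary flag. The same rule in plain Python (`with_family` is a hypothetical helper for illustration):

```python
def with_family(sibsp, parch):
    # 1 if the passenger travelled with any siblings/spouses (SibSp)
    # or parents/children (Parch), else 0
    return 1 if sibsp + parch > 0 else 0

print(with_family(1, 0))  # 1
print(with_family(0, 0))  # 0
```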
from pyspark.sql.functions import isnan, when, count, col

# one count(when(isnan(c), c)) aggregate per column: a one-row NaN summary
train_df.select([count(when(isnan(c), c)).alias(c) for c in train_df.columns]).show()
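The list comprehension builds one aggregate per column, so the result is a single row of NaN counts. The same per-column counting over a toy table in plain Python (the rows and column names are illustrative):

```python
import math

# toy stand-in for a couple of numeric columns of the training data
rows = [
    {"Age": 22.0, "Fare": 7.25},
    {"Age": math.nan, "Fare": 71.28},
    {"Age": 26.0, "Fare": math.nan},
]

# count NaN entries per column, mirroring the one-row summary the snippet prints
nan_counts = {
    col: sum(1 for r in rows if isinstance(r[col], float) and math.isnan(r[col]))
    for col in rows[0]
}
print(nan_counts)  # {'Age': 1, 'Fare': 1}
```

Note that `isnan` only flags floating-point NaN; SQL nulls need `isNull`, which is what the next snippet uses for the Cabin and Age columns.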
train_df.where("cabin is null").count() / train_df.count() * 100
# 77% of the values are missing for the Cabin column, so ignore it for the model
wo_cabin_train_df = train_df.drop("cabin")
train_df.select("age").describe().show()
train_df.where("age is null").count()
# 177 of the age values are null; 714 are non-null
# replace the null values with the (rounded) mean age
train_avg_age_df = train_df.na.fill({'age': 29})
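`na.fill({'age': 29})` substitutes a fixed constant (the rounded mean from `describe()`) for the nulls. A small pure-Python sketch of the same mean-imputation idea (the sample ages are made up):

```python
# hypothetical age column with missing values
ages = [22.0, None, 38.0, 35.0, None, 26.0]

# compute the mean over the non-null values only
known = [a for a in ages if a is not None]
mean_age = round(sum(known) / len(known))

# fill the gaps with the rounded mean, as na.fill does with the hard-coded 29
filled = [a if a is not None else mean_age for a in ages]
print(filled)  # [22.0, 30, 38.0, 35.0, 30, 26.0]
```

Computing the mean dynamically like this (e.g. via `describe()` or an `avg` aggregate) avoids hard-coding 29 if the training data changes.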
# infer column types so numeric columns (Age, Fare, SibSp, Parch) are not read as strings
train_df = spark.read.csv("/FileStore/tables/train.csv", header=True, inferSchema=True)