A simple foreachBatch Spark Structured Streaming sink function: for each micro-batch, it writes the rows back out as CSV, grouped by the source directory each row was read from.
from pyspark.sql.functions import col, input_file_name, regexp_extract

def foreach_batch_function(df, epoch_id):
    """
    df: the micro-batch DataFrame read from the files at the input source
    epoch_id: the batch id, a numeric value indicating the trigger number of the stream
    """
    # Tag each row with the directory portion of the file it was read from.
    df_with_file_name = df.withColumn(
        "filename", regexp_extract(input_file_name(), r"^(.*/)", 1)
    )
    # Cache, since the DataFrame is scanned once per distinct directory below.
    df_with_file_name.cache()
    all_paths = df_with_file_name.select("filename").distinct().collect()
    for path in all_paths:
        # Mirror the input directory layout under the output mount point.
        directory = path.filename.replace(
            "dbfs:/dbfs/mnt/test_data", "dbfs:/dbfs/mnt/test_data_output"
        )
        df_with_file_name.where(col("filename") == path.filename) \
            .drop("filename").write.csv(directory, mode="overwrite")
    df_with_file_name.unpersist()
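As a rough sketch of how this sink might be wired into a streaming query: the input path, schema, and checkpoint location below are assumptions for illustration (only the mount paths echo the gist), not part of the original snippet.

# Hedged usage sketch: schema and checkpoint location are assumed.
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("value", StringType())])  # assumed input schema

stream_df = (
    spark.readStream
    .schema(schema)
    .csv("dbfs:/dbfs/mnt/test_data")  # input mount, inferred from the gist's paths
)

query = (
    stream_df.writeStream
    .foreachBatch(foreach_batch_function)
    .option("checkpointLocation", "dbfs:/dbfs/mnt/checkpoints/test_data")  # assumed
    .start()
)

Note that foreachBatch hands the function a regular (non-streaming) DataFrame, which is what allows the distinct()/collect() loop and the per-directory batch writes above.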