A simple foreachBatch sink function for a Spark Structured Streaming query.
from pyspark.sql.functions import input_file_name, regexp_extract, col

def foreach_batch_function(df, epoch_id):
    """
    df: the micro-batch DataFrame read from the files at the input source
    epoch_id: the batch id, a numerical value indicating the trigger number of the stream
    """
    # Tag each row with the directory portion of the file it was read from
    # (greedy match up to and including the last slash).
    df_with_file_name = df.withColumn(
        "filename", regexp_extract(input_file_name(), r"^(.*[\/])", 1)
    )
    # Cache once: the DataFrame is scanned again for each distinct directory below.
    df_with_file_name.cache()
    all_paths = df_with_file_name.select("filename").distinct().collect()
    for path in all_paths:
        # Mirror the input directory layout under the output mount.
        directory = path.filename.replace(
            "dbfs:/dbfs/mnt/test_data", "dbfs:/dbfs/mnt/test_data_output"
        )
        df_with_file_name.where(col("filename") == path.filename) \
            .drop("filename").write.csv(directory, mode="overwrite")
    # Release the cache once the batch has been written out.
    df_with_file_name.unpersist()
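For context, here is a minimal sketch of how this sink could be wired into a streaming query. The input schema, checkpoint location, and the use of the ambient Databricks spark session are assumptions for illustration; only the dbfs:/dbfs/mnt/test_data path comes from the gist itself.

# Usage sketch (assumed schema and checkpoint path, not part of the original gist).
from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema for the incoming CSV files.
schema = StructType([
    StructField("id", StringType()),
    StructField("value", StringType()),
])

# Read a stream of CSV files from the input mount referenced in the sink above.
stream_df = (
    spark.readStream
    .schema(schema)
    .csv("dbfs:/dbfs/mnt/test_data")
)

# Route each micro-batch through foreach_batch_function.
query = (
    stream_df.writeStream
    .foreachBatch(foreach_batch_function)
    .option("checkpointLocation", "dbfs:/dbfs/mnt/checkpoints/test_data")  # assumed path
    .start()
)

foreachBatch hands each trigger's DataFrame plus its epoch_id to the function, which is what lets the sink apply ordinary batch writes (here, partitioned CSV output per source directory) inside a streaming job.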