Last active
April 25, 2020 23:51
-
-
Save sanjurm16/e71ff9fae4a94adf568456ec9ee5d769 to your computer and use it in GitHub Desktop.
A simple `foreachBatch` sink function for Spark Structured Streaming: it mirrors each micro-batch's rows back out as CSV, grouped by the source directory each row was read from.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql.functions import input_file_name,regexp_extract,col | |
def foreach_batch_function(df, epoch_id):
    """Write each micro-batch out as CSV, partitioned by source directory.

    For every distinct input directory observed in this batch, the rows
    that were read from that directory are written (overwriting) to the
    mirrored path under the output root.

    df: the micro-batch DataFrame read from the files at the input source
    epoch_id: the batch id — numerical value indicating the trigger number
        of the stream (unused here, but required by the foreachBatch API)
    """
    # Tag each row with the directory part of its source file path.
    # Raw string fixes the invalid "\/" escape of the original pattern;
    # r"^(.*/)" matches exactly the same text as "^(.*[\/])".
    df_with_file_name = df.withColumn(
        "filename", regexp_extract(input_file_name(), r"^(.*/)", 1)
    )
    # Cache once: the DataFrame is scanned again for every distinct directory.
    df_with_file_name.cache()
    try:
        all_paths = df_with_file_name.select("filename").distinct().collect()
        for path in all_paths:
            # Map the input directory onto the mirrored output directory.
            directory = path.filename.replace(
                "dbfs:/dbfs/mnt/test_data", "dbfs:/dbfs/mnt/test_data_output"
            )
            (
                df_with_file_name
                .where(col("filename") == path.filename)
                .drop("filename")
                .write.csv(directory, mode="overwrite")
            )
    finally:
        # Release the cached batch so a long-running stream does not
        # leak executor storage memory across triggers.
        df_with_file_name.unpersist()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment