Spark app with magic committer
@akki · Last active March 28, 2024
import logging
import time

log = logging.getLogger(__name__)

# These extra configs enable the S3A "magic" committer.
# When they are passed, the generated _SUCCESS file contains JSON data; a
# zero-byte _SUCCESS file indicates the magic committer did NOT get enabled.
extra_configs = {
    "spark.sql.sources.commitProtocolClass": "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
    "spark.sql.parquet.output.committer.class": "org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter",
    "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a": "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
    "spark.hadoop.fs.s3a.committer.name": "magic",
    "spark.hadoop.fs.s3a.committer.magic.enabled": "true",
}
# create_spark_session and file_path_spark3 are defined elsewhere in the
# project; create_spark_session is a context manager yielding a SparkSession.
with create_spark_session(
    override_configs=extra_configs,
    common_configs={
        "spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs": "true",
        "spark.dynamicAllocation.enabled": "false",
        "spark.executor.instances": 30,
    },
) as session:
    query = "<some-SELECT-sql-statement>"
    log.info(f"Running on Spark3 - query:\n{query}")
    data = session.sql(query)
    data = data.coalesce(30)
    log.info(f"Saving datapoints to {file_path_spark3}")
    # Write the data and time the write
    t = time.time()
    data.write.parquet(file_path_spark3, mode="overwrite")
    print(f"Took {time.time() - t}s")
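Not part of the original gist, but a quick way to confirm which committer actually ran: the S3A committers write a JSON summary into _SUCCESS, so its size and "committer" field can be checked after the write. A minimal sketch, assuming it runs inside the same with block (so session and file_path_spark3 are still in scope) and using PySpark's internal JVM gateway:

    import json

    # Sketch: inspect _SUCCESS via Hadoop's FileSystem API through PySpark's
    # internal JVM gateway (sparkContext._jvm / _jsc are private APIs).
    sc = session.sparkContext
    jvm = sc._jvm
    hadoop_conf = sc._jsc.hadoopConfiguration()
    success = jvm.org.apache.hadoop.fs.Path(file_path_spark3 + "/_SUCCESS")
    fs = success.getFileSystem(hadoop_conf)
    if fs.getFileStatus(success).getLen() > 0:
        stream = fs.open(success)
        # commons-io ships with Hadoop; toString reads the whole stream
        content = jvm.org.apache.commons.io.IOUtils.toString(stream, "UTF-8")
        stream.close()
        summary = json.loads(content)
        print("Committer used:", summary.get("committer"))  # expect "magic"
    else:
        print("Zero-byte _SUCCESS: the magic committer was NOT enabled")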