Last active August 29, 2021 15:07
Simple approach to accelerate writing to S3 from Spark.
import base64
import os
import time

"""
Writing from Spark to S3 is ridiculously slow. This is because S3 is an object
store, not a file system. Because of the S3 consistency model, when writing
Parquet (or ORC) files from Spark, data is first stored in a temporary
destination and only renamed when the job succeeds. As S3 is an object store,
renaming files is very expensive (a complete rewrite). The Spark job only
terminates once all files have been renamed.

One way to avoid this waste of time is to write first to local HDFS on EMR,
then use Hadoop's distcp utility to copy the data from HDFS to S3.

To evaluate this approach in isolation, we read from S3 using the S3A protocol,
write to HDFS, then copy from HDFS to S3 before cleaning up.
This approach can reduce write latency by 40-50%.
"""

ts1 = time.time()

# source folder (key) name on S3
in_fname = 'input_path_to_big_file_on_s3'
# destination folder (key) name on S3
out_fname = 'output_path_to_big_file_on_s3'

# Folder destination on local HDFS, base64-encoded to avoid a nested folder
# structure. In Python 3, b64encode takes bytes, so encode the key first and
# decode the result back to str for use as a path.
out_fname_b64 = base64.b64encode(out_fname.encode('utf-8')).decode('ascii')

# assuming you already created your Spark session as `spark`...
df = spark.read.parquet(in_fname)
df.write.format('parquet').save(out_fname_b64)

os.system('hadoop distcp {0} {1}'.format(out_fname_b64, out_fname))
os.system('hdfs dfs -rm -r -skipTrash {0}'.format(out_fname_b64))

ts2 = time.time()
print('Operation took {0:.0f} seconds'.format(ts2 - ts1))
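A note on the base64 trick used above to flatten the staging path: the standard base64 alphabet can itself emit `/` characters, which would reintroduce nested directories on HDFS. A minimal sketch of a safer variant, using the URL-safe alphabet (the `out_key` value here is a hypothetical example, not from the gist):

```python
import base64

# Hypothetical S3 output key; the slashes would create a nested
# directory tree if used directly as an HDFS path.
out_key = 's3a://my-bucket/year=2021/month=08/output'

# urlsafe_b64encode uses '-' and '_' instead of '+' and '/', so the
# result is guaranteed to be a single flat HDFS directory name.
flat_name = base64.urlsafe_b64encode(out_key.encode('utf-8')).decode('ascii')

# The original key is recoverable when it is time to distcp back to S3.
round_trip = base64.urlsafe_b64decode(flat_name).decode('utf-8')
```

With plain `b64encode`, a long enough key will eventually produce a `/` in the output; `urlsafe_b64encode` avoids that failure mode entirely.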
Got an error trace using the above. I think it may be because `distcp` needs an absolute HDFS file path. More here