
@bachwehbi
Last active August 29, 2021 15:07
Simple approach to accelerate writing to S3 from Spark.
import base64
import os
import time
"""
Writing from Spark to S3 is ridiculously slow. This is because S3 is an object
store and not a file system. Because of consistency model of S3, when writing
Parquet (or ORC) files from Spark. Data will be stored to a temporary destination
then renamed when the job is successful. As S3 is an object store, renaming files
is very expensive (complete rewrite). The Spark job will only terminate when all
files have been renamed.
An approach to avoid this waste of time is to write first to local HDFS on EMR,
then use Hadoop's distcp utility to copy data from HDFS to S3.
To evaluate this approach in isolation, we will read from S3 using S3A protocol,
write to HDFS, then copy from HDFS to S3 before cleaning up.
This approach can reduce the latency of writes by a 40-50%.
"""
ts1 = time.time()
# source folder (key) name on S3
in_fname = 'input_path_to_big_file_on_s3'
# destination folder (key) name on S3
out_fname = 'output_path_to_big_file_on_s3'
# Folder destination on local HDFS in base64 to avoid nested folder structure
out_fname_b64 = base64.b64encode(out_fname.encode('utf-8')).decode('ascii')  # b64encode expects bytes; save() expects str
# assuming you already created your Spark session as `spark`...
df = spark.read.parquet(in_fname)
df.write.format('parquet').save(out_fname_b64)
os.system('hadoop distcp {0} {1}'.format(out_fname_b64, out_fname))
os.system('hdfs dfs -rm -r -skipTrash {0}'.format(out_fname_b64))
ts2 = time.time()
print('Operation took {0:.1f} seconds'.format(ts2 - ts1))  # elapsed time is a float, so use a float format
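The script assumes an existing Spark session. As a minimal sketch of what that might look like when the script is submitted to an EMR cluster (the app name below is a placeholder, not part of the original gist):

from pyspark.sql import SparkSession

# Build (or reuse) the session the script refers to as `spark`.
# On EMR the default filesystem is HDFS, so a relative output path such as
# out_fname_b64 is staged on the cluster's HDFS rather than on S3.
spark = SparkSession.builder.appName('s3-write-benchmark').getOrCreate()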
@yardstick17

Got this error trace using the above:

19/02/18 11:22:39 INFO tools.DistCp: Input Options: DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, ignoreFailures=false, overwrite=false, skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, mapBandwidth=100, sslConfigurationFile='null', copyStrategy='uniformsize', preserveStatus=[], preserveRawXattrs=false, atomicWorkPath=null, logPath=null, sourceFileListing=null, sourcePaths=[czNhOi8vcHJ0LWVtci9kaXNjb3VudC1zdHJhdGVneS1ldmFsdWF0b3Ivb3V0cHV0L29yYWNsZS1zb2xkLWl0ZW1zL3NjaGVtYT1wcnQxX3Byb2R1Y3Rpb25fMjAxOV8wMl8xNF8xMF8zNg==], targetPath=s3a://a/valid/s3/path, targetPathExists=false, filtersFile='null'}
19/02/18 11:22:39 INFO client.RMProxy: Connecting to ResourceManager at ip-x-y-z-a.de-mid-1.compute.internal/z.y.x.t:8032
19/02/18 11:22:40 ERROR tools.DistCp: Invalid input: 
org.apache.hadoop.tools.CopyListing$InvalidInputException: czNhOi8vcHJ0LWVtci9kaXNjb3VudC1zdHJhdGVneS1ldmFsdWF0b3Ivb3V0cHV0L29yYWNsZS1zb2xkLWl0ZW1zL3NjaGVtYT1wcnQxX3Byb2R1Y3Rpb25fMjAxOV8wMl8xNF8xMF8zNg== doesn't exist
	at org.apache.hadoop.tools.GlobbedCopyListing.doBuildListing(GlobbedCopyListing.java:84)
	at org.apache.hadoop.tools.CopyListing.buildListing(CopyListing.java:86)
	at org.apache.hadoop.tools.DistCp.createInputFileListing(DistCp.java:398)
	at org.apache.hadoop.tools.DistCp.createAndSubmitJob(DistCp.java:190)
	at org.apache.hadoop.tools.DistCp.execute(DistCp.java:155)
	at org.apache.hadoop.tools.DistCp.run(DistCp.java:128)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
	at org.apache.hadoop.tools.DistCp.main(DistCp.java:462)

I think it may be because distcp needs an absolute HDFS file path. More here
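If that is the cause, a sketch of a fix (reusing the variables from the gist above; the hdfs:///tmp prefix is an assumption, not from the original gist) is to stage under an explicit, absolute HDFS URI so the Spark write and the distcp source resolve to the same location:

# Absolute HDFS URI for the staging folder, instead of a relative path.
staging_path = 'hdfs:///tmp/{0}'.format(out_fname_b64)
df.write.format('parquet').save(staging_path)
os.system('hadoop distcp {0} {1}'.format(staging_path, out_fname))
os.system('hdfs dfs -rm -r -skipTrash {0}'.format(staging_path))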
