Skip to content

Instantly share code, notes, and snippets.

@airhorns
Created May 28, 2014 19:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save airhorns/965118226592bddbf443 to your computer and use it in GitHub Desktop.
Save airhorns/965118226592bddbf443 to your computer and use it in GitHub Desktop.
# Short codec names accepted by saveAsTextFile, mapped to the fully-qualified
# Hadoop class name that saveAsTextFile loads with java.lang.Class.forName.
# 'default' and 'zlib' are aliases for the same DefaultCodec class.
HADOOP_CODECS = dict(
    default="org.apache.hadoop.io.compress.DefaultCodec",
    zlib="org.apache.hadoop.io.compress.DefaultCodec",
    gzip="org.apache.hadoop.io.compress.GzipCodec",
    bzip2="org.apache.hadoop.io.compress.BZip2Codec",
    lz4="org.apache.hadoop.io.compress.Lz4Codec",
    snappy="org.apache.hadoop.io.compress.SnappyCodec",
)
def saveAsTextFile(self, location, codec='gzip'):
    """
    Save this RDD as text at ``location``, optionally compressed.

    Each record becomes one line of text. Unicode records are encoded as
    UTF-8, byte strings are written unchanged, and any other object is
    converted with ``unicode()`` first.

    :param location: where to save the records to; passed through
        ``hdfs.expand_path`` before use.
    :param codec: name of the compression codec to use, one of the keys of
        ``HADOOP_CODECS`` (e.g. ``'gzip'``, ``'bzip2'``, ``'snappy'``).
        Defaults to gzip. Pass ``None`` to write uncompressed output via
        the wrapped RDD's own ``saveAsTextFile``.
    :raises KeyError: if ``codec`` is not ``None`` and is not a key of
        ``HADOOP_CODECS``.
    """
    location = hdfs.expand_path(location)
    if codec is None:
        # No codec requested: defer to the wrapped RDD's plain writer.
        self.rdd.saveAsTextFile(location)
        return

    # Resolve the codec before building any RDDs so a typo fails fast with
    # a message that lists the valid choices (same KeyError type as before).
    try:
        codec_class_name = self.HADOOP_CODECS[codec]
    except KeyError:
        raise KeyError("Unknown compression codec %r; expected one of: %s"
                       % (codec, ", ".join(sorted(self.HADOOP_CODECS))))

    def encode_records(split, iterator):
        for record in iterator:
            if not isinstance(record, basestring):
                record = unicode(record)
            # Only unicode needs encoding. Calling .encode() on a Python 2
            # byte string implicitly decodes it as ASCII first, which raises
            # UnicodeDecodeError for any non-ASCII bytes.
            if isinstance(record, unicode):
                record = record.encode("utf-8")
            yield record

    encoded = pyspark.rdd.PipelinedRDD(self.rdd, encode_records)
    # PySpark-internal flag: hand the yielded byte strings to the JVM as-is
    # instead of running them through PySpark's serializer.
    encoded._bypass_serializer = True
    codec_class = self.sc._jvm.java.lang.Class.forName(codec_class_name)
    # BytesToString presumably turns each raw byte record back into a Java
    # String so the JavaRDD saveAsTextFile(path, codecClass) overload can
    # write it as a line of text.
    text_jrdd = encoded._jrdd.map(self.sc._jvm.BytesToString())
    text_jrdd.saveAsTextFile(location, codec_class)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment