Text file, json, csv, sequence, parquet, ORC, Avro, newHadoopAPI
/*
Assume that the following "rdd" exists
val rdd = sc.parallelize(Array((1,1), (0,2), (1,3), (0,4), (1,5), (0,6), (1,7), (0,8), (1,9), (0,10)))
type of rdd -> org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1]
rdd.collect -> Array[(Int, Int)] = Array((1,1), (0,2), (1,3), (0,4), (1,5), (0,6), (1,7), (0,8), (1,9), (0,10))
*/
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.hadoop.io._
import com.databricks.spark.avro._
// libraryDependencies += "com.databricks" %% "spark-avro" % "3.2.0"
val conf = new SparkConf().setAppName("zbc").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
sc.setLogLevel("ERROR")
import sqlContext.implicits._
val rdd = sc.parallelize(Array((1,1), (0,2), (1,3), (0,4), (1,5), (0,6), (1,7), (0,8), (1,9), (0,10)))
/*-------------------------------------------------------------------------------------*/
// Textfile/JSON/CSV
/*-------------------------------------------------------------------------------------*/
//read json
sqlContext.read.json("file:///C:\\testfolder\\json_gzip").collect
//write json (any text file) with codec (works)
rdd.toDF.toJSON.saveAsTextFile("file:///C:\\testfolder\\json_gzip",classOf[org.apache.hadoop.io.compress.GzipCodec])
//read json; convert and save as csv
val rdd1 = sqlContext.read.json("file:///C:\\testfolder\\json_gzip").rdd
rdd1.map(_.mkString(",")).saveAsTextFile("file:///C:\\testfolder\\json2csv")
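// Alternative sketch (assumes the spark-csv package is on the classpath,
// e.g. libraryDependencies += "com.databricks" %% "spark-csv" % "1.5.0"):
// write CSV through the DataFrame writer instead of string-munging Rows.
// The output path below is just an example.
sqlContext.read.json("file:///C:\\testfolder\\json_gzip")
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")                                    // emit a header row
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec")  // gzip the part files (codec option per the spark-csv README)
  .save("file:///C:\\testfolder\\json2csv_sparkcsv")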
/*-------------------------------------------------------------------------------------*/
// Sequence
/*-------------------------------------------------------------------------------------*/
//read seq file
sc.sequenceFile("file:///C:\\testfolder\\seq",classOf[IntWritable],classOf[IntWritable]).map(x=> (x._1.get,x._2.get))
//write seq file with codec (works)
rdd.saveAsSequenceFile("file:///C:\\testfolder\\seq_deflate",Some(classOf[org.apache.hadoop.io.compress.DeflateCodec]))
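// Read the deflate-compressed sequence file back; the codec is recorded in the
// file header, so nothing extra needs to be specified on the read side.
sc.sequenceFile("file:///C:\\testfolder\\seq_deflate",classOf[IntWritable],classOf[IntWritable]).map(x=> (x._1.get,x._2.get)).collect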
/*-------------------------------------------------------------------------------------*/
// Parquet
/*-------------------------------------------------------------------------------------*/
//write parquet with codec
sqlContext.setConf("spark.sql.parquet.compression.codec","gzip")
rdd.toDF("col1","col2").write.parquet("file:///c:\\testfolder\\parquet_gzip")
//read parquet
sqlContext.read.parquet("file:///c:\\testfolder\\parquet_gzip")
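// Quick sanity check on the written parquet data: inspect the schema and run a
// small aggregation (uses org.apache.spark.sql.functions._ imported above).
val parquetDf = sqlContext.read.parquet("file:///c:\\testfolder\\parquet_gzip")
parquetDf.printSchema()
parquetDf.filter(col("col1") === 1).agg(sum("col2")).show()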
/*-------------------------------------------------------------------------------------*/
// ORC
/*-------------------------------------------------------------------------------------*/
//write orc
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.createDataFrame(rdd.toDF.rdd,rdd.toDF.schema).write.orc("file:///C:\\testfolder\\orc")
//toDF alone cannot be used here because it would create the DataFrame in the plain SQLContext, and ORC needs the HiveContext
//further, since the first argument of createDataFrame must be an RDD[Row], ".toDF.rdd" is used
//Improvement - to convert an RDD[String] to an RDD[Row]: rdd.map(x=> Row(x))
//write orc with compression ---ALERT: compression is NOT working here---
//the following must be set on the SparkConf before the SparkContext is created
conf.set("orc.compress","gzip")
//then create sc and the Hive context
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.createDataFrame(rdd.toDF.rdd,rdd.toDF.schema).write.format("orc").save("file:///C:\\testfolder\\orc1")
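// Read the ORC data back; reading also goes through the Hive context
// (DataFrameReader.orc is available from Spark 1.5 onwards).
hiveContext.read.orc("file:///C:\\testfolder\\orc1").collect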
/*-------------------------------------------------------------------------------------*/
// Avro
/*-------------------------------------------------------------------------------------*/
//write avro
//must do before: import com.databricks.spark.avro._
sqlContext.setConf("spark.sql.avro.compression.codec","snappy")
rdd.toDF("col1","col2").write.avro("file:///C:\\testfolder\\avro_snappy")
//*IMPORTANT* if you get an error saying the avro DataFrame writer is not found, make sure com.databricks.spark.avro._ is imported before initializing sc & sqlContext
//read avro
sqlContext.read.avro("file:///C:\\testfolder\\avro_snappy").collect
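// Sketch: deflate instead of snappy. Per the spark-avro README the deflate level
// (1-9) can also be tuned; the output path below is just an example.
sqlContext.setConf("spark.sql.avro.compression.codec","deflate")
sqlContext.setConf("spark.sql.avro.deflate.level","5")
rdd.toDF("col1","col2").write.avro("file:///C:\\testfolder\\avro_deflate")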
/*-------------------------------------------------------------------------------------*/
// newHadoopAPI
/*-------------------------------------------------------------------------------------*/
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.input._
import org.apache.hadoop.mapreduce.lib.output._
val rddWritable = rdd.map(x=> (new IntWritable(x._1), new Text(x._2.toString)))
//Write using newApiHadoopFile to location
rddWritable.saveAsNewAPIHadoopFile("location",classOf[IntWritable],classOf[Text],classOf[TextOutputFormat[IntWritable,Text]])
//Read using newApiHadoopFile from location
sc.newAPIHadoopFile("location", classOf[KeyValueTextInputFormat],classOf[Text],classOf[Text]).map(x=>(x._1.toString.toInt, x._2.toString)).take(3)
sc.newAPIHadoopFile("location", classOf[FileInputFormat[IntWritable,Text]],classOf[IntWritable],classOf[Text]).map(x=>(x._1.get, x._2.toString)).take(3)