Text file, json, csv, sequence, parquet, ORC, Avro, newHadoopAPI
/*
Assume that the following "rdd" exists
val rdd = sc.parallelize(Array((1,1), (0,2), (1,3), (0,4), (1,5), (0,6), (1,7), (0,8), (1,9), (0,10)))
type of rdd -> org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1]
rdd.collect -> Array[(Int, Int)] = Array((1,1), (0,2), (1,3), (0,4), (1,5), (0,6), (1,7), (0,8), (1,9), (0,10))
*/
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.hadoop.io._
import com.databricks.spark.avro._
// libraryDependencies += "com.databricks" %% "spark-avro" % "3.2.0"
val conf = new SparkConf().setAppName("zbc").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
sc.setLogLevel("ERROR")
import sqlContext.implicits._
val rdd = sc.parallelize(Array((1,1), (0,2), (1,3), (0,4), (1,5), (0,6), (1,7), (0,8), (1,9), (0,10)))
/*-------------------------------------------------------------------------------------*/
// Text file / JSON / CSV
/*-------------------------------------------------------------------------------------*/
//read json (the gzip-compressed path written below)
sqlContext.read.json("file:///C:\\testfolder\\json_gzip").collect
//write json (as a plain text file) with a compression codec (works)
rdd.toDF.toJSON.saveAsTextFile("file:///C:\\testfolder\\json_gzip",classOf[org.apache.hadoop.io.compress.GzipCodec])
//read json; convert and save as csv
val rdd1 = sqlContext.read.json("file:///C:\\testfolder\\json_gzip").rdd
//Row.toString renders as "[v1,v2]", so stripping the brackets leaves comma-separated values
rdd1.map(x=> x.toString.replace("[","").replace("]","")).saveAsTextFile("file:///C:\\testfolder\\json2csv")
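//A hedged sketch of reading/writing CSV directly through the spark-csv data source instead of the
//bracket-stripping above (assumption: the com.databricks:spark-csv package is on the classpath;
//Spark 2.x ships a built-in "csv" format instead). The csv_direct path is illustrative.
// libraryDependencies += "com.databricks" %% "spark-csv" % "1.5.0"
rdd.toDF("col1","col2").write.format("com.databricks.spark.csv").option("header","true").save("file:///C:\\testfolder\\csv_direct")
sqlContext.read.format("com.databricks.spark.csv").option("header","true").option("inferSchema","true").load("file:///C:\\testfolder\\csv_direct").collect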
/*-------------------------------------------------------------------------------------*/
// Sequence
/*-------------------------------------------------------------------------------------*/
//read seq file
sc.sequenceFile("file:///C:\\testfolder\\seq",classOf[IntWritable],classOf[IntWritable]).map(x=> (x._1.get,x._2.get))
//write seq file with codec (works)
rdd.saveAsSequenceFile("file:///C:\\testfolder\\seq_deflate",Some(classOf[org.apache.hadoop.io.compress.DeflateCodec]))
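//Optional sanity check (a sketch): read the deflate-compressed sequence file back and compare it with rdd.
//Hadoop stores the codec in the sequence file header, so nothing extra needs to be passed on read.
val seqBack = sc.sequenceFile("file:///C:\\testfolder\\seq_deflate",classOf[IntWritable],classOf[IntWritable]).map(x=> (x._1.get, x._2.get))
seqBack.collect.sorted.sameElements(rdd.collect.sorted)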
/*-------------------------------------------------------------------------------------*/
// Parquet
/*-------------------------------------------------------------------------------------*/
//write parquet with codec (the Parquet codec is controlled by spark.sql.parquet.compression.codec, not the avro key)
sqlContext.setConf("spark.sql.parquet.compression.codec","gzip")
rdd.toDF("col1","col2").write.parquet("file:///c:\\testfolder\\parquet_gzip")
//read parquet
sqlContext.read.parquet("file:///c:\\testfolder\\parquet_gzip")
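//A small extra sketch (assumes a Spark 1.4+ DataFrameWriter): Parquet can also be written partitioned
//by a column, so each distinct value of col1 lands in its own sub-directory and can be pruned on read.
//The parquet_partitioned path is illustrative.
rdd.toDF("col1","col2").write.partitionBy("col1").parquet("file:///c:\\testfolder\\parquet_partitioned")
sqlContext.read.parquet("file:///c:\\testfolder\\parquet_partitioned").filter($"col1" === 1).collect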
/*-------------------------------------------------------------------------------------*/
// ORC
/*-------------------------------------------------------------------------------------*/
//write orc
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) //HiveContext is built from the SparkContext, not the SparkConf
hiveContext.createDataFrame(rdd.toDF.rdd,rdd.toDF.schema).write.orc("file:///C:\\testfolder\\orc")
//toDF cannot be used directly here because it would create the DataFrame against the plain SQLContext;
//further, since the first argument of createDataFrame is an RDD[Row], ".toDF.rdd" is used to get one
//Improvement - to convert an RDD[String] to RDD[Row]: rdd.map(x=> Row(x))
//write orc with compression -------alert---COMPRESSION not working-------------
//the following should be set before creating sc
conf.set("orc.compress","gzip")
//create sc, then the HiveContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.createDataFrame(rdd.toDF.rdd,rdd.toDF.schema).write.format("orc").save("file:///C:\\testfolder\\orc1")
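//Reading the ORC output back (a sketch; assumes Spark 1.5+, where DataFrameReader has an orc() method,
//and the orc data source still needs the HiveContext):
hiveContext.read.orc("file:///C:\\testfolder\\orc").collect
hiveContext.read.format("orc").load("file:///C:\\testfolder\\orc1").collect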
/*-------------------------------------------------------------------------------------*/
// Avro
/*-------------------------------------------------------------------------------------*/
//write avro
//must do before: import com.databricks.spark.avro._
sqlContext.setConf("spark.sql.avro.compression.codec","snappy")
rdd.toDF("col1","col2").write.avro("file:///C:\\testfolder\\avro_snappy")
//*IMPORTANT* if you get an error saying the avro DataFrame writer is not found, make sure com.databricks.spark.avro._ is imported before initializing sc & sqlContext
//read avro
sqlContext.read.avro("file:///C:\\testfolder\\avro_snappy").collect
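//The same Avro write/read also works through the generic format string, which avoids depending on the
//implicit avro()/read.avro() extension methods (a sketch; the spark-avro jar is still required and
//the avro_generic path is illustrative):
rdd.toDF("col1","col2").write.format("com.databricks.spark.avro").save("file:///C:\\testfolder\\avro_generic")
sqlContext.read.format("com.databricks.spark.avro").load("file:///C:\\testfolder\\avro_generic").collect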
/*-------------------------------------------------------------------------------------*/
// newHadoopAPI
/*-------------------------------------------------------------------------------------*/
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.input._
import org.apache.hadoop.mapreduce.lib.output._
//Text has no Int constructor, so the value is converted to a String first
val rddWritable = rdd.map(x=> (new IntWritable(x._1), new Text(x._2.toString)))
//Write using saveAsNewAPIHadoopFile to location
rddWritable.saveAsNewAPIHadoopFile("location",classOf[IntWritable],classOf[Text],classOf[TextOutputFormat[IntWritable,Text]])
//Read using newAPIHadoopFile from location
sc.newAPIHadoopFile("location", classOf[KeyValueTextInputFormat],classOf[Text],classOf[Text]).map(x=>(x._1.toString.toInt, x._2.toString)).take(3)
//-------alert--- FileInputFormat is abstract, so the call below needs a concrete InputFormat subclass (e.g. KeyValueTextInputFormat above) -------
sc.newAPIHadoopFile("location", classOf[FileInputFormat[IntWritable,Text]],classOf[IntWritable],classOf[Text]).map(x=>(x._1.get, x._2.toString)).take(3)