@satendrakumar
Last active May 10, 2017 07:49
Remove headers from CSV files without comparing each record, using Spark
import java.io.{BufferedReader, File, InputStreamReader}
import javax.annotation.concurrent.NotThreadSafe
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.{SparkConf, SparkContext}
// Iterator wrapper that re-emits a first line which was already consumed while checking
// whether it was a header, then delegates to the underlying Hadoop record iterator.
@NotThreadSafe
class WrappedIterator(fileName: String, firstLine: String, itr: Iterator[(LongWritable, Text)])
  extends Iterator[(String, String)] {

  private var isFirstIteration = true

  override def hasNext: Boolean = isFirstIteration || itr.hasNext

  override def next(): (String, String) =
    if (isFirstIteration) {
      isFirstIteration = false
      (fileName, firstLine)
    } else {
      val (_, line) = itr.next()
      (fileName, line.toString)
    }
}
object SparkApp extends App {

  val dirName = "data"
  val conf = new SparkConf().setMaster("local").setAppName("SparkApp")
  val sc = new SparkContext(conf)

  // Read only the first line of every file to capture its header (one read per file, not per record).
  val fileNameAndHeaders: Map[String, String] = sc.binaryFiles(dirName, 1).map {
    case (fileName, stream) =>
      val reader = new BufferedReader(new InputStreamReader(stream.open()))
      val header = try reader.readLine() finally reader.close()
      (new File(fileName).getName, header)
  }.collect().toMap

  // Broadcast the fileName -> header map so executors can look up headers locally.
  val fileNameHeaderBr: Broadcast[Map[String, String]] = sc.broadcast(fileNameAndHeaders)

  val hadoopRDD = sc
    .hadoopFile(dirName, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], sc.defaultMinPartitions)
    .asInstanceOf[HadoopRDD[LongWritable, Text]]

  // Only the first split of a file can start with the header, so it is enough to inspect
  // the first record of each partition instead of comparing every record to the header.
  val withoutHeadersRDD = hadoopRDD.mapPartitionsWithInputSplit { (inputSplit, itr) =>
    val file = inputSplit.asInstanceOf[FileSplit].getPath.getName
    if (!itr.hasNext) {
      Iterator.empty // guard against an empty split
    } else {
      val (_, firstLine) = itr.next()
      if (firstLine.toString == fileNameHeaderBr.value(file)) {
        itr.map { case (_, line) => (file, line.toString) } // header dropped; add filename to each record
      } else {
        new WrappedIterator(file, firstLine.toString, itr) // first line is data; put it back
      }
    }
  }

  withoutHeadersRDD.collect().foreach(println)
  sc.stop()
}
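To try SparkApp locally, a small helper like the sketch below can generate a couple of sample CSV files under the data/ directory it reads from. The file names and contents here are illustrative assumptions, not part of the original gist; after running SparkApp, each printed tuple should pair a file name with a non-header record.

import java.io.{File, PrintWriter}

// Hypothetical helper that writes two sample CSV files (each with a header row)
// into ./data so SparkApp above has something to process.
object SampleData extends App {
  val dir = new File("data")
  dir.mkdirs()

  def write(name: String, lines: Seq[String]): Unit = {
    val out = new PrintWriter(new File(dir, name))
    try lines.foreach(out.println) finally out.close()
  }

  write("users.csv", Seq("id,name", "1,alice", "2,bob"))
  write("orders.csv", Seq("order_id,user_id,total", "100,1,9.99", "101,2,14.50"))
}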