Skip to content

Instantly share code, notes, and snippets.

@pstephens
Last active May 17, 2017 09:13
Show Gist options
  • Save pstephens/1b5d8f3b0b97b192ddd6a78389095ff1 to your computer and use it in GitHub Desktop.
Save pstephens/1b5d8f3b0b97b192ddd6a78389095ff1 to your computer and use it in GitHub Desktop.
A Spark doodle
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.NewHadoopRDD
/**
 * Reads a text file through the new Hadoop API and reports, for every input
 * split, the split's starting byte offset together with the number of lines
 * that fall into it.
 */
object Program {

  /**
   * Entry point: builds the per-split line counts and prints them on the driver.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val srcFile = "hdfs://sandbox/Numbers.txt"
    val conf = new SparkConf().setAppName("spark-numbers")
    val sc = new SparkContext(conf)
    try {
      // Get using the Hadoop API so that we can retrieve FileSplit information
      val jobConf = new Configuration()
      // Hadoop configuration keys are case-sensitive; the split-size key is all
      // lowercase ("fileinputformat"), otherwise the setting is silently ignored.
      jobConf.set("mapreduce.input.fileinputformat.split.minsize", "4")
      // newAPIHadoopFile is typed as a plain RDD; the cast exposes
      // mapPartitionsWithInputSplit, which only NewHadoopRDD provides.
      val lines = sc
        .newAPIHadoopFile(srcFile, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], jobConf)
        .asInstanceOf[NewHadoopRDD[LongWritable, Text]]

      // Get count of lines by starting offset; -1 marks a split that is not a FileSplit
      val splits = lines.mapPartitionsWithInputSplit { (split, it) =>
        val start = split match {
          case fs: FileSplit => fs.getStart
          case _ => -1L
        }
        Iterator((start, it.size))
      }

      // Print the results. The RDD holds one small tuple per split, so collect it
      // to the driver first; a bare RDD.foreach would print on the executors.
      splits.collect().foreach { case (loc, cnt) => println(s"Loc: ${loc} Count: ${cnt}") }
    } finally {
      // Always release the context, even when the job fails
      sc.stop()
    }
  }
}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.NewHadoopRDD
/**
 * Generates a sequence of numbers, formats each as an 8-digit zero-padded
 * lowercase hex string, and writes them out as a text file.
 */
object Program2 {

  /**
   * Entry point: writes `cnt` hex-formatted numbers (0 until cnt) split over
   * `parts` partitions to the destination path.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val dstFile = "hdfs://sandbox/Numbers.txt"
    val conf = new SparkConf().setAppName("spark-numbers")
    val sc = new SparkContext(conf)
    try {
      // Create {cnt} items partitioned into {parts} partitions
      val cnt = 50000000L
      val parts = 4
      // The end bound is exclusive, so [0, cnt) yields exactly cnt items.
      // sc.range generates the values inside each partition instead of
      // materialising the whole sequence on the driver first.
      val numbers = sc.range(0L, cnt, 1L, parts)
      val hexNumbers = numbers.map(n => f"$n%08x")
      hexNumbers.saveAsTextFile(dstFile)
    } finally {
      // Always release the context, even when the job fails
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment