Last active
May 17, 2017 09:13
-
-
Save pstephens/1b5d8f3b0b97b192ddd6a78389095ff1 to your computer and use it in GitHub Desktop.
A Spark doodle
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.NewHadoopRDD
/** Counts the number of lines in each input split of an HDFS text file.
  *
  * Reads the file through the new Hadoop API (rather than `sc.textFile`) so
  * that the per-partition `InputSplit` is available, then emits one
  * `(splitStartOffset, lineCount)` pair per split.
  */
object Program {
  def main(args: Array[String]): Unit = {
    val srcFile = "hdfs://sandbox/Numbers.txt"
    val conf = new SparkConf().setAppName("spark-numbers")
    val sc = new SparkContext(conf)
    // Get using the Hadoop API so that we can retrieve FileSplit information
    val jobConf = new Configuration()
    // BUG FIX: Hadoop configuration keys are case-sensitive strings; the
    // correct key is all-lowercase "fileinputformat". The previous mixed-case
    // spelling ("fileInputFormat") was silently ignored by Hadoop.
    // NOTE(review): min-size only sets a lower bound on split size (default is
    // already 1); if the intent was to force many small splits, the property to
    // set is "mapreduce.input.fileinputformat.split.maxsize" — confirm intent.
    jobConf.set("mapreduce.input.fileinputformat.split.minsize", "4")
    val lines = sc
      .newAPIHadoopFile(srcFile, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], jobConf)
      .asInstanceOf[NewHadoopRDD[LongWritable, Text]]
    // Count lines per split, keyed by the split's starting byte offset.
    // Non-FileSplit splits (if any) are reported under the sentinel key -1.
    val splits = lines.mapPartitionsWithInputSplit(
      (split, it) =>
        Iterator(
          split match {
            case fs: FileSplit => (fs.getStart, it.size) // it.size consumes the iterator, same as count(_ => true)
            case _             => (-1L, it.size)
          }))
    // Print the per-split counts. `foreach` runs on the executors, so the
    // output appears in executor logs, not on the driver console.
    splits.foreach { case (loc, cnt) => println(s"Loc: ${loc} Count: ${cnt}") }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.NewHadoopRDD
/** Generates `cnt` zero-padded hexadecimal numbers and writes them to HDFS
  * as text, partitioned into `parts` partitions.
  */
object Program2 {
  def main(args: Array[String]): Unit = {
    val dstFile = "hdfs://sandbox/Numbers.txt"
    val conf = new SparkConf().setAppName("spark-numbers")
    val sc = new SparkContext(conf)
    // Create {cnt} items partitioned into {parts} partitions
    val cnt = 50000000L
    val parts = 4
    // BUG FIX: Seq.range's upper bound is EXCLUSIVE, so range(0L, cnt - 1L)
    // produced only cnt - 1 elements (0 .. cnt-2). Use cnt as the bound to get
    // exactly {cnt} items, as the comment above promises.
    // NOTE(review): Seq.range materializes all 50M Longs in driver memory
    // before distribution; sc.range(0L, cnt, numSlices = parts) would generate
    // values lazily on the executors — consider switching if driver memory is
    // a concern.
    val numbers = sc.makeRDD(Seq.range(0L, cnt), parts)
    // Format each number as 8-digit, zero-padded, lowercase hex (e.g. "0000002a").
    val hexNumbers = numbers.map(n => f"$n%08x")
    hexNumbers.saveAsTextFile(dstFile)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment