-
-
Save hello009-commits/e818319829d034169830edd0b8bc27ea to your computer and use it in GitHub Desktop.
A Spark doodle
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.conf.Configuration | |
import org.apache.hadoop.io.{LongWritable, Text} | |
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat} | |
import org.apache.spark.{SparkConf, SparkContext} | |
import org.apache.spark.rdd.NewHadoopRDD | |
object Program {
  /** Counts the lines in each input split of an HDFS text file and prints
    * one (split start offset, line count) pair per split.
    *
    * Reads via the new Hadoop API directly (instead of `sc.textFile`) so the
    * underlying `FileSplit` metadata — the split's starting byte offset — is
    * accessible through `mapPartitionsWithInputSplit`.
    */
  def main(args: Array[String]): Unit = {
    val srcFile = "hdfs://sandbox/Numbers.txt"
    val conf = new SparkConf().setAppName("spark-numbers")
    val sc = new SparkContext(conf)
    try {
      // Get using the Hadoop API so that we can retrieve FileSplit information
      val jobConf = new Configuration()
      // BUG FIX: Hadoop property names are case-sensitive. The original key
      // used camel-cased "fileInputFormat", which Hadoop silently ignores;
      // the real property is all-lowercase "fileinputformat".
      jobConf.set("mapreduce.input.fileinputformat.split.minsize", "4")
      val lines = sc
        .newAPIHadoopFile(srcFile, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], jobConf)
        .asInstanceOf[NewHadoopRDD[LongWritable, Text]]
      // Count the records in each partition, keyed by the split's start
      // offset; -1L marks a split that is not a FileSplit (shouldn't happen
      // for TextInputFormat, but the match keeps the code total).
      val splits = lines.mapPartitionsWithInputSplit((split, it) =>
        Iterator(split match {
          case fs: FileSplit => (fs.getStart, it.size)
          case _             => (-1L, it.size)
        }))
      // collect() first so output lands on the driver's stdout; a bare
      // splits.foreach(println) would print inside the executors' logs.
      splits.collect().foreach { case (loc, cnt) => println(s"Loc: ${loc} Count: ${cnt}") }
    } finally {
      sc.stop() // release cluster resources even if the job throws
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.conf.Configuration | |
import org.apache.hadoop.io.{LongWritable, Text} | |
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat} | |
import org.apache.spark.{SparkConf, SparkContext} | |
import org.apache.spark.rdd.NewHadoopRDD | |
object Program2 {
  /** Generates the numbers 0 until `cnt` as 8-digit zero-padded lowercase hex
    * strings and saves them as a text file on HDFS, split across `parts`
    * partitions.
    */
  def main(args: Array[String]): Unit = {
    val dstFile = "hdfs://sandbox/Numbers.txt"
    val conf = new SparkConf().setAppName("spark-numbers")
    val sc = new SparkContext(conf)
    try {
      // Create {cnt} items partitioned into {parts} partitions
      val cnt = 50000000L
      val parts = 4
      // BUG FIX: the original Seq.range(0L, cnt - 1L) has an exclusive upper
      // bound, so it produced only cnt - 1 items (0 .. cnt - 2) despite the
      // comment promising cnt. sc.range(0L, cnt) yields exactly cnt items
      // and — unlike makeRDD over a driver-side Seq — does not materialize
      // all 50M longs in driver memory first.
      val numbers = sc.range(0L, cnt, numSlices = parts)
      val hexNumbers = numbers.map(n => f"$n%08x")
      hexNumbers.saveAsTextFile(dstFile)
    } finally {
      sc.stop() // release cluster resources even if the job throws
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment