Simple job to ensure the LZO-compressed Google Ngrams data on S3 can be read from Spark
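For reference, a build definition along the following lines should compile the job. This is a sketch with assumed coordinates: the Spark and hadoop-lzo versions are guesses matching the gist's Spark 1.x era, and hadoop-lzo is typically fetched from Twitter's Maven repository; none of this appears in the original gist.

    // build.sbt -- a minimal sketch; versions and the repository are assumptions
    name := "test-ngrams"

    scalaVersion := "2.10.4"

    resolvers += "twttr" at "http://maven.twttr.com"

    // Kryo comes in transitively through spark-core, so no explicit Kryo dependency is needed
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
      // supplies com.hadoop.mapreduce.LzoTextInputFormat and the LZO codec classes
      "com.hadoop.gplcompression" % "hadoop-lzo" % "0.4.19"
    )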
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.hadoop.io.{LongWritable, Text}
// From hadoop-lzo; only the commented-out attempt below references it directly,
// but the LZO codec has to be on the cluster classpath either way
import com.hadoop.mapreduce.LzoTextInputFormat
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
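
// Kryo writes an integer id instead of the full class name for registered
// classes, so registering the Writable types used below keeps records small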
class Registrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[LongWritable])
    kryo.register(classOf[Text])
  }
}
object TestNgrams {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("Google Ngrams Test")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", classOf[Registrator].getName)
      .set("spark.kryoserializer.buffer.mb", "128")
      // .set("spark.shuffle.consolidateFiles", "true")
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.compress", "true")
    val sc = new SparkContext(conf)
    // This does not work:
    // val lines = sc.hadoopFile[LongWritable, Text, org.apache.hadoop.mapred.SequenceFileInputFormat[LongWritable, Text]](
    //   "s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-gb-all/1gram/data", minPartitions = 4)
    //   .map(_._2.toString)

    // Neither does this:
    // val lines = sc
    //   .newAPIHadoopFile("s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-gb-all/1gram/data",
    //     classOf[org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat[LongWritable, Text]], // or classOf[com.hadoop.mapreduce.LzoTextInputFormat]
    //     classOf[org.apache.hadoop.io.LongWritable],
    //     classOf[org.apache.hadoop.io.Text])
    //   .map(_._2.toString)
    // But this does:
    // minPartitions should depend on the number of cores in your cluster
    val lines: RDD[String] = sc.sequenceFile(
      "s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-gb-all/1gram/data",
      classOf[org.apache.hadoop.io.LongWritable],
      classOf[org.apache.hadoop.io.Text], minPartitions = 4)
      .map(_._2.toString) // drop the key, keep each line of text
    // Simple actions to force the data to actually be read
    val theCount = lines.count
    val theFirst = lines.first
    println("")
    println(s"THE COUNT WAS: ${theCount}")
    println(s"FIRST LINE: ${theFirst}")
    println("")

    sc.stop()
  }
}
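
Once the lines are readable they can be split into fields. Below is a minimal sketch of that step, assuming the tab-separated layout the dataset documentation describes (ngram, year, occurrences, pages, books); the NgramRecord case class and field names are illustrative additions, not part of the original job, and the snippet would go before sc.stop() above.

    case class NgramRecord(ngram: String, year: Int, occurrences: Long, pages: Long, books: Long)

    // Parse each tab-separated line into a typed record
    val records = lines.map { line =>
      val fields = line.split("\t")
      NgramRecord(fields(0), fields(1).toInt, fields(2).toLong, fields(3).toLong, fields(4).toLong)
    }
    println(s"1-GRAMS FROM 1990: ${records.filter(_.year == 1990).count}")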