// Adapted from: https://github.com/jcrobak/avro-examples
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv
/*
 * Using the GenericRecord API like AvroStorage
 */
object WordCountJobAvroGenericSpark {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Avro Generic Spark Scala",
      System.getenv().get("SPARK_HOME"), List("target/scala-2.10/avro-spark_2.10-1.0.jar"))
    // Read the Avro container file as (AvroKey[GenericRecord], NullWritable) pairs.
    val avroRdd = sc.newAPIHadoopFile("twitter.avro",
      classOf[AvroKeyInputFormat[GenericRecord]],
      classOf[AvroKey[GenericRecord]],
      classOf[NullWritable])
    val genericRecords = avroRdd.map { case (ak, _) => ak.datum() }
    // Avro decodes string fields as org.apache.avro.util.Utf8 by default, so
    // convert with toString rather than casting to java.lang.String.
    val wordCounts = genericRecords.map((gr: GenericRecord) => gr.get("tweet").toString)
      .flatMap(tweet => tweet.split(" "))
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
    val wordCountsFormatted = wordCounts.map { case (word, count) => (escapeCsv(word), count) }
      .map { case (word, count) => s"$word,$count" }
    // The rows are comma-separated, so name the output accordingly.
    wordCountsFormatted.saveAsTextFile("output/twitter-wordcount-scala-spark-generic.csv")
  }
}
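The generic job assumes each record exposes a string field named "tweet". Avro container files embed the writer's schema in their header, so a quick way to confirm the available field names is to read the schema back directly. A minimal sketch, assuming twitter.avro sits in the working directory:

import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

object PrintAvroSchema {
  def main(args: Array[String]) {
    // DataFileReader exposes the schema stored in the container file's header.
    val reader = new DataFileReader[GenericRecord](
      new File("twitter.avro"), new GenericDatumReader[GenericRecord]())
    println(reader.getSchema.toString(true)) // pretty-printed schema JSON
    reader.close()
  }
}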
------------------------------------------------------------------------------------------------------
PLANNED API of Avro-Scala-Macro-Annotations (WIP)
/*
 * Using case classes instead of IDL-generated classes to get the benefits of
 * the SpecificRecord API without the hassle of IDL
 */
@AvroRecord
case class Twitter_Schema(username: String, tweet: String, timestamp: Long)
/* Or derive the fields automatically from the schema embedded in the file:
 * @AvroTypeProvider("twitter.avro")
 * @AvroRecord
 * case class Twitter_Schema()
 */
object WordCountJobAvroSpecificSpark {
  // (Same imports as the generic example above.)
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Avro Specific Spark Scala",
      System.getenv().get("SPARK_HOME"), List("target/scala-2.10/avro-spark_2.10-1.0.jar"))
    val avroRdd = sc.newAPIHadoopFile("twitter.avro",
      classOf[AvroKeyInputFormat[Twitter_Schema]],
      classOf[AvroKey[Twitter_Schema]],
      classOf[NullWritable])
    val specificRecords = avroRdd.map { case (ak, _) => ak.datum() }
    // With a specific record the field is already typed as String, so no cast is needed.
    val wordCounts = specificRecords.map((sr: Twitter_Schema) => sr.tweet)
      .flatMap(tweet => tweet.split(" "))
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
    val wordCountsFormatted = wordCounts.map { case (word, count) => (escapeCsv(word), count) }
      .map { case (word, count) => s"$word,$count" }
    wordCountsFormatted.saveAsTextFile("output/twitter-wordcount-scala-spark-specific.csv")
  }
}
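Both jobs ship the application jar from target/scala-2.10/avro-spark_2.10-1.0.jar, i.e. the output of sbt package. A minimal build.sbt sketch that would produce that artifact; the dependency versions are illustrative assumptions for a Spark 1.x / Scala 2.10 setup, not pinned by this gist:

// build.sbt -- version numbers are assumptions, adjust to your cluster
name := "avro-spark"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "1.1.0" % "provided",
  "org.apache.avro"   % "avro"         % "1.7.7",
  "org.apache.avro"   % "avro-mapred"  % "1.7.7" classifier "hadoop2",
  "commons-lang"      % "commons-lang" % "2.6"  // StringEscapeUtils.escapeCsv
)

spark-core is marked "provided" since the SPARK_HOME installation supplies it at runtime; the other three jars must reach the executors, e.g. via the jar list passed to the SparkContext constructor above.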