Skip to content

Instantly share code, notes, and snippets.

@zoltanctoth
Created February 6, 2017 20:09
Show Gist options
  • Save zoltanctoth/6d48ca9a80a60e4abc836be5bc0c5fc2 to your computer and use it in GitHub Desktop.
Save zoltanctoth/6d48ca9a80a60e4abc836be5bc0c5fc2 to your computer and use it in GitHub Desktop.
How to use the Direct Kafka Source in Scala
object Anomymizer extends App {
// Spark session for this demo: app name "Anonimizer", running against the `local[3]` master.
val spark = {
  val sessionBuilder = SparkSession.builder
    .appName("Anonimizer")
    .master("local[3]")
  sessionBuilder.getOrCreate()
}
// Constant salt appended to every non-numeric value before it is hashed.
val salt = "SAALT"

/** Maps a value to a deterministic, obfuscated stand-in.
  *
  * Numeric inputs (`Int`, `Long`, `Double`, `Float`) are multiplied by 273372 and reduced
  * modulo 1000 in their own type, so `Int`/`Long` arithmetic may wrap around and the result
  * carries the sign of the product. Any other value is rendered with `toString`, suffixed
  * with `salt`, hashed with MD5, and the first six lowercase hex digits of the digest are
  * returned.
  *
  * @param a value to anonymize
  * @return a number of the same numeric type for numeric input, otherwise a 6-character
  *         hex `String`
  * @throws NullPointerException if `a` is `null` (it falls through to the `toString` branch)
  */
def anonimizeStr(a: Any): Any = a match {
  case i: Int    => i * 273372 % 1000
  case i: Long   => i * 273372 % 1000
  case i: Double => i * 273372 % 1000
  case i: Float  => i * 273372 % 1000
  case other =>
    // Fixed charset: the platform default may differ between JVMs, which would make the
    // same input hash to different tokens.
    val salted = (other.toString + salt).getBytes(java.nio.charset.StandardCharsets.UTF_8)
    val md5 = MessageDigest.getInstance("MD5").digest(salted)
    // Two hex digits per byte. Rendering the signed bytes in decimal would put '-' signs in
    // the token and spend the six characters on only about two bytes of the digest.
    md5.map(b => "%02x".format(b & 0xff)).mkString.take(6)
}
// Streaming context built on the session's SparkContext with a one-second batch duration.
val ssc = {
  val batchDuration = Seconds(1)
  new StreamingContext(spark.sparkContext, batchDuration)
}
/*
val lines = ssc.socketTextStream("localhost", 9999)
lines.foreachRDD(rdd => println(rdd.collect.mkString(" || ")))
lines.foreachRDD{ rdd =>
spark.read.json(rdd).createOrReplaceTempView("t")
}
ssc.remember(Seconds(60))
*/
// Client settings passed to the Kafka subscription for the direct stream.
val kafkaParams = Map[String, Object](
  // Broker to connect to.
  "bootstrap.servers"  -> "localhost:9092",
  // Keys and values are handled as plain strings.
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  // NOTE(review): serializer entries are producer-side settings; presumably unused by the
  // consuming stream -- confirm before relying on them.
  "key.serializer"     -> classOf[StringSerializer].getName,
  "value.serializer"   -> classOf[StringSerializer].getName,
  "group.id"           -> "example",
  "auto.offset.reset"  -> "latest",
  // Boxed explicitly because the map's value type is Object.
  "enable.auto.commit" -> java.lang.Boolean.FALSE
)
// Topics the direct stream subscribes to.
val topics = Array("test")
// NOTE(review): not referenced by any code visible here, and it names "some-topic" rather
// than "test"; the foreachRDD handler declares its own local `offsetRanges`.
val offsetRanges = Array(
  OffsetRange("some-topic", 0, 110, 220)
)
// Partition 0 of "test"; used below to pin that partition's starting offset.
val p = new TopicPartition("test", 0)
// Direct Kafka stream of String keys and values over `topics`. Partition `p` is given the
// explicit starting offset 0 (written `0L`: the lowercase `l` suffix is deprecated and is
// easily misread as the digit 1).
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams, Map(p -> 0L))
)
// For every batch RDD, print the Kafka offset range that backs each of its partitions.
stream.foreachRDD { batch =>
  // The cast exposes the offset ranges carried by the batch RDD.
  val ranges = batch.asInstanceOf[HasOffsetRanges].offsetRanges
  batch.foreachPartition { _ =>
    // The running task's partition id is used as the index into the offset ranges.
    val partitionIndex = TaskContext.get.partitionId
    val range: OffsetRange = ranges(partitionIndex)
    println(s"${range.topic} ${range.partition} ${range.fromOffset} ${range.untilOffset}")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment