Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
/**
 * A source of streaming records.
 *
 * Implementations build and return a Spark `DStream` of elements of type `A`.
 *
 * @tparam A element type of the produced stream
 */
trait StreamExtractor[A] {

  /** Builds the input stream for this source. */
  def extract: DStream[A]
}
/**
 * Streams proxy-log lines out of Kafka as a `DStream[String]`.
 *
 * Uses the receiver-less ("direct") Kafka API, which reads from the Kafka brokers
 * listed in `metadata.broker.list` instead of going through Zookeeper.
 *
 * @param ssc     streaming context the Kafka input stream is created on
 * @param brokers comma-separated `host:port` broker list, passed as `metadata.broker.list`
 * @param topics  Kafka topics to subscribe to
 */
class ProxyStreamExtractor(
    val ssc: StreamingContext,
    brokers: String = "somehost:9092,anotherhost:9092",
    topics: Set[String] = Set("web.proxy")
) extends StreamExtractor[String] {

  override def extract: DStream[String] = {
    // Receiver-less approach: Kafka brokers instead of Zookeeper
    val kafkaParams = Map("metadata.broker.list" -> brokers)

    // Poll for new Kafka messages. The key/value types and their decoders must be given
    // explicitly: they cannot be inferred from the arguments and would otherwise be
    // inferred as Nothing instead of String/StringDecoder.
    val kafkaStream =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics)

    // Each record is a (key, message) pair; keep the message, i.e. the proxy log line.
    // NOTE(review): the original snippet elided this transformation (`map{ ... }`);
    // extend it here if the lines need parsing beyond extracting the message value.
    kafkaStream.map { case (_, line) => line }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.