Skip to content

Instantly share code, notes, and snippets.

@RadoBuransky
Created January 4, 2016 07:09
Show Gist options
  • Save RadoBuransky/dc20d72593ff732e74f7 to your computer and use it in GitHub Desktop.
Save RadoBuransky/dc20d72593ff732e74f7 to your computer and use it in GitHub Desktop.
The idea here is to first create a state stream containing strings and then try to access this state as integers, which should crash. However, the second test doesn't "see" the state created by the first test. Why?
package com.buransky
import _root_.kafka.serializer.StringDecoder
import net.manub.embeddedkafka.EmbeddedKafka
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.scalatest.FunSuite
/**
 * The idea here is to first create a state stream containing strings and then try to access this
 * state as integers, which should crash. However, the second test doesn't "see" the state created
 * by the first test. Why?
 *
 * NOTE(review): `withSsc` always builds a brand-new context with
 * `new StreamingContext(conf, batchDuration)`. Calling `ssc.checkpoint(dir)` only tells Spark
 * where to *write* checkpoints; it never reads them back. Per the Spark Streaming guide, recovery
 * only happens when the context is obtained via `StreamingContext.getOrCreate(dir, creatingFunc)`
 * (or `new StreamingContext(dir)`). Note also that on recovery `creatingFunc` is not invoked and
 * the DStream graph (including mapping functions) is restored from the checkpoint, so a different
 * `action` supplied by a later test would not take effect.
 */
class CheckpointISpec extends FunSuite with EmbeddedKafka {
  import CheckpointISpec._

  /** Directory passed to `StreamingContext.checkpoint`, relative to the working directory. */
  private val checkpointDir = "./tmp"

  /**
   * Create state with strings.
   */
  test("Create RDD with string state") {
    withSsc { inputStream =>
      inputStream.mapWithState(stringStateSpec)
    }
  }

  /**
   * Try to access the state created before as integers.
   */
  test("Create RDD with int state") {
    withSsc { inputStream =>
      inputStream.mapWithState(intStateSpec)
    }
  }

  /**
   * Runs a short-lived local streaming job against an embedded Kafka broker.
   *
   * The job:
   *  - creates a `local[2]` StreamingContext with a 3 second batch interval, checkpointing to
   *    `checkpointDir`;
   *  - publishes "a", "b", "a", "b" to `kafkaTopic`;
   *  - applies `action` to a stream of `(message, message)` pairs and prints every resulting
   *    element;
   *  - starts the context and then gracefully stops it together with its SparkContext.
   *
   * The context is stopped in a `finally` block so that a failure anywhere in the setup does not
   * leave a SparkContext running for subsequent tests.
   *
   * @param action transformation applied to the input pair stream before printing
   */
  private def withSsc(action: (DStream[(String, String)]) => DStream[(String, String)]): Unit = {
    withRunningKafka {
      // Create Spark configuration
      val conf = new SparkConf().setAppName(appName)
        .setMaster("local[2]")
      val ssc = new StreamingContext(conf, Seconds(3))
      try {
        ssc.checkpoint(checkpointDir)

        // Publish an empty message before creating the stream — presumably so the topic exists
        // when the direct stream fetches its metadata. TODO confirm.
        publishStringMessageToKafka(kafkaTopic, "")

        // Connect to embedded Kafka; key and value of each pair are both the message payload.
        val kafkaStream = createKafkaStream(ssc).map(m => m._2 -> m._2)
        Seq("a", "b", "a", "b").foreach(message => publishStringMessageToKafka(kafkaTopic, message))

        // Invoke action and print it
        action(kafkaStream).foreachRDD { rdd =>
          rdd.foreach(println)
        }

        ssc.start()
      } finally {
        // Always stop (also on failure) so the SparkContext is released and later tests in the
        // same JVM can create their own.
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }
  }

  /**
   * Creates a direct (receiver-less) Kafka stream of `(key, message)` pairs from `kafkaTopic`
   * on the embedded broker at localhost:6001.
   */
  private def createKafkaStream(ssc: StreamingContext): DStream[(String, String)] = {
    // Configure Kafka
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:6001")
    // Create direct Kafka stream
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(kafkaTopic))
  }
}
/**
 * Shared fixtures for the checkpoint test: application/topic names and two `mapWithState` specs
 * that are identical except for the type of state they store per key.
 */
object CheckpointISpec {
  val appName: String = "CheckpointMigrationISpec"
  val kafkaTopic: String = "test"

  /** Spec whose mapping function always stores the string "abc" as the key's state. */
  lazy val stringStateSpec: StateSpec[String, String, String, (String, String)] =
    fixedStateSpec("abc")

  /** Spec whose mapping function always stores the integer 42 as the key's state. */
  lazy val intStateSpec: StateSpec[String, String, Int, (String, String)] =
    fixedStateSpec(42)

  /** Wraps `stateMapping`, bound to the given constant state value, in a `StateSpec`. */
  private def fixedStateSpec[S](fixedState: S): StateSpec[String, String, S, (String, String)] =
    StateSpec.function[String, String, S, (String, String)](stateMapping(fixedState) _)

  /**
   * Mapping function for `mapWithState`.
   *
   * It prints the key, the optional value and the current state, then overwrites the key's state
   * with `fixedState`. It emits `(key, value)` when a value is present and `None` otherwise.
   */
  private def stateMapping[StateType](fixedState: StateType)(
      time: Time,
      key: String,
      value: Option[String],
      state: State[StateType]): Option[(String, String)] = {
    println(s"key: $key, value: $value, $state")
    state.update(fixedState)
    value.map(v => (key, v))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment