@skonto · Created April 29, 2019 19:57
bug spark serialization
# Start a spark-shell with Spark 3 built from master.
# Stop the default Spark session.
# Paste the code below and just run MapWithState.start().
# Use the following bash script to create the input files:
#!/usr/bin/env bash
for j in `seq 1 1000`
do
  rm -f test.txt   # -f so the first iteration does not fail on a missing file
  for i in `seq 1 1`
  do
    echo "$i $i" >> test.txt
  done
  cp test.txt /tmp/data/input_$j.txt
  sleep 2
done
# Kill the spark-shell, then restart it and run the stream again (see the session sketch below)...
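#
# A sketch of the assumed spark-shell session for the steps above; the launch command
# and prompts are illustrative, not part of the original gist:
#
#   $ ./bin/spark-shell
#   scala> spark.stop()            # stop the default session so createContext below
#                                  # can build a fresh SparkContext from its own SparkConf
#   scala> :paste                  # paste everything from the imports down to the Util object
#   scala> MapWithState.start()
#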
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
object MapWithState {

  def createContext(checkpointDirectory: String, inputDirectory: String, outputDirectory: String)
    : StreamingContext = {
    val sparkConf = new SparkConf()
      .setAppName("Stateful Sum from Kafka")
      .set("spark.streaming.fileStream.minRememberDuration", "6000s")
    // Create the context with a 5 second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // Checkpoint directory for the streaming state
    ssc.checkpoint(checkpointDirectory)
    // Initial state RDD for the mapWithState operation
    val initialRDD = ssc.sparkContext.emptyRDD[(String, Int)]
    // Read text files dropped into the input directory, skipping in-flight "_COPYING_" files
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      directory = inputDirectory,
      filter = (path: org.apache.hadoop.fs.Path) => !path.getName.endsWith("_COPYING_"),
      newFilesOnly = true).map(_._2.toString)
    val words = lines.map { line =>
      val values = line.split(" ")
      (values(0), values(1).toInt)
    }
    // Update the cumulative count using mapWithState.
    // This gives a DStream made of state (the cumulative count per word).
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }
    val stateDstream = words.mapWithState(
      StateSpec.function(mappingFunc).initialState(initialRDD))
    stateDstream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        println(s"New Value for sum: ${rdd.values.max()}")
      }
    }
    ssc
  }

  def start(): Unit = {
    val checkpointDirectory = "/tmp/checkpoint"
    val inputDirectory = "/tmp/data"
    val outputDirectory = "/tmp/output"
    Util.setStreamingLogLevels()
    // On the first run this calls createContext; after a restart it recovers the context
    // by deserializing the data in checkpointDirectory, which is where the serialization
    // issue this gist reports shows up.
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(checkpointDirectory, inputDirectory, outputDirectory)
      })
    ssc.start()
    ssc.awaitTermination()
  }
}
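// Usage note (not part of the original gist): with the bash script above, or the
// GenInput sketch at the bottom of this file, feeding /tmp/data, each non-empty
// 5-second batch prints a line of the form "New Value for sum: <n>", where <n>
// grows as state accumulates across batches; the exact numbers depend on how many
// files land in each batch.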
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
object Util extends Logging {

  def setStreamingLogLevels(): Unit = {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
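// An alternative to the bash script at the top of this file: a minimal Scala sketch
// (an assumption of mine, not part of the original gist) that drops one small file
// into /tmp/data every 2 seconds. It can be pasted into a second spark-shell or any
// Scala REPL, since it only uses java.nio.
object GenInput {
  import java.nio.charset.StandardCharsets
  import java.nio.file.{Files, Paths}

  def run(files: Int = 1000, intervalMs: Long = 2000): Unit = {
    val dir = Paths.get("/tmp/data")
    Files.createDirectories(dir)
    for (j <- 1 to files) {
      // Write under a "_COPYING_" temp name so the fileStream filter above ignores
      // the file until the rename gives it its final name.
      val tmp = dir.resolve(s"input_$j.txt._COPYING_")
      Files.write(tmp, "1 1\n".getBytes(StandardCharsets.UTF_8))
      Files.move(tmp, dir.resolve(s"input_$j.txt"))
      Thread.sleep(intervalMs)
    }
  }
}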