Spark streaming serialization bug reproducer
# start spark shell with Spark 3 built from master
# stop the default spark session
# paste the code below and just run MapWithState.start()
# use the following bash script to create the input files
# (a concrete command sketch for these steps follows the script below)

#!/usr/bin/env bash
for j in `seq 1 1000`
do
  rm -f test.txt
  for i in `seq 1 1`
  do
    echo "$i $i" >> test.txt
  done
  cp test.txt /tmp/data/input_$j.txt
  sleep 2
done

# Kill the spark-shell, then restart it and run the stream again...
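# A minimal command sketch for the steps above (paths and the script name
# create_files.sh are assumptions, not part of the original gist; adjust to
# your local Spark 3 build):
#   mkdir -p /tmp/data            # directory watched by fileStream below
#   ./bin/spark-shell             # terminal 1: start the shell
#   scala> spark.stop()           # stop the default session first
#   scala> :paste                 # paste the code below, end with Ctrl+D
#   scala> MapWithState.start()
#   ./create_files.sh             # terminal 2: run the bash script above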
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object MapWithState {

  def createContext(checkpointDirectory: String, inputDirectory: String, outputDirectory: String)
    : StreamingContext = {
    val sparkConf = new SparkConf()
      .setAppName("Stateful Sum from Kafka")
      .set("spark.streaming.fileStream.minRememberDuration", "6000s")
    // Create the context with a 5 second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // stream checkpoint
    ssc.checkpoint(checkpointDirectory)

    // Initial state RDD for the mapWithState operation
    val initialRDD = ssc.sparkContext.emptyRDD[(String, Int)]

    // Watch the input directory, skipping files that are still being copied
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      directory = inputDirectory,
      filter = (path: org.apache.hadoop.fs.Path) => !path.getName.endsWith("_COPYING_"),
      newFilesOnly = true).map(_._2.toString)

    // Each input line is "<word> <count>"
    val words = lines.map { line =>
      val values = line.split(" ")
      (values(0), values(1).toInt)
    }

    // Update the cumulative count using mapWithState.
    // This gives a DStream made of state (the cumulative count of the words).
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }

    val stateDstream = words.mapWithState(
      StateSpec.function(mappingFunc).initialState(initialRDD))

    stateDstream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        println(s"New Value for sum: ${rdd.values.max()}")
      }
    }
    ssc
  }

  def start(): Unit = {
    val checkpointDirectory = "/tmp/checkpoint"
    val inputDirectory = "/tmp/data"
    val outputDirectory = "/tmp/output"
    Util.setStreamingLogLevels()
    // Restore from checkpoint if present, otherwise build a fresh context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(checkpointDirectory, inputDirectory, outputDirectory)
      })
    ssc.start()
    ssc.awaitTermination()
  }
}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging

object Util extends Logging {

  def setStreamingLogLevels(): Unit = {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
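# Restarting after the kill (sketch, same assumptions as above): launch
# ./bin/spark-shell again, stop the default session, paste the same code and
# call MapWithState.start(); StreamingContext.getOrCreate then rebuilds the
# context from the data under /tmp/checkpoint instead of calling createContext.
# Between fresh (non-restore) runs, clearing the directories avoids an
# unintended restore (a convenience step, not part of the original repro):
#   rm -rf /tmp/checkpoint /tmp/data /tmp/output && mkdir -p /tmp/data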