Skip to content

Instantly share code, notes, and snippets.

@yufan-liu
Created June 12, 2014 03:31
Show Gist options
  • Save yufan-liu/b4d8c1ab2508e230544a to your computer and use it in GitHub Desktop.
[spark] Replay events dispatcher
// version spark 1.0.0
// file org.apache.spark.scheduler.ReplayListenerBus
/**
 * Replay each event in the order maintained in the given logs.
 * This should only be called exactly once.
 *
 * For each log path: opens the file, wraps it in buffering and (possibly)
 * decompression, parses each line as a JSON-encoded Spark event, and posts
 * it to all attached listeners. A parse/IO failure in one log is logged and
 * skipped so the remaining logs are still replayed.
 */
def replay(): Unit = {
  import scala.util.control.NonFatal
  assert(!replayed, "ReplayListenerBus cannot replay events more than once")
  logPaths.foreach { path =>
    // Keep track of input streams at all levels to close them later
    // This is necessary because an exception can occur in between stream initializations
    var fileStream: Option[InputStream] = None
    var bufferedStream: Option[InputStream] = None
    var compressStream: Option[InputStream] = None
    // Retained for the error message below so we can report which line broke.
    var currentLine = "<not started>"
    try {
      fileStream = Some(fileSystem.open(path))
      bufferedStream = Some(new BufferedInputStream(fileStream.get))
      compressStream = Some(wrapForCompression(bufferedStream.get))
      // Parse each line as an event and post the event to all attached listeners
      val lines = Source.fromInputStream(compressStream.get).getLines()
      lines.foreach { line =>
        currentLine = line
        postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
      }
    } catch {
      // NonFatal instead of Exception: nearly equivalent, but lets
      // InterruptedException and fatal VM errors propagate instead of
      // being silently swallowed here.
      case NonFatal(e) =>
        logError("Exception in parsing Spark event log %s".format(path), e)
        logError("Malformed line: %s\n".format(currentLine))
    } finally {
      // Close outermost stream first: a wrapper's close() flushes and
      // closes its underlying stream, and closing the file stream before
      // the wrappers could make their close() calls fail.
      compressStream.foreach(_.close())
      bufferedStream.foreach(_.close())
      fileStream.foreach(_.close())
    }
  }
  replayed = true
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment