Created
June 12, 2014 03:31
-
-
Save yufan-liu/b4d8c1ab2508e230544a to your computer and use it in GitHub Desktop.
[spark] Replay events dispatcher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// version spark 1.0.0 | |
// file org.apache.spark.scheduler.ReplayListenerBus | |
/** | |
* Replay each event in the order maintained in the given logs. | |
* This should only be called exactly once. | |
*/ | |
def replay() { | |
assert(!replayed, "ReplayListenerBus cannot replay events more than once") | |
logPaths.foreach { path => | |
// Keep track of input streams at all levels to close them later | |
// This is necessary because an exception can occur in between stream initializations | |
var fileStream: Option[InputStream] = None | |
var bufferedStream: Option[InputStream] = None | |
var compressStream: Option[InputStream] = None | |
var currentLine = "<not started>" | |
try { | |
fileStream = Some(fileSystem.open(path)) | |
bufferedStream = Some(new BufferedInputStream(fileStream.get)) | |
compressStream = Some(wrapForCompression(bufferedStream.get)) | |
// Parse each line as an event and post the event to all attached listeners | |
val lines = Source.fromInputStream(compressStream.get).getLines() | |
lines.foreach { line => | |
currentLine = line | |
postToAll(JsonProtocol.sparkEventFromJson(parse(line))) | |
} | |
} catch { | |
case e: Exception => | |
logError("Exception in parsing Spark event log %s".format(path), e) | |
logError("Malformed line: %s\n".format(currentLine)) | |
} finally { | |
fileStream.foreach(_.close()) | |
bufferedStream.foreach(_.close()) | |
compressStream.foreach(_.close()) | |
} | |
} | |
replayed = true | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment