public
anonymous / offset_checkpoint
Created

offset_checkpoint

  • Download Gist
offset_checkpoint
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
class OffsetCheckpoint(val file: File) extends Logging {
private val lock = new Object()
new File(file + ".tmp").delete() // try to delete any existing temp files for cleanliness
 
def write(offsets: Map[TopicAndPartition, Long]) {
lock synchronized {
// write to temp file and then swap with the highwatermark file
val temp = new File(file.getAbsolutePath + ".tmp")
 
val writer = new BufferedWriter(new FileWriter(file))
try {
// write the current version
writer.write(0.toString)
writer.newLine()
// write the number of entries
writer.write(offsets.size.toString)
writer.newLine()
 
// write the entries
offsets.foreach { case (topicPart, offset) =>
writer.write("%s %d %d".format(topicPart.topic, topicPart.partition, offset))
writer.newLine()
}
// flush and overwrite old file
writer.flush()
if(!temp.renameTo(file))
throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath))
} finally {
writer.close()
}
}
}
 
def read(): Map[TopicAndPartition, Long] = {
lock synchronized {
val reader = new BufferedReader(new FileReader(file))
try {
val version = reader.readLine().toInt
version match {
case 0 =>
val expectedSize = reader.readLine().toInt
var offsets = Map[TopicAndPartition, Long]()
var line = reader.readLine()
while(line != null) {
val pieces = line.split("\\s+")
if(pieces.length != 3)
throw new IOException("Malformed line in offset checkpoint file: '%s'.".format(line))
val topic = pieces(0)
val partition = pieces(1).toInt
val offset = pieces(2).toLong
offsets += (TopicAndPartition(pieces(0), partition) -> offset)
line = reader.readLine()
}
if(offsets.size != expectedSize)
throw new IOException("Expected %d entries but found only %d".format(expectedSize, offsets.size))
offsets
case _ =>
throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
}
} finally {
reader.close()
}
}
}
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.