@jjkoshy
Created June 19, 2014 18:24
diff --git a/config/log4j.properties b/config/log4j.properties
index baa698b..9502254 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -41,7 +41,7 @@ log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.cleanerAppender.File=log-cleaner.log
+log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
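The log4j change just brings the cleaner appender in line with the other appenders in config/log4j.properties, which already point their file paths at ${kafka.logs.dir}. As a rough sketch, log4j fills in that placeholder from a JVM system property, which the start scripts are assumed to pass; the exact launcher mechanism is not part of this patch:

    # assumed launcher flag (e.g. via KAFKA_LOG4J_OPTS), shown for illustration only
    -Dkafka.logs.dir=/var/log/kafka
    # with that property set, the cleaner log lands next to the other broker logs
    log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log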
diff --git a/config/server.properties b/config/server.properties
index 9dcb253..e295e84 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -102,7 +102,7 @@ log.retention.check.interval.ms=60000
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
-log.cleaner.enable=false
+log.cleaner.enable=true
#log.cleanup.policy=delete
#offsets.load.buffer.size=104857600
#offsets.load.buffer.size=52428800
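Turning on log.cleaner.enable only starts the cleaner threads; as the comment above notes, a log still has to be marked for compaction before they touch it. A minimal broker-side sketch, assuming this tree accepts "compact" as the cleanup-policy value (older trees spelled the dedupe policy differently):

    # start the cleaner threads (the change above)
    log.cleaner.enable=true
    # assumed value: logs under this policy are deduplicated by key instead of deleted by retention
    log.cleanup.policy=compact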
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index b2652dd..948696f 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -255,8 +255,8 @@ class FileMessageSet private[kafka](@volatile var file: File,
/**
* Read from the underlying file into the buffer starting at the given position
*/
- def readInto(buffer: ByteBuffer, relativePosition: Int): ByteBuffer = {
- channel.read(buffer, relativePosition + this.start)
+ def readInto(buffer: ByteBuffer): ByteBuffer = {
+ channel.read(buffer, this.start)
buffer.flip()
buffer
}
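With the relativePosition argument gone, readInto always reads from the message set's own start, so callers first carve out the slice they want (via a read on the log or segment) and then fill their buffer from it. A hypothetical helper mirroring how the LogCleaner and OffsetManager hunks below use it; readChunk and its parameters are illustrative, not part of the patch:

    import java.nio.ByteBuffer
    import kafka.log.{FileMessageSet, LogSegment}
    import kafka.message.ByteBufferMessageSet

    // Illustrative only: read one chunk of `segment` starting at `fromOffset` into `buffer`.
    // Callers are expected to guard the offset range (as the cleaner loops below do).
    def readChunk(segment: LogSegment, fromOffset: Long, buffer: ByteBuffer): ByteBufferMessageSet = {
      buffer.clear()
      val slice = segment.read(fromOffset, None, buffer.limit()).asInstanceOf[FileMessageSet]
      slice.readInto(buffer)   // reads from slice.start, then flips the buffer for consumption
      new ByteBufferMessageSet(buffer)
    }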
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 312204c..340a05d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -213,7 +213,8 @@ class LogCleaner(val config: CleanerConfig,
* This class holds the actual logic for cleaning a log
* @param id An identifier used for logging
* @param offsetMap The map used for deduplication
- * @param bufferSize The size of the buffers to use. Memory usage will be 2x this number as there is a read and write buffer.
+ * @param ioBufferSize The initial size of the buffers to use. Memory usage will be 2x this number as there is a read and write buffer.
+ * @param maxIoBufferSize The maximum size of the buffers to use. (The read and write buffers are allowed to grow from the initial size to this maximum size.)
* @param throttler The throttler instance to use for limiting I/O rate.
* @param time The time instance
*/
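The doc change reflects that the read and write buffers now start at ioBufferSize and may grow up to maxIoBufferSize when a single message does not fit. The growth step itself is not shown in this hunk; a hedged sketch of the cap it describes:

    // Illustrative only (not the patch's growBuffers implementation):
    // double the buffer, but never beyond maxIoBufferSize.
    def grownSize(currentSize: Int, maxIoBufferSize: Int): Int =
      math.min(currentSize * 2, maxIoBufferSize)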
@@ -280,7 +281,6 @@ private[log] class Cleaner(val id: Int,
* @param log The log being cleaned
* @param segments The group of segments being cleaned
* @param map The offset map to use for cleaning segments
- * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet
* @param deleteHorizonMs The time to retain delete tombstones
*/
private[log] def cleanSegments(log: Log,
@@ -338,20 +338,21 @@ private[log] class Cleaner(val id: Int,
*/
private[log] def cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment,
dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) {
- var position = 0
- while (position < source.log.sizeInBytes) {
+ var currOffset = source.baseOffset
+ while (currOffset < source.index.lastOffset) {
checkDone(topicAndPartition)
// read a chunk of messages and copy any that are to be retained to the write buffer to be written out
readBuffer.clear()
writeBuffer.clear()
- val messages = new ByteBufferMessageSet(source.log.readInto(readBuffer, position))
- throttler.maybeThrottle(messages.sizeInBytes)
- // check each message to see if it is to be retained
+
+ val messages = source.read(currOffset, None, readBuffer.limit()).asInstanceOf[FileMessageSet]
+ messages.readInto(readBuffer)
+ val messageSet = new ByteBufferMessageSet(readBuffer)
+ throttler.maybeThrottle(messageSet.sizeInBytes)
var messagesRead = 0
- for (entry <- messages) {
+ for (entry <- messageSet) { // deep iteration
messagesRead += 1
val size = MessageSet.entrySize(entry.message)
- position += size
stats.readMessage(size)
val key = entry.message.key
require(key != null, "Found null key in log segment %s which is marked as dedupe.".format(source.log.file.getAbsolutePath))
@@ -366,6 +367,7 @@ private[log] class Cleaner(val id: Int,
ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
stats.recopyMessage(size)
}
+ currOffset = entry.nextOffset
}
// if any messages are to be retained, write them out
if(writeBuffer.position > 0) {
@@ -374,10 +376,11 @@ private[log] class Cleaner(val id: Int,
dest.append(retained.head.offset, retained)
throttler.maybeThrottle(writeBuffer.limit)
}
-
+
// if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
if(readBuffer.limit > 0 && messagesRead == 0)
growBuffers()
+
}
restoreBuffers()
}
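The rewritten loop advances by message offset instead of by byte position: deep iteration over the ByteBufferMessageSet yields each logical message with its offset, and entry.nextOffset supplies the next offset to continue from. A small self-contained sketch of that primitive, with made-up keys and values:

    import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

    // Illustrative: deep iteration exposes offset/nextOffset per logical message,
    // which is what cleanInto now uses to drive currOffset.
    val msgs = new ByteBufferMessageSet(NoCompressionCodec,
                                        new Message("v1".getBytes, "k1".getBytes),
                                        new Message("v2".getBytes, "k2".getBytes))
    for (entry <- msgs)
      println("offset=" + entry.offset + " nextOffset=" + entry.nextOffset)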
@@ -473,29 +476,31 @@ private[log] class Cleaner(val id: Int,
* @return The final offset covered by the map
*/
private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = {
- var position = 0
- var offset = segment.baseOffset
- while (position < segment.log.sizeInBytes) {
+ var currOffset = segment.baseOffset
+ var lastSeenOffset = currOffset
+ while (currOffset < segment.index.lastOffset) {
checkDone(topicAndPartition)
readBuffer.clear()
- val messages = new ByteBufferMessageSet(segment.log.readInto(readBuffer, position))
- throttler.maybeThrottle(messages.sizeInBytes)
- val startPosition = position
- for (entry <- messages) {
+ val messages = segment.read(currOffset, None, readBuffer.limit()).asInstanceOf[FileMessageSet]
+ messages.readInto(readBuffer)
+ val messageSet = new ByteBufferMessageSet(readBuffer)
+ var messagesRead = 0
+ for (entry <- messageSet) {
+ messagesRead += 1
val message = entry.message
require(message.hasKey)
val size = MessageSet.entrySize(message)
- position += size
map.put(message.key, entry.offset)
- offset = entry.offset
+ currOffset = entry.nextOffset
+ lastSeenOffset = entry.offset
stats.indexMessage(size)
}
- // if we didn't read even one complete message, our read buffer may be too small
- if(position == startPosition)
+ if (readBuffer.limit() > 0 && messagesRead == 0)
growBuffers()
}
+
restoreBuffers()
- offset
+ lastSeenOffset
}
}
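buildOffsetMapForSegment now also walks the segment by offset and returns lastSeenOffset, the offset of the last message it actually indexed, rather than an offset tracked alongside a byte position. The map it fills keeps only the newest offset per key, which is what makes the later copy pass a deduplication. A minimal sketch, assuming the SkimpyOffsetMap implementation from kafka.log is the OffsetMap in use:

    import java.nio.ByteBuffer
    import kafka.log.SkimpyOffsetMap

    // Illustrative: a later put for the same key overwrites the earlier offset.
    val map = new SkimpyOffsetMap(1024 * 1024)   // 1 MB of map memory
    map.put(ByteBuffer.wrap("k".getBytes), 5L)
    map.put(ByteBuffer.wrap("k".getBytes), 9L)
    println(map.get(ByteBuffer.wrap("k".getBytes)))   // 9: only the latest offset per key survives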
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 19d423f..3bfa11b 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -304,7 +304,7 @@ class OffsetManager(val config: OffsetManagerConfig,
while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
buffer.clear()
val messages = log.read(currOffset, config.loadBufferSize).asInstanceOf[FileMessageSet]
- messages.readInto(buffer, 0)
+ messages.readInto(buffer)
val bbset = new ByteBufferMessageSet(buffer)
bbset.foreach { msgAndOffset =>
require(msgAndOffset.message.key != null, "Offset entry key should not be null")