Last active
January 21, 2016 02:12
-
-
Save wenerme/2480368c36eb66c4c02a to your computer and use it in GitHub Desktop.
RocketMQ commit log reader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File; | |
import java.io.IOException; | |
import java.net.InetSocketAddress; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.FileChannel; | |
import java.nio.charset.Charset; | |
import java.nio.charset.StandardCharsets; | |
import java.nio.file.Path; | |
import java.nio.file.Paths; | |
import java.util.LinkedHashMap; | |
import java.util.Map; | |
import java.util.Objects; | |
import java.util.TreeMap; | |
import java.util.zip.CRC32; | |
/** | |
* RocketMQ commit log reader with no dependency, this class is NOT thread safe. | |
* | |
* @author wener | |
* @since 16/1/6 | |
*/ | |
public class CommitLogReader { | |
public static final int NO_MORE = -2; | |
public static final int OFFSET_NOT_FOUND = -1; | |
public static final String DEFAULT_PATH = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog"; | |
private static final Charset CHARSET = StandardCharsets.UTF_8; | |
private static final int MAX_FILE_CHANNEL = 4; | |
private final Path path; | |
private final TreeMap<Long, CommitFile> files; | |
private long offset; | |
// Shared file channel cache | |
private Map<Long, FileChannel> cache = new LinkedHashMap<Long, FileChannel>(MAX_FILE_CHANNEL + 1, .75F, true) { | |
// This method is called just after a new entry has been added | |
public boolean removeEldestEntry(Map.Entry<Long, FileChannel> eldest) { | |
if (size() > MAX_FILE_CHANNEL) { | |
FileChannel ch = eldest.getValue(); | |
if (ch != null) { | |
try { | |
ch.close(); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
return size() > MAX_FILE_CHANNEL; | |
} | |
}; | |
public CommitLogReader() { | |
this(Paths.get(DEFAULT_PATH)); | |
} | |
public CommitLogReader(Path path) { | |
this.path = Objects.requireNonNull(path); | |
files = new TreeMap<>(); | |
refresh(); | |
} | |
public static void main(String[] args) throws IOException, InterruptedException { | |
CommitLogReader reader = new CommitLogReader(); | |
System.out.printf("Dir %s files %s", reader.getPath(), reader.files); | |
reader.setOffset(reader.firstOffset()); | |
Message message = new Message(); | |
int n = 0; | |
while (true) { | |
long r = reader.take(message); | |
if (r > 0) { | |
n++; | |
assert message.checkCrc(); | |
System.out.printf("#%s %s (%s -> %s) at %s : %s\n", n, message.getId(), message.getSender(), message.getReceiver(), r, message); | |
} else { | |
switch ((int) r) { | |
case -1: | |
System.out.println("Offset not found at " + reader.getOffset()); | |
break; | |
case -2: | |
System.out.println("No more " + reader.getOffset()); | |
break; | |
} | |
// Manual reload commit file list | |
reader.refresh(); | |
Thread.sleep(1000); | |
} | |
} | |
} | |
public long firstOffset() { | |
if (files.isEmpty()) { | |
return 0; | |
} | |
return files.firstKey(); | |
} | |
/** | |
* Refresh commit file list | |
*/ | |
public void refresh() { | |
clear(); | |
File[] fileList = path.toFile().listFiles(); | |
if (fileList == null) { | |
return; | |
} | |
for (File file : fileList) { | |
CommitFile cf = new CommitFile(file.toPath()); | |
files.put(cf.getOffset(), cf); | |
} | |
} | |
public Path getPath() { | |
return path; | |
} | |
public long getOffset() { | |
return offset; | |
} | |
public CommitLogReader setOffset(long offset) { | |
this.offset = offset; | |
return this; | |
} | |
/** | |
* Peek a message will not advance offset | |
*/ | |
public long peek(Message message) throws IOException { | |
return get(offset, message); | |
} | |
/** | |
* Get message at this offset | |
*/ | |
public long get(long offset, Message message) throws IOException { | |
Map.Entry<Long, CommitFile> entry = files.floorEntry(offset); | |
if (entry == null) { | |
return OFFSET_NOT_FOUND; | |
} | |
FileChannel ch = entry.getValue().getChannel(); | |
long position = offset - entry.getKey(); | |
if (ch.size() <= position + 4) { | |
return NO_MORE; | |
} | |
int size = ch.map(FileChannel.MapMode.READ_ONLY, position, 4).getInt(); | |
if (size == 0) { | |
return NO_MORE; | |
} | |
// BLANK considered as a correct message | |
message.read(ch.map(FileChannel.MapMode.READ_ONLY, position, size)); | |
return offset + size; | |
} | |
/** | |
* Take a message will advance the offset | |
*/ | |
public long take(Message message) throws IOException { | |
long r = get(offset, message); | |
return r < 0 ? r : (offset = r); | |
} | |
public void clear() { | |
for (Map.Entry<Long, CommitFile> entry : files.entrySet()) { | |
entry.getValue().close(); | |
} | |
files.clear(); | |
} | |
/** | |
* Represent a whole message | |
*/ | |
@SuppressWarnings("unused") | |
public static class Message { | |
// Message's MAGIC CODE daa320a7 | |
public final static int MAGIC_CODE_MESSAGE = 0xAABBCCDD ^ 1880681586 + 8; | |
// End of file empty MAGIC CODE cbd43194 | |
public final static int MAGIC_CODE_BLANK = 0xBBCCDDEE ^ 1880681586 + 8; | |
private int magicCode; | |
private int bodyCrc; | |
private int queueId; | |
private int flag; | |
private long queueOffset; | |
private long physicalOffset; | |
private int sysFlag; | |
private long sendTimestamp; | |
private long sendHost; | |
private long receiveTimestamp; | |
private long receiveHost; | |
private int consumeTimes; | |
private long preparedTxOffset; | |
private byte[] body; | |
private byte[] topic; | |
private byte[] property; | |
// should be inline | |
private static int safeLength(byte[] bytes) { | |
return bytes == null ? 0 : bytes.length; | |
} | |
public int getSize() { | |
return 4 // 1 TOTALSIZE | |
+ 4 // 2 MAGICCODE | |
+ 4 // 3 BODYCRC | |
+ 4 // 4 QUEUEID | |
+ 4 // 5 FLAG | |
+ 8 // 6 QUEUEOFFSET | |
+ 8 // 7 PHYSICALOFFSET | |
+ 4 // 8 SYSFLAG | |
+ 8 // 9 BORNTIMESTAMP | |
+ 8 // 10 BORNHOST | |
+ 8 // 11 STORETIMESTAMP | |
+ 8 // 12 STOREHOSTADDRESS | |
+ 4 // 13 RECONSUMETIMES | |
+ 8 // 14 Prepared Transaction Offset | |
+ 4 + safeLength(body) // 14 BODY | |
+ 1 + safeLength(topic) // 15 TOPIC | |
+ 2 + safeLength(property); // 16 propertiesLength// | |
} | |
public int getMagicCode() { | |
return magicCode; | |
} | |
public Message setMagicCode(int magicCode) { | |
this.magicCode = magicCode; | |
return this; | |
} | |
public int getBodyCrc() { | |
return bodyCrc; | |
} | |
public Message setBodyCrc(int bodyCrc) { | |
this.bodyCrc = bodyCrc; | |
return this; | |
} | |
public int getQueueId() { | |
return queueId; | |
} | |
public Message setQueueId(int queueId) { | |
this.queueId = queueId; | |
return this; | |
} | |
public int getFlag() { | |
return flag; | |
} | |
public Message setFlag(int flag) { | |
this.flag = flag; | |
return this; | |
} | |
public long getQueueOffset() { | |
return queueOffset; | |
} | |
public Message setQueueOffset(long queueOffset) { | |
this.queueOffset = queueOffset; | |
return this; | |
} | |
public long getPhysicalOffset() { | |
return physicalOffset; | |
} | |
public Message setPhysicalOffset(long physicalOffset) { | |
this.physicalOffset = physicalOffset; | |
return this; | |
} | |
public int getSysFlag() { | |
return sysFlag; | |
} | |
public Message setSysFlag(int sysFlag) { | |
this.sysFlag = sysFlag; | |
return this; | |
} | |
public long getSendTimestamp() { | |
return sendTimestamp; | |
} | |
public Message setSendTimestamp(long sendTimestamp) { | |
this.sendTimestamp = sendTimestamp; | |
return this; | |
} | |
public long getSendHost() { | |
return sendHost; | |
} | |
public Message setSendHost(long sendHost) { | |
this.sendHost = sendHost; | |
return this; | |
} | |
public long getReceiveTimestamp() { | |
return receiveTimestamp; | |
} | |
public Message setReceiveTimestamp(long receiveTimestamp) { | |
this.receiveTimestamp = receiveTimestamp; | |
return this; | |
} | |
public long getReceiveHost() { | |
return receiveHost; | |
} | |
public Message setReceiveHost(long receiveHost) { | |
this.receiveHost = receiveHost; | |
return this; | |
} | |
public int getConsumeTimes() { | |
return consumeTimes; | |
} | |
public Message setConsumeTimes(int consumeTimes) { | |
this.consumeTimes = consumeTimes; | |
return this; | |
} | |
public long getPreparedTxOffset() { | |
return preparedTxOffset; | |
} | |
public Message setPreparedTxOffset(long preparedTxOffset) { | |
this.preparedTxOffset = preparedTxOffset; | |
return this; | |
} | |
public String getBodyString() { | |
return asString(body); | |
} | |
public String getTopicString() { | |
return asString(topic); | |
} | |
private String asString(byte[] bytes) { | |
if (bytes == null) { | |
return null; | |
} | |
if (bytes.length == 0) { | |
return ""; | |
} | |
return new String(bytes, CHARSET); | |
} | |
public InetSocketAddress getSender() { | |
final long x = sendHost >>> 32; | |
return new InetSocketAddress(String.format("%d.%d.%d.%d", x >>> 24 & 0XFF, x >> 16 & 0XFF, x >> 8 & 0xFF, x & 0xFF), (int) (sendHost)); | |
} | |
public InetSocketAddress getReceiver() { | |
final long x = receiveHost >>> 32; | |
return new InetSocketAddress(String.format("%d.%d.%d.%d", x >>> 24 & 0XFF, x >> 16 & 0XFF, x >> 8 & 0xFF, x & 0xFF), (int) (receiveHost)); | |
} | |
public boolean checkCrc() { | |
// return UtilAll.crc32(body) == bodyCrc; | |
CRC32 crc32 = new CRC32(); | |
crc32.update(body); | |
return (crc32.getValue() & 2147483647L) == bodyCrc; | |
} | |
public boolean read(ByteBuffer buf) { | |
/* | |
4 // 1 TOTALSIZE | |
+ 4 // 2 MAGICCODE | |
+ 4 // 3 BODYCRC | |
+ 4 // 4 QUEUEID | |
+ 4 // 5 FLAG | |
+ 8 // 6 QUEUEOFFSET | |
+ 8 // 7 PHYSICALOFFSET | |
+ 4 // 8 SYSFLAG | |
+ 8 // 9 BORNTIMESTAMP | |
+ 8 // 10 BORNHOST | |
+ 8 // 11 STORETIMESTAMP | |
+ 8 // 12 STOREHOSTADDRESS | |
+ 4 // 13 RECONSUMETIMES | |
+ 8 // 14 Prepared Transaction Offset | |
+ 4 + bodyLength // 14 BODY | |
+ 1 + topicLength // 15 TOPIC | |
+ 2 + propertiesLength // 16 propertiesLength | |
*/ | |
buf.getInt();//ignore size | |
magicCode = buf.getInt(); | |
if (isBlank()) { | |
return false; | |
} | |
bodyCrc = buf.getInt(); | |
queueId = buf.getInt(); | |
flag = buf.getInt(); | |
queueOffset = buf.getLong(); | |
physicalOffset = buf.getLong(); | |
sysFlag = buf.getInt(); | |
sendTimestamp = buf.getLong(); | |
sendHost = buf.getLong(); | |
receiveTimestamp = buf.getLong(); | |
receiveHost = buf.getLong(); | |
consumeTimes = buf.getInt(); | |
preparedTxOffset = buf.getLong(); | |
body = new byte[buf.getInt()]; | |
buf.get(body); | |
topic = new byte[buf.get() & 0xff]; | |
buf.get(topic); | |
property = new byte[buf.getShort() & 0xffff]; | |
buf.get(property); | |
return true; | |
} | |
public String getPropertyString() { | |
return asString(property); | |
} | |
public byte[] getBody() { | |
return body; | |
} | |
public Message setBody(byte[] body) { | |
this.body = body; | |
return this; | |
} | |
public byte[] getTopic() { | |
return topic; | |
} | |
public Message setTopic(byte[] topic) { | |
this.topic = topic; | |
return this; | |
} | |
public byte[] getProperty() { | |
return property; | |
} | |
public Message setProperty(byte[] property) { | |
this.property = property; | |
return this; | |
} | |
/** | |
* @return Is this message blank with no real content. | |
*/ | |
public boolean isBlank() { | |
return (magicCode & MAGIC_CODE_BLANK) == MAGIC_CODE_BLANK; | |
} | |
/** | |
* @return Message ID used in rocket mq | |
*/ | |
public String getId() { | |
return String.format("%016X%016X", receiveHost, physicalOffset); | |
} | |
@Override | |
public String toString() { | |
return "Message{" + "size=" + getSize() + | |
", id=" + getId() + | |
", magicCode=" + magicCode + | |
", bodyCrc=" + bodyCrc + | |
", queueId=" + queueId + | |
", flag=" + flag + | |
", queueOffset=" + queueOffset + | |
", physicalOffset=" + physicalOffset + | |
", sysFlag=" + sysFlag + | |
", sendTimestamp=" + sendTimestamp + | |
", sendHost=" + sendHost + | |
", receiveTimestamp=" + receiveTimestamp + | |
", receiveHost=" + receiveHost + | |
", consumeTimes=" + consumeTimes + | |
", preparedTxOffset=" + preparedTxOffset + | |
", body='" + getBodyString() + '\'' + | |
", topic='" + getTopicString() + '\'' + | |
", property='" + getPropertyString() + '\'' + | |
'}'; | |
} | |
} | |
class CommitFile implements AutoCloseable { | |
private final Path file; | |
private Long offset; | |
CommitFile(Path file) { | |
this.file = file; | |
this.offset = Long.parseLong(file.toFile().getName()); | |
} | |
public FileChannel getChannel() throws IOException { | |
// Prevent open too many files | |
FileChannel ch = cache.get(offset); | |
if (ch == null) { | |
ch = FileChannel.open(file); | |
cache.put(offset, ch); | |
} | |
return ch; | |
} | |
@Override | |
public void close() { | |
FileChannel ch = cache.remove(offset); | |
if (ch != null) { | |
try { | |
ch.close(); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
public long getOffset() { | |
return offset; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment