Skip to content

Instantly share code, notes, and snippets.

@wenerme
Last active January 21, 2016 02:12
Show Gist options
  • Save wenerme/2480368c36eb66c4c02a to your computer and use it in GitHub Desktop.
Save wenerme/2480368c36eb66c4c02a to your computer and use it in GitHub Desktop.
RocketMQ commit log reader
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.zip.CRC32;
/**
 * RocketMQ commit log reader with no dependency, this class is NOT thread safe.
 * <p>
 * Every regular file in the commit log directory whose name is a plain decimal number is treated
 * as a commit file; that number is the global offset of the file's first byte. Messages are read
 * sequentially starting from the reader's current {@linkplain #getOffset() offset}.
 * <p>
 * The read methods return the offset of the next record on success, or one of the negative
 * sentinels {@link #OFFSET_NOT_FOUND} / {@link #NO_MORE}.
 *
 * @author wener
 * @since 16/1/6
 */
public class CommitLogReader {
    /** Returned when there is nothing (yet) to read at the requested offset. */
    public static final int NO_MORE = -2;
    /** Returned when no known commit file starts at or before the requested offset. */
    public static final int OFFSET_NOT_FOUND = -1;
    public static final String DEFAULT_PATH = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog";
    private static final Charset CHARSET = StandardCharsets.UTF_8;
    private static final int MAX_FILE_CHANNEL = 4;
    /** Length in bytes of the TOTALSIZE field that starts every record. */
    private static final int SIZE_FIELD_LENGTH = 4;
    /** Smallest record that can be parsed at all: TOTALSIZE + MAGICCODE. */
    private static final int MIN_RECORD_LENGTH = 8;

    private final Path path;
    private final TreeMap<Long, CommitFile> files;
    private long offset;
    // Shared file channel cache, kept in access order so the least recently used channel is
    // closed and dropped once more than MAX_FILE_CHANNEL channels are open.
    private final Map<Long, FileChannel> cache = new LinkedHashMap<Long, FileChannel>(MAX_FILE_CHANNEL + 1, .75F, true) {
        // This method is called just after a new entry has been added
        @Override
        public boolean removeEldestEntry(Map.Entry<Long, FileChannel> eldest) {
            boolean evict = size() > MAX_FILE_CHANNEL;
            if (evict) {
                closeQuietly(eldest.getValue());
            }
            return evict;
        }
    };

    /**
     * Creates a reader over {@link #DEFAULT_PATH}.
     */
    public CommitLogReader() {
        this(Paths.get(DEFAULT_PATH));
    }

    /**
     * Creates a reader over the given commit log directory and loads its file list.
     *
     * @param path directory containing the commit files, must not be {@code null}
     */
    public CommitLogReader(Path path) {
        this.path = Objects.requireNonNull(path);
        files = new TreeMap<>();
        refresh();
    }

    /**
     * Demo entry point: tails the default commit log forever, printing each message and polling
     * once per second when nothing new is available.
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        CommitLogReader reader = new CommitLogReader();
        System.out.printf("Dir %s files %s", reader.getPath(), reader.files);
        reader.setOffset(reader.firstOffset());
        Message message = new Message();
        int n = 0;
        while (true) {
            long r = reader.take(message);
            if (r > 0) {
                if (message.isBlank()) {
                    // End-of-file filler: the offset has advanced, but the message object still
                    // holds the previous record's fields, so there is nothing new to print.
                    continue;
                }
                n++;
                assert message.checkCrc();
                System.out.printf("#%s %s (%s -> %s) at %s : %s\n", n, message.getId(), message.getSender(), message.getReceiver(), r, message);
            } else {
                if (r == OFFSET_NOT_FOUND) {
                    System.out.println("Offset not found at " + reader.getOffset());
                } else if (r == NO_MORE) {
                    System.out.println("No more " + reader.getOffset());
                }
                // Manual reload commit file list
                reader.refresh();
                Thread.sleep(1000);
            }
        }
    }

    /**
     * @return the starting offset of the first known commit file, or {@code 0} when there is none
     */
    public long firstOffset() {
        if (files.isEmpty()) {
            return 0;
        }
        return files.firstKey();
    }

    /**
     * Refresh commit file list.
     * <p>
     * Closes every cached channel, then rescans the directory. Entries that are not regular files
     * or whose name is not a non-negative decimal number are ignored instead of aborting the scan.
     */
    public void refresh() {
        clear();
        File[] fileList = path.toFile().listFiles();
        if (fileList == null) {
            // Directory does not exist or cannot be listed.
            return;
        }
        for (File file : fileList) {
            if (!file.isFile() || !isOffsetName(file.getName())) {
                continue;
            }
            CommitFile cf = new CommitFile(file.toPath());
            files.put(cf.getOffset(), cf);
        }
    }

    public Path getPath() {
        return path;
    }

    public long getOffset() {
        return offset;
    }

    public CommitLogReader setOffset(long offset) {
        this.offset = offset;
        return this;
    }

    /**
     * Peek a message will not advance offset
     *
     * @see #get(long, Message)
     */
    public long peek(Message message) throws IOException {
        return get(offset, message);
    }

    /**
     * Get message at this offset.
     *
     * @param offset  global offset of the record to read
     * @param message receives the decoded record
     * @return the offset just past the record; {@link #OFFSET_NOT_FOUND} when no commit file starts
     * at or before {@code offset}; {@link #NO_MORE} when the file holds no complete record there
     * (end of data, zero length field, or a record that is only partially present)
     * @throws IOException on read failure or when the length field is non-zero but too small to be
     *                     a record
     */
    public long get(long offset, Message message) throws IOException {
        Map.Entry<Long, CommitFile> entry = files.floorEntry(offset);
        if (entry == null) {
            return OFFSET_NOT_FOUND;
        }
        FileChannel ch = entry.getValue().getChannel();
        long position = offset - entry.getKey();
        long available = ch.size() - position;
        // Written as a subtraction so a huge offset cannot overflow the comparison.
        if (available <= SIZE_FIELD_LENGTH) {
            return NO_MORE;
        }
        // A positional read avoids creating a memory mapping just to fetch four bytes.
        int size = readInt(ch, position);
        if (size == 0) {
            return NO_MORE;
        }
        if (size < MIN_RECORD_LENGTH) {
            throw new IOException("Corrupt record at offset " + offset + ": total size " + size);
        }
        if (size > available) {
            // The record is not completely in the file; mapping it read-only would fail.
            return NO_MORE;
        }
        // BLANK considered as a correct message
        message.read(ch.map(FileChannel.MapMode.READ_ONLY, position, size));
        return offset + size;
    }

    /**
     * Take a message will advance the offset
     *
     * @see #get(long, Message)
     */
    public long take(Message message) throws IOException {
        long next = get(offset, message);
        if (next >= 0) {
            offset = next;
        }
        return next;
    }

    /**
     * Closes every cached channel and forgets all known commit files.
     */
    public void clear() {
        for (CommitFile file : files.values()) {
            file.close();
        }
        files.clear();
    }

    /** @return whether {@code name} is a non-empty run of ASCII digits that fits in a long */
    private static boolean isOffsetName(String name) {
        if (name.isEmpty()) {
            return false;
        }
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            if (c < '0' || c > '9') {
                return false;
            }
        }
        try {
            Long.parseLong(name);
            return true;
        } catch (NumberFormatException tooLarge) {
            return false;
        }
    }

    /** Reads one big-endian int at {@code position} without moving the channel's position. */
    private static int readInt(FileChannel ch, long position) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(SIZE_FIELD_LENGTH);
        while (buf.hasRemaining()) {
            if (ch.read(buf, position + buf.position()) < 0) {
                throw new IOException("Unexpected end of file at position " + (position + buf.position()));
            }
        }
        buf.flip();
        return buf.getInt();
    }

    /** Best-effort close: a failure is reported on stderr but never propagated. */
    private static void closeQuietly(FileChannel ch) {
        if (ch == null) {
            return;
        }
        try {
            ch.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Represent a whole message
     */
    @SuppressWarnings("unused")
    public static class Message {
        // Message's MAGIC CODE daa320a7
        public final static int MAGIC_CODE_MESSAGE = 0xAABBCCDD ^ 1880681586 + 8;
        // End of file empty MAGIC CODE cbd43194
        public final static int MAGIC_CODE_BLANK = 0xBBCCDDEE ^ 1880681586 + 8;
        private int magicCode;
        private int bodyCrc;
        private int queueId;
        private int flag;
        private long queueOffset;
        private long physicalOffset;
        private int sysFlag;
        private long sendTimestamp;
        private long sendHost;
        private long receiveTimestamp;
        private long receiveHost;
        private int consumeTimes;
        private long preparedTxOffset;
        private byte[] body;
        private byte[] topic;
        private byte[] property;

        // should be inline
        private static int safeLength(byte[] bytes) {
            return bytes == null ? 0 : bytes.length;
        }

        /**
         * Decodes a packed host value: the high 32 bits are the IPv4 address, the low 32 bits
         * the port. The dotted literal is built by concatenation so it is locale independent.
         */
        private static InetSocketAddress toSocketAddress(long host) {
            long ip = host >>> 32;
            String literal = (ip >>> 24 & 0xFF) + "." + (ip >>> 16 & 0xFF) + "." + (ip >>> 8 & 0xFF) + "." + (ip & 0xFF);
            return new InetSocketAddress(literal, (int) host);
        }

        /**
         * @return the encoded size of this message computed from its current field lengths
         */
        public int getSize() {
            return 4 // 1 TOTALSIZE
                    + 4 // 2 MAGICCODE
                    + 4 // 3 BODYCRC
                    + 4 // 4 QUEUEID
                    + 4 // 5 FLAG
                    + 8 // 6 QUEUEOFFSET
                    + 8 // 7 PHYSICALOFFSET
                    + 4 // 8 SYSFLAG
                    + 8 // 9 BORNTIMESTAMP
                    + 8 // 10 BORNHOST
                    + 8 // 11 STORETIMESTAMP
                    + 8 // 12 STOREHOSTADDRESS
                    + 4 // 13 RECONSUMETIMES
                    + 8 // 14 Prepared Transaction Offset
                    + 4 + safeLength(body) // 15 BODY
                    + 1 + safeLength(topic) // 16 TOPIC
                    + 2 + safeLength(property); // 17 PROPERTIES
        }

        public int getMagicCode() {
            return magicCode;
        }

        public Message setMagicCode(int magicCode) {
            this.magicCode = magicCode;
            return this;
        }

        public int getBodyCrc() {
            return bodyCrc;
        }

        public Message setBodyCrc(int bodyCrc) {
            this.bodyCrc = bodyCrc;
            return this;
        }

        public int getQueueId() {
            return queueId;
        }

        public Message setQueueId(int queueId) {
            this.queueId = queueId;
            return this;
        }

        public int getFlag() {
            return flag;
        }

        public Message setFlag(int flag) {
            this.flag = flag;
            return this;
        }

        public long getQueueOffset() {
            return queueOffset;
        }

        public Message setQueueOffset(long queueOffset) {
            this.queueOffset = queueOffset;
            return this;
        }

        public long getPhysicalOffset() {
            return physicalOffset;
        }

        public Message setPhysicalOffset(long physicalOffset) {
            this.physicalOffset = physicalOffset;
            return this;
        }

        public int getSysFlag() {
            return sysFlag;
        }

        public Message setSysFlag(int sysFlag) {
            this.sysFlag = sysFlag;
            return this;
        }

        public long getSendTimestamp() {
            return sendTimestamp;
        }

        public Message setSendTimestamp(long sendTimestamp) {
            this.sendTimestamp = sendTimestamp;
            return this;
        }

        public long getSendHost() {
            return sendHost;
        }

        public Message setSendHost(long sendHost) {
            this.sendHost = sendHost;
            return this;
        }

        public long getReceiveTimestamp() {
            return receiveTimestamp;
        }

        public Message setReceiveTimestamp(long receiveTimestamp) {
            this.receiveTimestamp = receiveTimestamp;
            return this;
        }

        public long getReceiveHost() {
            return receiveHost;
        }

        public Message setReceiveHost(long receiveHost) {
            this.receiveHost = receiveHost;
            return this;
        }

        public int getConsumeTimes() {
            return consumeTimes;
        }

        public Message setConsumeTimes(int consumeTimes) {
            this.consumeTimes = consumeTimes;
            return this;
        }

        public long getPreparedTxOffset() {
            return preparedTxOffset;
        }

        public Message setPreparedTxOffset(long preparedTxOffset) {
            this.preparedTxOffset = preparedTxOffset;
            return this;
        }

        public String getBodyString() {
            return asString(body);
        }

        public String getTopicString() {
            return asString(topic);
        }

        public String getPropertyString() {
            return asString(property);
        }

        /** Decodes bytes as UTF-8; {@code null} stays {@code null}. */
        private String asString(byte[] bytes) {
            if (bytes == null) {
                return null;
            }
            if (bytes.length == 0) {
                return "";
            }
            return new String(bytes, CHARSET);
        }

        /**
         * @return the address decoded from the send (born) host field
         */
        public InetSocketAddress getSender() {
            return toSocketAddress(sendHost);
        }

        /**
         * @return the address decoded from the receive (store) host field
         */
        public InetSocketAddress getReceiver() {
            return toSocketAddress(receiveHost);
        }

        /**
         * Checks the body against the stored CRC (CRC32 masked to 31 bits). A {@code null} body
         * is treated as empty, i.e. it matches a stored CRC of {@code 0}.
         *
         * @return whether the computed CRC equals {@link #getBodyCrc()}
         */
        public boolean checkCrc() {
            // return UtilAll.crc32(body) == bodyCrc;
            CRC32 crc32 = new CRC32();
            if (body != null) {
                crc32.update(body);
            }
            return (crc32.getValue() & 0x7FFFFFFFL) == bodyCrc;
        }

        /**
         * Decodes one record from {@code buf}, starting at the buffer's current position.
         * <p>
         * For a blank record only the magic code is updated; every other field keeps whatever
         * value it had before the call.
         *
         * @param buf buffer positioned at the record's TOTALSIZE field
         * @return {@code false} for a blank record, {@code true} when a full message was decoded
         */
        public boolean read(ByteBuffer buf) {
            /*
            4 // 1 TOTALSIZE
            + 4 // 2 MAGICCODE
            + 4 // 3 BODYCRC
            + 4 // 4 QUEUEID
            + 4 // 5 FLAG
            + 8 // 6 QUEUEOFFSET
            + 8 // 7 PHYSICALOFFSET
            + 4 // 8 SYSFLAG
            + 8 // 9 BORNTIMESTAMP
            + 8 // 10 BORNHOST
            + 8 // 11 STORETIMESTAMP
            + 8 // 12 STOREHOSTADDRESS
            + 4 // 13 RECONSUMETIMES
            + 8 // 14 Prepared Transaction Offset
            + 4 + bodyLength // 15 BODY
            + 1 + topicLength // 16 TOPIC
            + 2 + propertiesLength // 17 PROPERTIES
            */
            buf.getInt();//ignore size
            magicCode = buf.getInt();
            if (isBlank()) {
                return false;
            }
            bodyCrc = buf.getInt();
            queueId = buf.getInt();
            flag = buf.getInt();
            queueOffset = buf.getLong();
            physicalOffset = buf.getLong();
            sysFlag = buf.getInt();
            sendTimestamp = buf.getLong();
            sendHost = buf.getLong();
            receiveTimestamp = buf.getLong();
            receiveHost = buf.getLong();
            consumeTimes = buf.getInt();
            preparedTxOffset = buf.getLong();
            body = new byte[buf.getInt()];
            buf.get(body);
            // Topic length is an unsigned byte, properties length an unsigned short.
            topic = new byte[buf.get() & 0xff];
            buf.get(topic);
            property = new byte[buf.getShort() & 0xffff];
            buf.get(property);
            return true;
        }

        public byte[] getBody() {
            return body;
        }

        public Message setBody(byte[] body) {
            this.body = body;
            return this;
        }

        public byte[] getTopic() {
            return topic;
        }

        public Message setTopic(byte[] topic) {
            this.topic = topic;
            return this;
        }

        public byte[] getProperty() {
            return property;
        }

        public Message setProperty(byte[] property) {
            this.property = property;
            return this;
        }

        /**
         * @return Is this message blank with no real content.
         */
        public boolean isBlank() {
            // Exact comparison: a bit mask test would also accept any superset of the blank bits.
            return magicCode == MAGIC_CODE_BLANK;
        }

        /**
         * @return Message ID used in rocket mq
         */
        public String getId() {
            return String.format("%016X%016X", receiveHost, physicalOffset);
        }

        @Override
        public String toString() {
            return "Message{" + "size=" + getSize() +
                    ", id=" + getId() +
                    ", magicCode=" + magicCode +
                    ", bodyCrc=" + bodyCrc +
                    ", queueId=" + queueId +
                    ", flag=" + flag +
                    ", queueOffset=" + queueOffset +
                    ", physicalOffset=" + physicalOffset +
                    ", sysFlag=" + sysFlag +
                    ", sendTimestamp=" + sendTimestamp +
                    ", sendHost=" + sendHost +
                    ", receiveTimestamp=" + receiveTimestamp +
                    ", receiveHost=" + receiveHost +
                    ", consumeTimes=" + consumeTimes +
                    ", preparedTxOffset=" + preparedTxOffset +
                    ", body='" + getBodyString() + '\'' +
                    ", topic='" + getTopicString() + '\'' +
                    ", property='" + getPropertyString() + '\'' +
                    '}';
        }
    }

    /**
     * One commit file. Its channel lives in the enclosing reader's shared cache, which is why
     * this is an inner (non-static) class.
     */
    class CommitFile implements AutoCloseable {
        private final Path file;
        private final long offset;

        /**
         * @param file commit file whose name is the decimal offset of its first byte
         * @throws NumberFormatException when the file name is not a decimal number
         */
        CommitFile(Path file) {
            this.file = file;
            this.offset = Long.parseLong(file.toFile().getName());
        }

        /**
         * @return an open read channel for this file, taken from the cache when possible
         */
        public FileChannel getChannel() throws IOException {
            // Prevent open too many files
            FileChannel ch = cache.get(offset);
            if (ch == null || !ch.isOpen()) {
                // Never hand out a cached channel that has been closed in the meantime.
                ch = FileChannel.open(file);
                cache.put(offset, ch);
            }
            return ch;
        }

        /**
         * Removes this file's channel from the cache and closes it, if present.
         */
        @Override
        public void close() {
            closeQuietly(cache.remove(offset));
        }

        public long getOffset() {
            return offset;
        }

        @Override
        public String toString() {
            return file.toString();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment