Skip to content

Instantly share code, notes, and snippets.

@ivanyu
Last active January 28, 2020 16:11
Show Gist options
  • Save ivanyu/0be78aa8fd27c1fda1e4bc8a4f8a8175 to your computer and use it in GitHub Desktop.
Save ivanyu/0be78aa8fd27c1fda1e4bc8a4f8a8175 to your computer and use it in GitHub Desktop.
ZooKeeper transaction log decoder
import codecs
import sys
for line in sys.stdin:
line = line.rstrip()
hash_i = line.find("#")
print(line[:hash_i - 1])
s1 = line[hash_i+1:]
comma_i = s1.find(",")
s1 = s1[:comma_i]
payload = codecs.decode(s1, "hex")
print(payload)
print()
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.TimeZone;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.InputArchive;
import org.apache.jute.Record;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.server.persistence.FileHeader;
import org.apache.zookeeper.server.persistence.FileTxnLog;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.txn.CheckVersionTxn;
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.DeleteTxn;
import org.apache.zookeeper.txn.ErrorTxn;
import org.apache.zookeeper.txn.MultiTxn;
import org.apache.zookeeper.txn.SetDataTxn;
import org.apache.zookeeper.txn.Txn;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME;
@InterfaceAudience.Public
public class LogFormatter {
private static final Logger LOG = LoggerFactory.getLogger(LogFormatter.class);
/**
* @param args
*/
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("USAGE: LogFormatter log_file");
System.exit(2);
}
FileInputStream fis = new FileInputStream(args[0]);
BinaryInputArchive logStream = BinaryInputArchive.getArchive(fis);
FileHeader fhdr = new FileHeader();
fhdr.deserialize(logStream, "fileheader");
if (fhdr.getMagic() != FileTxnLog.TXNLOG_MAGIC) {
System.err.println("Invalid magic number for " + args[0]);
System.exit(2);
}
// System.out.println("ZooKeeper Transactional Log File with dbid "
// + fhdr.getDbid() + " txnlog format version "
// + fhdr.getVersion());
int count = 0;
while (true) {
long crcValue;
byte[] bytes;
try {
crcValue = logStream.readLong("crcvalue");
bytes = logStream.readBuffer("txnEntry");
} catch (EOFException e) {
System.out.println("EOF reached after " + count + " txns.");
return;
}
if (bytes.length == 0) {
// Since we preallocate, we define EOF to be an
// empty transaction
System.out.println("EOF reached after " + count + " txns.");
return;
}
Checksum crc = new Adler32();
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue()) {
throw new IOException("CRC doesn't match " + crcValue +
" vs " + crc.getValue());
}
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(bytes, hdr);
final Calendar c = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
c.set(2020, Calendar.JANUARY, 27, 14, 20);
final Date limit = c.getTime();
if (new Date(hdr.getTime()).after(limit)) {
break;
}
DateTimeFormatter formatter = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.append(ISO_LOCAL_TIME)
.toFormatter();
final String line = formatter.format(ZonedDateTime.ofInstant(new Date(hdr.getTime()).toInstant(), ZoneId.of("UTC")))
+ "\tsession 0x"
+ Long.toHexString(hdr.getClientId())
+ "(" + Long.toString(hdr.getClientId()) + ")"
+ "\tcxid 0x"
// + Long.toHexString(hdr.getCxid())
// + "\tzxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " " + TraceFormatter.op2String(hdr.getType()) + " ";
if (shouldDisplayTransaction(hdr, txn)) {
if (txn == null) {
System.out.println(line);
} else {
System.out.print(line);
System.out.println(format(hdr.getType(), txn, ""));
System.out.println();
}
}
if (logStream.readByte("EOR") != 'B') {
LOG.error("Last transaction was partial.");
throw new EOFException("Last transaction was partial.");
}
count++;
}
}
private static boolean shouldDisplayTransaction(TxnHeader hdr, Record txn) {
if (hdr.getType() == ZooDefs.OpCode.createSession
|| hdr.getType() == ZooDefs.OpCode.closeSession
|| hdr.getType() == ZooDefs.OpCode.error) {
return false;
}
if (hdr.getType() == ZooDefs.OpCode.setData
&& (
((SetDataTxn) txn).getPath().startsWith("/prune/kafka_topic_partition_sizes")
|| ((SetDataTxn) txn).getPath().startsWith("/prune/mm_sources")
|| ((SetDataTxn) txn).getPath().startsWith("/prune/disk_usage")
|| ((SetDataTxn) txn).getPath().startsWith("/prune/kafka_restart_lock")
|| ((SetDataTxn) txn).getPath().startsWith("/prune/active_kafka_cluster_node_ids")
|| ((SetDataTxn) txn).getPath().startsWith("/prune/karuka"))) {
return false;
}
if (hdr.getType() == ZooDefs.OpCode.delete
&& (
((DeleteTxn) txn).getPath().startsWith("/prune/kafka_topic_partition_sizes")
|| ((DeleteTxn) txn).getPath().startsWith("/prune/mm_sources")
|| ((DeleteTxn) txn).getPath().startsWith("/prune/disk_usage")
|| ((DeleteTxn) txn).getPath().startsWith("/prune/kafka_restart_lock")
|| ((DeleteTxn) txn).getPath().startsWith("/prune/active_kafka_cluster_node_ids")
|| ((DeleteTxn) txn).getPath().startsWith("/prune/karuka"))) {
return false;
}
if (hdr.getType() == ZooDefs.OpCode.create
&& (
((CreateTxn) txn).getPath().startsWith("/prune/kafka_topic_partition_sizes")
|| ((CreateTxn) txn).getPath().startsWith("/prune/mm_sources")
|| ((CreateTxn) txn).getPath().startsWith("/prune/disk_usage")
|| ((CreateTxn) txn).getPath().startsWith("/prune/kafka_restart_lock")
|| ((CreateTxn) txn).getPath().startsWith("/prune/active_kafka_cluster_node_ids")
|| ((CreateTxn) txn).getPath().startsWith("/prune/karuka"))) {
return false;
}
return true;
}
private static String format(int type, Record txn, String prefix) throws IOException {
if (type == ZooDefs.OpCode.create) {
CreateTxn createTxn = (CreateTxn) txn;
String line = createTxn.getPath()
+ " " + (createTxn.getEphemeral() ? "E" : "nE")
;
if (createTxn.getData() != null) {
line += " " + new String(createTxn.getData());
}
return prefix + line;
}
if (type == ZooDefs.OpCode.setData) {
SetDataTxn setDataTxn = (SetDataTxn) txn;
String line = setDataTxn.getPath() + " " + setDataTxn.getVersion();
if (setDataTxn.getData() != null) {
line += " " + new String(setDataTxn.getData());
}
return prefix + line;
}
if (type == ZooDefs.OpCode.check) {
CheckVersionTxn checkVersionTxn = (CheckVersionTxn) txn;
String data = checkVersionTxn.getPath() + " " + checkVersionTxn.getVersion();
return prefix + data;
}
if (type == ZooDefs.OpCode.error) {
ErrorTxn errorTxn = (ErrorTxn) txn;
final KeeperException.Code errorCode = KeeperException.Code.get(errorTxn.getErr());
final KeeperException keeperException = KeeperException.create(errorCode);
String data = "" + errorCode;
return prefix + data;
}
if (type == ZooDefs.OpCode.multi) {
String result = "\n";
MultiTxn multiTxn = (MultiTxn) txn;
final List<Txn> innerTxns = multiTxn.getTxns();
for (int i = 0; i < innerTxns.size(); i++) {
final Txn innerTxn = innerTxns.get(i);
Record record = null;
switch (innerTxn.getType()) {
case ZooDefs.OpCode.create:
record = new CreateTxn();
break;
case ZooDefs.OpCode.delete:
record = new DeleteTxn();
break;
case ZooDefs.OpCode.setData:
record = new SetDataTxn();
break;
case ZooDefs.OpCode.check:
record = new CheckVersionTxn();
break;
case ZooDefs.OpCode.error:
record = new ErrorTxn();
break;
default:
throw new RuntimeException("Type: " + innerTxn.getType());
}
final ByteArrayInputStream bais = new ByteArrayInputStream(innerTxn.getData());
InputArchive ia = BinaryInputArchive.getArchive(bais);
record.deserialize(ia, "txn");
result += "\t" + prefix + TraceFormatter.op2String(innerTxn.getType()) + " ";
result += format(innerTxn.getType(), record, "");
if (i < innerTxns.size() - 1) {
result += "\n";
}
}
return result;
}
return "";
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment