Skip to content

Instantly share code, notes, and snippets.

@timrobertson100
Last active June 13, 2018 06:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save timrobertson100/df77d1337ba8f5609319751ee7c6e01e to your computer and use it in GitHub Desktop.
Save timrobertson100/df77d1337ba8f5609319751ee7c6e01e to your computer and use it in GitHub Desktop.
Decoder of Kudu operations (untested!)
/**
 * Decodes the protobuf bytes into {@link Operation} instances.
 *
 * <p>The encoded format is defined as follows:
 *
 * <ol>
 *   <li>"rows" is a byte array containing, for each encoded operation:
 *       <ol>
 *         <li>The operation type (e.g. Upsert) encoded as a single byte
 *         <li>The "isSet" bitset, one bit per schema column, encoded as {@code ceil(columns / 8)}
 *             bytes
 *         <li>Optionally a "null" bitset flagging columns set to null - encoded in the same way,
 *             or omitted entirely if the schema has no nullable columns
 *         <li>The cell data of every column that is set and not null, in column order, encoded
 *             as follows:
 *             <ol>
 *               <li>If the column type is fixed length then the value itself is written
 *               <li>If the column is variable length (i.e. String or Binary) then an offset
 *                   pointer into the indirect data buffer is written as a long, followed by the
 *                   size of the variable data (NOTE(review): documented as an int, but the Kudu
 *                   encoder may write it as a long - confirm before decoding var-length data)
 *             </ol>
 *       </ol>
 *   <li>"indirect data" contains a concatenation of variable length (i.e. String and Binary)
 *       field values for each row
 * </ol>
 *
 * <p>Multi-byte values are read little-endian, the byte order used by the Kudu client's encoder.
 * String and Binary columns are not yet supported.
 */
public static class OperationsDecoder {
  private final Schema schema; // todo: remove, as can be retrieved from table
  private final KuduTable table;

  public OperationsDecoder(Schema schema, KuduTable table) {
    this.schema = schema;
    this.table = table;
  }

  /**
   * Decodes the data from the protobuf format.
   *
   * @param rowOps containing the encoded Operations
   * @return the decoded Operations, in the order they were encoded
   * @throws UnsupportedOperationException if an operation type other than Insert, Update or
   *     Upsert is encountered, or a set, non-null column has a type this decoder cannot read
   */
  public List<Operation> decode(WireProtocol.RowOperationsPB rowOps) {
    // A freshly obtained buffer is already positioned at the start of the encoded data; calling
    // reset() on it would throw InvalidMarkException because no mark has been set.
    ByteBuffer rows =
        rowOps.getRows().asReadOnlyByteBuffer().order(java.nio.ByteOrder.LITTLE_ENDIAN);
    int columnCount = schema.getColumnCount();
    List<Operation> mutations = new ArrayList<>();
    while (rows.hasRemaining()) {
      // read the header information
      Operation operation = newInstance(rows.get()); // encoded operation type
      BitSet isSet = toBitSet(rows, columnCount);
      // The null bitset is only present when the schema has nullable columns; an empty BitSet
      // reports every column as non-null.
      BitSet isNull = schema.hasNullableColumns() ? toBitSet(rows, columnCount) : new BitSet();

      // read the data values of every set column
      for (int i = isSet.nextSetBit(0); i >= 0; i = isSet.nextSetBit(i + 1)) {
        if (isNull.get(i)) {
          // Null cells have no data in the rows buffer, so nothing is consumed here.
          operation.getRow().setNull(i);
        } else {
          decodeCell(operation, i, rows);
        }
      }
      mutations.add(operation);
    }
    return mutations;
  }

  // Reads one fixed-length cell for column i from rows and stores it in the operation's row.
  private void decodeCell(Operation operation, int i, ByteBuffer rows) {
    ColumnSchema col = schema.getColumnByIndex(i);
    switch (col.getType()) {
      case BOOL:
        // booleans are encoded as a single byte; any non-zero value is true
        operation.getRow().addBoolean(i, rows.get() != 0);
        break;
      case INT8:
        operation.getRow().addByte(i, rows.get());
        break;
      case INT16:
        operation.getRow().addShort(i, rows.getShort());
        break;
      case INT32:
        operation.getRow().addInt(i, rows.getInt());
        break;
      case INT64:
        operation.getRow().addLong(i, rows.getLong());
        break;
      case FLOAT:
        operation.getRow().addFloat(i, rows.getFloat());
        break;
      case DOUBLE:
        operation.getRow().addDouble(i, rows.getDouble());
        break;
      default:
        // TODO: binary and string varlength handling, reading from rowOps.getIndirectData()
        throw new UnsupportedOperationException(
            "TODO binary and string varlength: column "
                + col.getName()
                + " has unsupported type "
                + col.getType());
    }
  }

  // Returns a new operation if the encodedType is supported.
  private Operation newInstance(final byte encodedType) throws UnsupportedOperationException {
    if (ChangeType.INSERT.toEncodedByte() == encodedType) {
      return new Insert(table);
    } else if (ChangeType.UPDATE.toEncodedByte() == encodedType) {
      return new Update(table);
    } else if (ChangeType.UPSERT.toEncodedByte() == encodedType) {
      return new Upsert(table);
    } else {
      throw new UnsupportedOperationException(
          "Only Insert, Update and Upsert are supported, got encoded type " + encodedType);
    }
  }

  // Returns a BitSet of numBits bits, consuming ceil(numBits / 8) bytes from the buffer.
  private BitSet toBitSet(ByteBuffer buffer, int numBits) {
    // Integer ceiling: Math.ceil(numBits / 8) truncates before rounding and under-reads.
    int bytesNeeded = (numBits + 7) / 8;
    byte[] bytes = new byte[bytesNeeded];
    buffer.get(bytes);
    return Bytes.toBitSet(bytes, 0, numBits);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment