Last active
June 13, 2018 06:33
-
-
Save timrobertson100/df77d1337ba8f5609319751ee7c6e01e to your computer and use it in GitHub Desktop.
Decoder of Kudu operations (UNTESTED!!!)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Decodes the protobuf bytes into {@link Operation} instances. | |
* | |
* <p>The encoded format is defined as follows: | |
* | |
* <ol> | |
* <li>"rows" is a byte array encoding of: | |
* <ol> | |
* <li>The operation type (e.g. Upsert) encoded as a byte | |
* <li>The "isSet" bitSet encoded as one or more bytes | |
* <li>Optionally a "null" bitSet flags fields with null value - encoded as one or more | |
* bytes, or omitted if no nullable fields exist in the schema | |
* <li>A concatenation of rows being mutated in the Operation where each row is encoded as | |
* series of cell data values encoded as follows: | |
* <ol> | |
* <li>If the column type is not variable length then the data is written | |
* <li>If the column is variable length (i.e. String or Binary) then an offset | |
* pointer into the indirect data buffer is written as a long, followed by the | |
* size of the variable data as an int | |
* </ol> | |
* </ol> | |
* <li>"indirect data" contains a concatenation of variable length (i.e. String and binary) | |
* field values for each row | |
* </ol> | |
*/ | |
public static class OperationsDecoder { | |
private final Schema schema; // todo: remove, as can be retrieved from table | |
private final KuduTable table; | |
public OperationsDecoder(Schema schema, KuduTable table) { | |
this.schema = schema; | |
this.table = table; | |
} | |
/** | |
* Decodes the data from the protobuf format. | |
* | |
* @param rowOps Containing the encoded Operations | |
* @return A list of Operations | |
*/ | |
public List<Operation> decode(WireProtocol.RowOperationsPB rowOps) { | |
ByteBuffer rows = rowOps.getRows().asReadOnlyByteBuffer(); | |
ByteBuffer indirectData = rowOps.getIndirectData().asReadOnlyByteBuffer(); | |
rows.reset(); // defensive coding | |
indirectData.reset(); // defensive coding | |
List<Operation> mutations = new ArrayList<>(1); | |
while (rows.hasRemaining()) { | |
// read the header information | |
Operation operation = newInstance(rows.get()); // encoded operation type | |
mutations.add(operation); | |
BitSet isSet = toBitSet(rows, schema.getColumnCount()); | |
Optional<BitSet> nullFieldSet = | |
schema.hasNullableColumns() | |
? Optional.of(toBitSet(rows, schema.getColumnCount())) | |
: Optional.empty(); | |
// read the data values | |
for (int i = isSet.nextSetBit(0); i != -1; i = isSet.nextSetBit(i + 1)) { | |
ColumnSchema col = schema.getColumnByIndex(i); | |
byte[] value = new byte[col.getType().getSize()]; | |
rows.get(value); | |
if (col.getType().equals(Type.BOOL) || col.getType().equals(Type.INT8)) { | |
operation.getRow().addByte(i, rows.get()); | |
} else if (col.getType().equals(Type.INT16)) { | |
operation.getRow().addShort(i, rows.getShort()); | |
} else if (col.getType().equals(Type.INT32)) { | |
operation.getRow().addInt(i, rows.getInt()); | |
} else if (col.getType().equals(Type.INT64)) { | |
operation.getRow().addLong(i, rows.getLong()); | |
} else if (col.getType().equals(Type.FLOAT)) { | |
operation.getRow().addFloat(i, rows.getFloat()); | |
} else if (col.getType().equals(Type.DOUBLE)) { | |
operation.getRow().addDouble(i, rows.getDouble()); | |
} else { | |
// TODO: binary, varlength and null handling | |
throw new UnsupportedOperationException("TODO binary and string varlength"); | |
} | |
} | |
} | |
return mutations; | |
} | |
// returns a new operation if the encodedType is supported | |
private Operation newInstance(final byte encodedType) throws UnsupportedOperationException { | |
if (ChangeType.INSERT.toEncodedByte() == encodedType) { | |
return new Insert(table); | |
} else if (ChangeType.UPDATE.toEncodedByte() == encodedType) { | |
return new Update(table); | |
} else if (ChangeType.UPSERT.toEncodedByte() == encodedType) { | |
return new Upsert(table); | |
} else { | |
throw new UnsupportedOperationException("Only Insert, Update and Upsert are supported"); | |
} | |
} | |
// Returns a BitSet of size numBits by reading as many bytes necessary from the buffer | |
private BitSet toBitSet(ByteBuffer buffer, int numBits) { | |
int bytesNeeded = (int) Math.ceil(numBits / 8); | |
byte[] bytes = new byte[bytesNeeded]; | |
buffer.get(bytes); | |
return Bytes.toBitSet(bytes, 0, numBits); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment