Created
May 31, 2019 09:45
-
-
Save gmucha/82438949c345c1320f61736a36dd894d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Work out which time block the data point falls into and where that block's
// compressed state lives in the backing store.
final long blockTimestamp = calculateBlockTimestamp(dataPoint);
final String cacheKey = toCacheKey(dataPoint.getMetricKey(), blockTimestamp);
final byte[] data = storageWrapper.get(cacheKey);

// Resume the stored compressor state when the block already exists; otherwise
// start a new block backed by a 128-byte bit output.
final MutableCompressor compressor = (data != null)
        ? new MutableCompressor(blockTimestamp, ByteBuffer.wrap(data))
        : new MutableCompressor(blockTimestamp, new ByteBufferBitOutput(128));

// Append the new point and write the updated state back under the same key.
compressor.addValue(dataPoint.getTimestamp(), dataPoint.getValue());
storageWrapper.put(cacheKey, compressor.serialize());
class MutableCompressor { | |
private int storedLeadingZeros = Integer.MAX_VALUE; | |
private int storedTrailingZeros = 0; | |
private long storedVal = 0; | |
private long storedTimestamp = 0; | |
private long storedDelta = 0; | |
private long blockTimestamp = 0; | |
public final static short FIRST_DELTA_BITS = 27; | |
private BitOutput out; | |
@Override | |
public String toString() { | |
return MoreObjects.toStringHelper(this) | |
.add("storedLeadingZeros", storedLeadingZeros) | |
.add("storedTrailingZeros", storedTrailingZeros) | |
.add("storedVal", storedVal) | |
.add("storedTimestamp", storedTimestamp) | |
.add("storedDelta", storedDelta) | |
.toString(); | |
} | |
public byte[] serialize() { | |
ByteBuffer buffer = ((ByteBufferBitOutput) out).getByteBuffer(); | |
ByteBuffer outputBuffer = ByteBuffer.allocate(buffer.position() + 40); | |
outputBuffer.putLong(storedVal); | |
outputBuffer.putLong(storedTimestamp); | |
outputBuffer.putLong(storedDelta); | |
outputBuffer.putInt(storedLeadingZeros); | |
outputBuffer.putInt(storedTrailingZeros); | |
outputBuffer.putInt(buffer.position()); | |
buffer.flip(); | |
outputBuffer.put(buffer.slice()); | |
return outputBuffer.array(); | |
} | |
public MutableCompressor(long timestamp, ByteBuffer existingData) { | |
blockTimestamp = timestamp; | |
storedVal = existingData.getLong(); | |
storedTimestamp = existingData.getLong(); | |
storedDelta = existingData.getLong(); | |
storedLeadingZeros = existingData.getInt(); | |
storedTrailingZeros = existingData.getInt(); | |
int newPosition = existingData.getInt(); | |
ByteBuffer newBuffer = existingData.slice(); | |
newBuffer.position(newPosition); | |
out = new ByteBufferBitOutput(newBuffer); | |
} | |
public BitOutput getOut() { | |
return out; | |
} | |
// We should have access to the series? | |
public MutableCompressor(long timestamp, BitOutput output) { | |
blockTimestamp = timestamp; | |
out = output; | |
addHeader(timestamp); | |
} | |
private void addHeader(long timestamp) { | |
// One byte: length of the first delta | |
// One byte: precision of timestamps | |
out.writeBits(timestamp, 64); | |
} | |
/** | |
* Adds a new long value to the series. Note, values must be inserted in order. | |
* | |
* @param timestamp Timestamp which is inside the allowed time block (default 24 hours with millisecond precision) | |
* @param value next floating point value in the series | |
*/ | |
public void addValue(long timestamp, long value) { | |
if (storedTimestamp == 0) { | |
writeFirst(timestamp, value); | |
} else { | |
compressTimestamp(timestamp); | |
compressValue(value); | |
} | |
} | |
/** | |
* Adds a new double value to the series. Note, values must be inserted in order. | |
* | |
* @param timestamp Timestamp which is inside the allowed time block (default 24 hours with millisecond precision) | |
* @param value next floating point value in the series | |
*/ | |
public void addValue(long timestamp, double value) { | |
if (storedTimestamp == 0) { | |
writeFirst(timestamp, Double.doubleToRawLongBits(value)); | |
} else { | |
compressTimestamp(timestamp); | |
compressValue(Double.doubleToRawLongBits(value)); | |
} | |
} | |
private void writeFirst(long timestamp, long value) { | |
storedDelta = timestamp - blockTimestamp; | |
storedTimestamp = timestamp; | |
storedVal = value; | |
out.writeBits(storedDelta, FIRST_DELTA_BITS); | |
out.writeBits(storedVal, 64); | |
} | |
/** | |
* Closes the block and writes the remaining stuff to the BitOutput. | |
*/ | |
public void close() { | |
// These are selected to test interoperability and correctness of the solution, this can be read with go-tsz | |
out.writeBits(0x0F, 4); | |
out.writeBits(0xFFFFFFFF, 32); | |
out.skipBit(); | |
out.flush(); | |
} | |
/** | |
* Difference to the original Facebook paper, we store the first delta as 27 bits to allow | |
* millisecond accuracy for a one day block. | |
* <p> | |
* Also, the timestamp delta-delta is not good for millisecond compressions.. | |
* | |
* @param timestamp epoch | |
*/ | |
private void compressTimestamp(long timestamp) { | |
// a) Calculate the delta of delta | |
long newDelta = (timestamp - storedTimestamp); | |
long deltaD = newDelta - storedDelta; | |
// If delta is zero, write single 0 bit | |
if (deltaD == 0) { | |
out.skipBit(); | |
} else if (deltaD >= -63 && deltaD <= 64) { | |
out.writeBits(0x02, 2); // store '10' | |
out.writeBits(deltaD, 7); // Using 7 bits, store the value.. | |
} else if (deltaD >= -255 && deltaD <= 256) { | |
out.writeBits(0x06, 3); // store '110' | |
out.writeBits(deltaD, 9); // Use 9 bits | |
} else if (deltaD >= -2047 && deltaD <= 2048) { | |
out.writeBits(0x0E, 4); // store '1110' | |
out.writeBits(deltaD, 12); // Use 12 bits | |
} else { | |
out.writeBits(0x0F, 4); // Store '1111' | |
out.writeBits(deltaD, 32); // Store delta using 32 bits | |
} | |
storedDelta = newDelta; | |
storedTimestamp = timestamp; | |
} | |
private void compressValue(long value) { | |
// TODO Fix already compiled into a big method | |
long xor = storedVal ^ value; | |
if (xor == 0) { | |
// Write 0 | |
out.skipBit(); | |
} else { | |
int leadingZeros = Long.numberOfLeadingZeros(xor); | |
int trailingZeros = Long.numberOfTrailingZeros(xor); | |
// Check overflow of leading? Can't be 32! | |
if (leadingZeros >= 32) { | |
leadingZeros = 31; | |
} | |
// Store bit '1' | |
out.writeBit(); | |
if (leadingZeros >= storedLeadingZeros && trailingZeros >= storedTrailingZeros) { | |
writeExistingLeading(xor); | |
} else { | |
writeNewLeading(xor, leadingZeros, trailingZeros); | |
} | |
} | |
storedVal = value; | |
} | |
/** | |
* If there at least as many leading zeros and as many trailing zeros as previous value, control bit = 0 (type a) | |
* store the meaningful XORed value | |
* | |
* @param xor XOR between previous value and current | |
*/ | |
private void writeExistingLeading(long xor) { | |
out.skipBit(); | |
int significantBits = 64 - storedLeadingZeros - storedTrailingZeros; | |
out.writeBits(xor >>> storedTrailingZeros, significantBits); | |
} | |
/** | |
* store the length of the number of leading zeros in the next 5 bits | |
* store length of the meaningful XORed value in the next 6 bits, | |
* store the meaningful bits of the XORed value | |
* (type b) | |
* | |
* @param xor XOR between previous value and current | |
* @param leadingZeros New leading zeros | |
* @param trailingZeros New trailing zeros | |
*/ | |
private void writeNewLeading(long xor, int leadingZeros, int trailingZeros) { | |
out.writeBit(); | |
out.writeBits(leadingZeros, 5); // Number of leading zeros in the next 5 bits | |
int significantBits = 64 - leadingZeros - trailingZeros; | |
out.writeBits(significantBits, 6); // Length of meaningful bits in the next 6 bits | |
out.writeBits(xor >>> trailingZeros, significantBits); // Store the meaningful bits of XOR | |
storedLeadingZeros = leadingZeros; | |
storedTrailingZeros = trailingZeros; | |
} | |
} | |
class ByteBufferBitOutput implements BitOutput { | |
public static final int DEFAULT_ALLOCATION = 4096; | |
private ByteBuffer bb; | |
private byte b; | |
private int bitsLeft = Byte.SIZE; | |
/** | |
* Creates a new ByteBufferBitOutput with a default allocated size of 4096 bytes. | |
*/ | |
public ByteBufferBitOutput() { | |
this(DEFAULT_ALLOCATION); | |
} | |
/** | |
* Give an initialSize different than DEFAULT_ALLOCATIONS. Recommended to use values which are dividable by 4096. | |
* | |
* @param initialSize New initialsize to use | |
*/ | |
public ByteBufferBitOutput(int initialSize) { | |
bb = ByteBuffer.allocateDirect(initialSize); | |
b = bb.get(bb.position()); | |
} | |
public ByteBufferBitOutput(ByteBuffer byteBuffer) { | |
bb = byteBuffer; | |
b = bb.get(bb.position()); | |
} | |
private void expandAllocation() { | |
ByteBuffer largerBB = ByteBuffer.allocateDirect(bb.capacity() * 2); | |
bb.flip(); | |
largerBB.put(bb); | |
largerBB.position(bb.capacity()); | |
bb = largerBB; | |
} | |
private void flipByte() { | |
if (bitsLeft == 0) { | |
bb.put(b); | |
if (!bb.hasRemaining()) { | |
expandAllocation(); | |
} | |
b = bb.get(bb.position()); | |
bitsLeft = Byte.SIZE; | |
} | |
} | |
@Override | |
public void writeBit() { | |
b |= (1 << (bitsLeft - 1)); | |
bitsLeft--; | |
flipByte(); | |
} | |
@Override | |
public void skipBit() { | |
bitsLeft--; | |
flipByte(); | |
} | |
/** | |
* Writes the given long to the stream using bits amount of meaningful bits. | |
* | |
* @param value Value to be written to the stream | |
* @param bits How many bits are stored to the stream | |
*/ | |
public void writeBits(long value, int bits) { | |
while (bits > 0) { | |
int shift = bits - bitsLeft; | |
if (shift >= 0) { | |
b |= (byte) ((value >> shift) & ((1 << bitsLeft) - 1)); | |
bits -= bitsLeft; | |
bitsLeft = 0; | |
} else { | |
shift = bitsLeft - bits; | |
b |= (byte) (value << shift); | |
bitsLeft -= bits; | |
bits = 0; | |
} | |
flipByte(); | |
} | |
} | |
/** | |
* Causes the currently handled byte to be written to the stream | |
*/ | |
@Override | |
public void flush() { | |
bitsLeft = 0; | |
flipByte(); // Causes write to the ByteBuffer | |
} | |
/** | |
* Returns the underlying DirectByteBuffer | |
* | |
* @return ByteBuffer of type DirectByteBuffer | |
*/ | |
public ByteBuffer getByteBuffer() { | |
return this.bb; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment