Skip to content

Instantly share code, notes, and snippets.

@gmucha
Created May 31, 2019 09:45
Show Gist options
  • Save gmucha/82438949c345c1320f61736a36dd894d to your computer and use it in GitHub Desktop.
Save gmucha/82438949c345c1320f61736a36dd894d to your computer and use it in GitHub Desktop.
long blockStart = calculateBlockTimestamp(dataPoint);
String key = toCacheKey(dataPoint.getMetricKey(), blockStart);
byte[] cachedState = storageWrapper.get(key);
// Nothing cached for this metric/block yet: start a fresh block backed by a 128-byte buffer.
// Otherwise resume from the state that serialize() stored on the previous write.
MutableCompressor blockCompressor = (cachedState != null)
        ? new MutableCompressor(blockStart, ByteBuffer.wrap(cachedState))
        : new MutableCompressor(blockStart, new ByteBufferBitOutput(128));
blockCompressor.addValue(dataPoint.getTimestamp(), dataPoint.getValue());
storageWrapper.put(key, blockCompressor.serialize());
/**
 * Time-series compressor using the timestamp delta-of-delta and value XOR encoding from the
 * Facebook (Gorilla) paper, except that the first timestamp delta is stored in
 * {@link #FIRST_DELTA_BITS} bits.
 *
 * <p>The compressor state can be saved with {@link #serialize()} and resumed with
 * {@link #MutableCompressor(long, ByteBuffer)}, so one block can be appended to across separate
 * load/store cycles. Serialized layout (ByteBuffer default byte order):
 * <pre>
 *   long    storedVal
 *   long    storedTimestamp
 *   long    storedDelta
 *   int     storedLeadingZeros
 *   int     storedTrailingZeros
 *   int     n, the number of complete bytes in the bit stream
 *   byte[n] bit stream
 *   byte    partially filled byte (its high-order bits hold data)
 *   byte    number of used bits in that byte, 0..7
 *   byte[2] zero padding
 * </pre>
 * A trailer of four zero bytes resumes on a byte boundary with no pending bits.
 *
 * <p>Values must be added in timestamp order.
 */
class MutableCompressor {
    /** Width in bits of the first timestamp delta; 27 bits cover one day in milliseconds. */
    public static final short FIRST_DELTA_BITS = 27;

    /** Fixed header written by serialize(): 3 longs (24 bytes) + 3 ints (12 bytes). */
    private static final int HEADER_BYTES = 36;
    /** Trailer written by serialize(): pending byte, its bit count, two zero bytes. */
    private static final int TRAILER_BYTES = 4;

    // Starts at Integer.MAX_VALUE so the first non-zero XOR always takes the "new leading" path.
    private int storedLeadingZeros = Integer.MAX_VALUE;
    private int storedTrailingZeros = 0;
    private long storedVal = 0;
    // 0 doubles as "no value written yet" (see addValue).
    private long storedTimestamp = 0;
    private long storedDelta = 0;
    private long blockTimestamp = 0;
    private BitOutput out;

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("storedLeadingZeros", storedLeadingZeros)
                .add("storedTrailingZeros", storedTrailingZeros)
                .add("storedVal", storedVal)
                .add("storedTimestamp", storedTimestamp)
                .add("storedDelta", storedDelta)
                .toString();
    }

    /**
     * Serializes the compressor state together with the bit stream written so far, including the
     * bits that are still pending in a partially filled byte.
     *
     * <p>The live output is only read, never repositioned, so the compressor can keep accepting
     * values after this call.
     *
     * @return a new array of {@code 36 + n + 4} bytes, where n is the number of complete bytes
     *     in the bit stream (layout described on the class)
     * @throws ClassCastException if this compressor writes to a BitOutput that is not a
     *     ByteBufferBitOutput
     */
    public byte[] serialize() {
        ByteBufferBitOutput bitOutput = (ByteBufferBitOutput) out;
        // Read through a duplicate: flipping the live buffer would reset its write position and
        // make the next write overwrite the beginning of the stream.
        ByteBuffer written = bitOutput.getByteBuffer().duplicate();
        int completeBytes = written.position();
        written.flip();

        ByteBuffer outputBuffer = ByteBuffer.allocate(HEADER_BYTES + completeBytes + TRAILER_BYTES);
        outputBuffer.putLong(storedVal);
        outputBuffer.putLong(storedTimestamp);
        outputBuffer.putLong(storedDelta);
        outputBuffer.putInt(storedLeadingZeros);
        outputBuffer.putInt(storedTrailingZeros);
        outputBuffer.putInt(completeBytes);
        outputBuffer.put(written);
        // Bits that have not filled a whole byte yet are held by the bit output, not the buffer;
        // without them the resumed stream would be missing bits.
        outputBuffer.put(bitOutput.getPendingByte());
        outputBuffer.put((byte) bitOutput.getPendingBitCount());
        // The last two trailer bytes are left zero.
        return outputBuffer.array();
    }

    /**
     * Resumes a compressor from bytes produced by {@link #serialize()}.
     *
     * <p>New bits are written into {@code existingData}'s storage, starting at the serialized
     * partial byte, until the output needs to grow. The stored bit-count byte is zeroed in place.
     *
     * @param timestamp start timestamp of the block the data belongs to
     * @param existingData serialized state, positioned at its first header byte
     */
    public MutableCompressor(long timestamp, ByteBuffer existingData) {
        blockTimestamp = timestamp;
        storedVal = existingData.getLong();
        storedTimestamp = existingData.getLong();
        storedDelta = existingData.getLong();
        storedLeadingZeros = existingData.getInt();
        storedTrailingZeros = existingData.getInt();
        int completeBytes = existingData.getInt();
        ByteBuffer stream = existingData.slice();

        // Trailer: stream[completeBytes] is the partial byte, stream[completeBytes + 1] its bit count.
        int pendingBits = 0;
        int bitCountIndex = completeBytes + 1;
        if (bitCountIndex < stream.limit()) {
            pendingBits = stream.get(bitCountIndex);
            // The bit output ORs new bits into the bytes after its position, so this byte must be
            // zero before it becomes part of the stream.
            stream.put(bitCountIndex, (byte) 0);
        }
        stream.position(completeBytes);
        out = new ByteBufferBitOutput(stream, pendingBits);
    }

    /**
     * Returns the output this compressor writes to.
     *
     * @return the live bit output
     */
    public BitOutput getOut() {
        return out;
    }

    /**
     * Starts a new block by writing the block timestamp header to {@code output}.
     *
     * @param timestamp start timestamp of the block
     * @param output destination of the compressed stream; must be a ByteBufferBitOutput if
     *     {@link #serialize()} will be called
     */
    // NOTE(original author): "We should have access to the series?"
    public MutableCompressor(long timestamp, BitOutput output) {
        blockTimestamp = timestamp;
        out = output;
        addHeader(timestamp);
    }

    /** Writes the block header: the full 64-bit block timestamp. */
    private void addHeader(long timestamp) {
        out.writeBits(timestamp, 64);
    }

    /**
     * Adds a new long value to the series. Note, values must be inserted in order.
     *
     * <p>NOTE(review): a first timestamp of exactly 0 is indistinguishable from "no value yet",
     * so the next call would write another first value.
     *
     * @param timestamp Timestamp which is inside the allowed time block (default 24 hours with millisecond precision)
     * @param value next long value in the series
     */
    public void addValue(long timestamp, long value) {
        if (storedTimestamp == 0) {
            writeFirst(timestamp, value);
        } else {
            compressTimestamp(timestamp);
            compressValue(value);
        }
    }

    /**
     * Adds a new double value to the series. Note, values must be inserted in order.
     * The value is stored as its raw IEEE 754 bit pattern.
     *
     * @param timestamp Timestamp which is inside the allowed time block (default 24 hours with millisecond precision)
     * @param value next floating point value in the series
     */
    public void addValue(long timestamp, double value) {
        addValue(timestamp, Double.doubleToRawLongBits(value));
    }

    /**
     * Writes the first point of the block: its delta from the block start in
     * {@link #FIRST_DELTA_BITS} bits, then the full 64-bit value.
     * A delta outside [0, 2^27) is not validated here and only its low 27 bits are written.
     */
    private void writeFirst(long timestamp, long value) {
        storedDelta = timestamp - blockTimestamp;
        storedTimestamp = timestamp;
        storedVal = value;
        out.writeBits(storedDelta, FIRST_DELTA_BITS);
        out.writeBits(storedVal, 64);
    }

    /**
     * Closes the block and writes the remaining stuff to the BitOutput: an end marker
     * ('1111', then 32 one-bits, then a 0 bit) followed by a flush of the partial byte.
     */
    public void close() {
        // These are selected to test interoperability and correctness of the solution, this can be read with go-tsz
        out.writeBits(0x0F, 4);
        out.writeBits(0xFFFFFFFF, 32);
        out.skipBit();
        out.flush();
    }

    /**
     * Writes the timestamp as a delta-of-delta with a variable-length prefix:
     * '0' for 0, '10'+7 bits for [-63, 64], '110'+9 bits for [-255, 256],
     * '1110'+12 bits for [-2047, 2048], otherwise '1111'+32 bits.
     * <p>
     * Difference to the original Facebook paper, we store the first delta as 27 bits to allow
     * millisecond accuracy for a one day block.
     * <p>
     * Also, the timestamp delta-delta is not good for millisecond compressions.
     *
     * @param timestamp epoch
     */
    private void compressTimestamp(long timestamp) {
        long newDelta = (timestamp - storedTimestamp);
        long deltaD = newDelta - storedDelta;
        if (deltaD == 0) {
            out.skipBit(); // store '0'
        } else if (deltaD >= -63 && deltaD <= 64) {
            out.writeBits(0x02, 2); // store '10'
            out.writeBits(deltaD, 7);
        } else if (deltaD >= -255 && deltaD <= 256) {
            out.writeBits(0x06, 3); // store '110'
            out.writeBits(deltaD, 9);
        } else if (deltaD >= -2047 && deltaD <= 2048) {
            out.writeBits(0x0E, 4); // store '1110'
            out.writeBits(deltaD, 12);
        } else {
            out.writeBits(0x0F, 4); // store '1111'
            out.writeBits(deltaD, 32);
        }
        storedDelta = newDelta;
        storedTimestamp = timestamp;
    }

    /**
     * Writes the XOR of {@code value} with the previous value: '0' if equal, otherwise '1'
     * followed by either the existing-window form or the new-window form.
     */
    private void compressValue(long value) {
        // TODO Fix already compiled into a big method
        long xor = storedVal ^ value;
        if (xor == 0) {
            out.skipBit(); // store '0'
        } else {
            int leadingZeros = Long.numberOfLeadingZeros(xor);
            int trailingZeros = Long.numberOfTrailingZeros(xor);
            // The leading-zero count is stored in 5 bits, so clamp it to 31.
            if (leadingZeros >= 32) {
                leadingZeros = 31;
            }
            out.writeBit(); // store '1'
            if (leadingZeros >= storedLeadingZeros && trailingZeros >= storedTrailingZeros) {
                writeExistingLeading(xor);
            } else {
                writeNewLeading(xor, leadingZeros, trailingZeros);
            }
        }
        storedVal = value;
    }

    /**
     * If there at least as many leading zeros and as many trailing zeros as previous value, control bit = 0 (type a)
     * store the meaningful XORed value
     *
     * @param xor XOR between previous value and current
     */
    private void writeExistingLeading(long xor) {
        out.skipBit();
        int significantBits = 64 - storedLeadingZeros - storedTrailingZeros;
        out.writeBits(xor >>> storedTrailingZeros, significantBits);
    }

    /**
     * Control bit = 1 (type b): store the number of leading zeros in the next 5 bits,
     * the length of the meaningful XORed value in the next 6 bits, then the meaningful bits.
     * A length of 64 does not fit in 6 bits and is written as 0 (only the low 6 bits are kept);
     * presumably the decoder maps 0 back to 64 — TODO confirm against the reader.
     *
     * @param xor XOR between previous value and current
     * @param leadingZeros New leading zeros
     * @param trailingZeros New trailing zeros
     */
    private void writeNewLeading(long xor, int leadingZeros, int trailingZeros) {
        out.writeBit();
        out.writeBits(leadingZeros, 5);
        int significantBits = 64 - leadingZeros - trailingZeros;
        out.writeBits(significantBits, 6);
        out.writeBits(xor >>> trailingZeros, significantBits);
        storedLeadingZeros = leadingZeros;
        storedTrailingZeros = trailingZeros;
    }
}

/**
 * BitOutput that packs bits most-significant-first into a ByteBuffer.
 *
 * <p>Bits accumulate in a one-byte register that is written to the buffer once all eight bits
 * are used. New bits are OR-ed into that register, which is loaded from the buffer's byte at the
 * current position, so bytes from the position onwards are expected to be zero. When the buffer
 * is full it is replaced by a direct buffer of twice the capacity.
 */
class ByteBufferBitOutput implements BitOutput {
    public static final int DEFAULT_ALLOCATION = 4096;

    private ByteBuffer bb;
    /** Byte currently being filled; written to {@code bb} once all of its bits are used. */
    private byte b;
    /** Unused low-order bits left in {@code b}; between calls always in 1..8. */
    private int bitsLeft = Byte.SIZE;

    /**
     * Creates a new ByteBufferBitOutput with a default allocated size of 4096 bytes.
     */
    public ByteBufferBitOutput() {
        this(DEFAULT_ALLOCATION);
    }

    /**
     * Give an initialSize different than DEFAULT_ALLOCATION. Recommended to use values which are divisible by 4096.
     *
     * @param initialSize initial capacity in bytes of the direct buffer
     */
    public ByteBufferBitOutput(int initialSize) {
        bb = ByteBuffer.allocateDirect(initialSize);
        loadCurrentByte();
    }

    /**
     * Continues writing into {@code byteBuffer} at its current position, on a byte boundary.
     *
     * @param byteBuffer buffer whose bytes from the position onwards are expected to be zero
     */
    public ByteBufferBitOutput(ByteBuffer byteBuffer) {
        this(byteBuffer, 0);
    }

    /**
     * Continues writing into {@code byteBuffer} at its current position, where the byte at that
     * position already holds {@code pendingBits} high-order data bits (as restored from a
     * serialized stream).
     *
     * @param byteBuffer buffer to continue writing into
     * @param pendingBits number of already-used bits in the current byte, 0..7
     * @throws IllegalArgumentException if pendingBits is outside 0..7
     */
    ByteBufferBitOutput(ByteBuffer byteBuffer, int pendingBits) {
        if (pendingBits < 0 || pendingBits >= Byte.SIZE) {
            throw new IllegalArgumentException("pendingBits must be in [0, 7]: " + pendingBits);
        }
        bb = byteBuffer;
        loadCurrentByte();
        bitsLeft = Byte.SIZE - pendingBits;
    }

    /** Loads the byte at the current position into the register, growing a full buffer first. */
    private void loadCurrentByte() {
        if (!bb.hasRemaining()) {
            expandAllocation();
        }
        b = bb.get(bb.position());
    }

    /** Copies the written bytes into a direct buffer of twice the capacity (at least one byte). */
    private void expandAllocation() {
        ByteBuffer largerBB = ByteBuffer.allocateDirect(Math.max(1, bb.capacity() * 2));
        bb.flip();
        // After the copy largerBB's position equals the number of bytes written so far.
        largerBB.put(bb);
        bb = largerBB;
    }

    /** Writes the register to the buffer once it is full and starts the next byte. */
    private void flipByte() {
        if (bitsLeft == 0) {
            bb.put(b);
            loadCurrentByte();
            bitsLeft = Byte.SIZE;
        }
    }

    @Override
    public void writeBit() {
        b |= (1 << (bitsLeft - 1));
        bitsLeft--;
        flipByte();
    }

    @Override
    public void skipBit() {
        bitsLeft--;
        flipByte();
    }

    /**
     * Writes the low {@code bits} bits of {@code value} to the stream, most significant first.
     * Higher-order bits of {@code value} are ignored.
     *
     * @param value Value to be written to the stream
     * @param bits How many bits are stored to the stream
     */
    public void writeBits(long value, int bits) {
        while (bits > 0) {
            int shift = bits - bitsLeft;
            if (shift >= 0) {
                // Take the next bitsLeft bits of value to complete the current byte.
                b |= (byte) ((value >> shift) & ((1 << bitsLeft) - 1));
                bits -= bitsLeft;
                bitsLeft = 0;
            } else {
                // The remaining bits fit in the current byte. Mask to `bits` bits first so that
                // higher-order bits of value (sign extension, or a value wider than `bits`,
                // e.g. a length of 64 written in 6 bits) cannot be OR-ed over bits already
                // written to this byte.
                shift = bitsLeft - bits;
                b |= (byte) ((value & ((1L << bits) - 1)) << shift);
                bitsLeft -= bits;
                bits = 0;
            }
            flipByte();
        }
    }

    /**
     * Causes the currently handled byte to be written to the stream; unused low-order bits
     * are written as they are in the register.
     */
    @Override
    public void flush() {
        bitsLeft = 0;
        flipByte();
    }

    /**
     * Returns the live underlying buffer. It is the buffer given at construction (heap or direct)
     * until the output grows, after which it is a DirectByteBuffer. Bits still in the pending
     * byte are not included before its position.
     *
     * @return the current backing ByteBuffer
     */
    public ByteBuffer getByteBuffer() {
        return this.bb;
    }

    /**
     * Returns the partially filled byte that has not been written to the buffer yet.
     *
     * @return the pending byte; only its {@link #getPendingBitCount()} high-order bits are data
     */
    byte getPendingByte() {
        return b;
    }

    /**
     * Returns how many bits of {@link #getPendingByte()} hold data.
     *
     * @return a count in 0..7
     */
    int getPendingBitCount() {
        return Byte.SIZE - bitsLeft;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment