Skip to content

Instantly share code, notes, and snippets.

@rmja
Last active September 13, 2022 12:09
Show Gist options
  • Save rmja/98dc7e0576c933faa0a75629b46af71c to your computer and use it in GitHub Desktop.
Save rmja/98dc7e0576c933faa0a75629b46af71c to your computer and use it in GitHub Desktop.
lz4 frame
/// <summary>
/// One LZ4 data block as produced by the encoder or read by the decoder.
/// </summary>
/// <param name="BlockBuffer">Buffer whose first <paramref name="BlockLength"/> bytes hold the block payload.</param>
/// <param name="BlockLength">Number of valid payload bytes in <paramref name="BlockBuffer"/>.</param>
/// <param name="Compressed">True when the payload is LZ4-compressed; false when it is stored as-is.</param>
/// <param name="BlockChecksum">Optional xxHash32 block checksum carried alongside the block.</param>
internal readonly record struct Lz4BlockInfo(byte[] BlockBuffer, int BlockLength, bool Compressed, uint? BlockChecksum = null)
{
    /// <summary>A view over the valid payload bytes of <see cref="BlockBuffer"/>.</summary>
    public Span<byte> Span => new(BlockBuffer, 0, BlockLength);

    /// <summary>
    /// True when the block carries payload. A zero length is used both for the frame end mark and for
    /// "no block produced yet".
    /// </summary>
    public bool IsCompleted => BlockLength is not 0;
}
/// <summary>
/// Decodes LZ4 frames (magic number, frame descriptor, data blocks, end mark and optional content checksum)
/// into an <see cref="IBufferWriter{T}"/>, verifying block and content checksums when the frame carries them.
/// </summary>
/// <remarks>
/// Every decode call replaces the LZ4 decoder and the pooled block buffer that are held in fields, so one
/// instance should not run overlapping decodes. <see cref="Dispose"/> releases the decoder and returns the buffer.
/// </remarks>
public class Lz4FrameDecoder : IDisposable
{
    private ILZ4Decoder? _decoder;
    private byte[]? _buffer;
    private readonly XXH32 _blockChecksum = new();
    private readonly XXH32 _contentChecksum = new();

    /// <summary>Parses the frame header at the start of <paramref name="source"/> and returns its descriptor.</summary>
    /// <exception cref="InvalidDataException">The header is incomplete or malformed.</exception>
    public Lz4FrameDescriptor GetFrameDescriptor(ReadOnlyMemory<byte> source) => GetFrameDescriptor(new ReadOnlySequence<byte>(source));

    /// <summary>Parses the frame header at the start of <paramref name="source"/> and returns its descriptor.</summary>
    /// <exception cref="InvalidDataException">The header is incomplete or malformed.</exception>
    public Lz4FrameDescriptor GetFrameDescriptor(ReadOnlySequence<byte> source)
    {
        if (!TryReadHeader(source, out var header, out _))
        {
            throw new InvalidDataException();
        }
        return header.FrameDescriptor;
    }

    /// <summary>Decodes one complete LZ4 frame from <paramref name="compressed"/> into <paramref name="decompressed"/>.</summary>
    /// <exception cref="InvalidDataException">The frame is malformed, truncated, or fails a checksum.</exception>
    public void Decode(ReadOnlyMemory<byte> compressed, IBufferWriter<byte> decompressed) => Decode(new ReadOnlySequence<byte>(compressed), decompressed);

    /// <summary>
    /// Decodes one complete LZ4 frame from <paramref name="compressed"/> into <paramref name="decompressed"/>.
    /// Bytes following the frame's end mark (and content checksum, if any) are ignored.
    /// </summary>
    /// <exception cref="InvalidDataException">
    /// The frame is malformed, fails a checksum, or ends before its end mark.
    /// </exception>
    public void Decode(ReadOnlySequence<byte> compressed, IBufferWriter<byte> decompressed)
    {
        if (!TryReadHeader(compressed, out var header, out var headerLength))
        {
            throw new InvalidDataException();
        }
        compressed = compressed.Slice(headerLength);

        StartFrame(header);

        while (true)
        {
            if (!TryTopupAndDecode(decompressed, header, compressed, out var block, out var consumed))
            {
                // The whole input is already present, so a block, end mark or content checksum that
                // cannot be read means the frame is truncated.
                throw new InvalidDataException("Received end of compressed before terminating block was received");
            }
            if (block.BlockLength == 0)
            {
                return;
            }
            compressed = compressed.Slice(consumed);
        }
    }

    /// <summary>
    /// Reads one LZ4 frame from <paramref name="compressed"/> and writes the decoded bytes to
    /// <paramref name="decompressed"/>. Only the frame's own bytes are consumed from the reader.
    /// </summary>
    /// <returns>
    /// False when the reader completes with no data before a frame starts; true once the frame's end mark
    /// (and content checksum, if any) has been processed.
    /// </returns>
    /// <exception cref="InvalidDataException">
    /// The frame is malformed, fails a checksum, or the reader completes in the middle of the frame.
    /// </exception>
    public async ValueTask<bool> DecodeAsync(PipeReader compressed, IBufferWriter<byte> decompressed, CancellationToken cancellationToken = default)
    {
        Lz4FrameHeader? header;
        while (true)
        {
            var result = await compressed.ReadAsync(cancellationToken);
            if (result.IsCompleted && result.Buffer.Length == 0)
            {
                return false;
            }

            if (TryReadHeader(result.Buffer, out header, out var headerLength))
            {
                // Consume the header
                compressed.AdvanceTo(result.Buffer.Slice(0, headerLength).End);
                break;
            }

            if (result.IsCompleted)
            {
                // No more data will arrive; reading again would return the same partial header forever.
                throw new InvalidDataException("Received end of compressed before the frame header was complete");
            }

            // The entire header is not present - nothing is consumed, everything is examined.
            compressed.AdvanceTo(result.Buffer.Start, result.Buffer.End);
        }

        StartFrame(header);

        while (true)
        {
            var result = await compressed.ReadAsync(cancellationToken);
            if (!TryTopupAndDecode(decompressed, header, result.Buffer, out var block, out var consumed))
            {
                if (result.IsCompleted)
                {
                    throw new InvalidDataException("Received end of compressed before terminating block was received");
                }

                // The entire block is not present - nothing is consumed, everything is examined.
                compressed.AdvanceTo(result.Buffer.Start, result.Buffer.End);
                continue;
            }

            // Consume the block
            compressed.AdvanceTo(result.Buffer.Slice(0, consumed).End);

            if (block.BlockLength == 0)
            {
                return true;
            }
        }
    }

    // Releases any state left from a previous frame and creates the LZ4 decoder and block buffer for this frame.
    private void StartFrame(Lz4FrameHeader header)
    {
        Reset();
        _decoder = LZ4Decoder.Create(
            chaining: !header.FrameDescriptor.BlockIndependenceFlag,
            blockSize: header.FrameDescriptor.BlockMaximumSize);
        _buffer = ArrayPool<byte>.Shared.Rent(header.FrameDescriptor.BlockMaximumSize);
    }

    // Parses the frame header at the start of source, which may span several segments.
    // Returns false (consumed = 0) when the header is not yet complete.
    private static bool TryReadHeader(ReadOnlySequence<byte> source, [MaybeNullWhen(false)] out Lz4FrameHeader header, out int consumed)
    {
        // Fast path: the whole header is in the first segment.
        if (Lz4FrameHeader.TryRead(source.FirstSpan, out header, out consumed))
        {
            return true;
        }

        if (source.IsSingleSegment)
        {
            return false;
        }

        // The header may straddle segments (or the first segment may be empty). A header never exceeds
        // Lz4FrameHeader.MaxSize bytes, so linearize at most that many leading bytes and try once more.
        Span<byte> scratch = stackalloc byte[Lz4FrameHeader.MaxSize];
        var available = (int)Math.Min(source.Length, scratch.Length);
        source.Slice(0, available).CopyTo(scratch);
        return Lz4FrameHeader.TryRead(scratch[..available], out header, out consumed);
    }

    // Reads the next block (or the end mark plus optional content checksum) from source, decodes it into
    // writer and verifies checksums. Returns false, leaving writer untouched, if source is incomplete.
    private bool TryTopupAndDecode(IBufferWriter<byte> writer, Lz4FrameHeader header, ReadOnlySequence<byte> source, out Lz4BlockInfo block, out int consumed)
    {
        if (!TryReadBlock(source, header, out block, out consumed))
        {
            return false;
        }

        if (block.BlockLength == 0)
        {
            // End mark, optionally followed by the 4-byte content checksum over all decoded data.
            if (header.FrameDescriptor.ContentChecksumFlag)
            {
                if (!TryReadUInt32(source.Slice(consumed), out var expectedContentChecksum))
                {
                    return false;
                }
                consumed += 4;
                if (expectedContentChecksum != _contentChecksum.Digest())
                {
                    throw new InvalidDataException();
                }
            }
            return true;
        }

        // The LZ4 frame format defines the block checksum over the stored (still encoded) block bytes, so
        // check that before decoding. If it does not match, the checksum is checked against the decoded
        // bytes instead, which is what the Lz4FrameEncoder in this file writes.
        var verifyDecodedChecksum = block.BlockChecksum.HasValue
            && XXH32.DigestOf(block.Span) != block.BlockChecksum.Value;

        var decoder = _decoder!;

        // Stored blocks go through Inject rather than a plain copy so that the decoder's history contains
        // them: with linked blocks, later compressed blocks may reference these bytes.
        var decodedBytes = block.Compressed
            ? decoder.Decode(block.BlockBuffer, 0, block.BlockLength)
            : decoder.Inject(block.BlockBuffer, 0, block.BlockLength);

        var span = writer.GetSpan(decodedBytes);
        decoder.Drain(span, -decodedBytes, decodedBytes);
        var decoded = span[..decodedBytes];

        if (verifyDecodedChecksum)
        {
            _blockChecksum.Reset();
            _blockChecksum.Update(decoded);
            if (_blockChecksum.Digest() != block.BlockChecksum)
            {
                throw new InvalidDataException();
            }
        }

        if (header.FrameDescriptor.ContentChecksumFlag)
        {
            _contentChecksum.Update(decoded);
        }

        // Commit only after the block has passed its checksum.
        writer.Advance(decodedBytes);
        return true;
    }

    // Reads one block header and payload (and block checksum, if enabled) from source into the block buffer.
    // A zero length denotes the end mark. Returns false if source does not yet contain the whole block.
    private bool TryReadBlock(ReadOnlySequence<byte> source, Lz4FrameHeader header, out Lz4BlockInfo value, out int consumed)
    {
        consumed = 0;
        if (!TryReadUInt32(source, out var blockLength))
        {
            value = default;
            return false;
        }
        consumed = 4;

        if (blockLength == 0)
        {
            value = new Lz4BlockInfo(Array.Empty<byte>(), 0, false);
            return true;
        }

        // The high bit marks a stored (uncompressed) block; the low 31 bits are the payload size.
        var compressed = (blockLength & 0x80000000) == 0;
        blockLength &= 0x7FFFFFFF;

        // A block never exceeds the frame's maximum block size, and the block buffer is rented at that size.
        // Reject oversized lengths as corrupt data instead of failing in the copy below (or, when streaming,
        // waiting for an enormous block to arrive).
        if (blockLength > header.FrameDescriptor.BlockMaximumSize)
        {
            throw new InvalidDataException("Block size exceeds the maximum block size declared in the frame descriptor");
        }

        if (source.Length < 4 + blockLength)
        {
            value = default;
            return false;
        }

        var buffer = _buffer!;
        source.Slice(4, blockLength).CopyTo(buffer);
        consumed += (int)blockLength;

        uint? blockChecksum = null;
        if (header.FrameDescriptor.BlockChecksumFlag)
        {
            if (!TryReadUInt32(source.Slice(4 + blockLength), out var includedBlockChecksum))
            {
                value = default;
                return false;
            }
            blockChecksum = includedBlockChecksum;
            consumed += 4;
        }

        value = new Lz4BlockInfo(buffer, (int)blockLength, compressed, blockChecksum);
        return true;
    }

    // Reads a little-endian uint from the start of source; false if fewer than 4 bytes are available.
    private static bool TryReadUInt32(ReadOnlySequence<byte> source, out uint value)
    {
        if (BinaryPrimitives.TryReadUInt32LittleEndian(source.FirstSpan, out value))
        {
            return true;
        }

        if (source.Length < 4)
        {
            value = default;
            return false;
        }

        // The four bytes straddle segments.
        Span<byte> scratch = stackalloc byte[4];
        source.Slice(0, 4).CopyTo(scratch);
        value = BinaryPrimitives.ReadUInt32LittleEndian(scratch);
        return true;
    }

    // Clears the content checksum, returns the block buffer to the pool and disposes the LZ4 decoder.
    private void Reset()
    {
        _contentChecksum.Reset();
        if (_buffer is not null)
        {
            ArrayPool<byte>.Shared.Return(_buffer);
            _buffer = null;
        }
        if (_decoder is not null)
        {
            _decoder.Dispose();
            _decoder = null;
        }
    }

    /// <summary>Releases the LZ4 decoder and returns the pooled block buffer.</summary>
    public void Dispose() => Reset();
}
/// <summary>
/// The LZ4 frame descriptor that follows the magic number: the FLG and BD bytes, the optional content size
/// and dictionary id, and the one-byte header checksum.
/// </summary>
public class Lz4FrameDescriptor
{
    /// <summary>Largest serialized size: FLG + BD + 8-byte content size + 4-byte dictionary id + header checksum.</summary>
    public const int MaxSize = 15;

    /// <summary>Frame format version stored in FLG bits 7-6.</summary>
    public const int Version = 0b01;

    // Reserved bits: FLG bit 1, and BD bit 7 plus bits 3-0.
    private const int ReservedFlgMask = 0b0000_0010;
    private const int ReservedBdMask = 0b1000_1111;

    /// <summary>FLG bit 5 (block independence).</summary>
    public bool BlockIndependenceFlag { get; set; }

    /// <summary>FLG bit 4: every data block is followed by a 4-byte block checksum.</summary>
    public bool BlockChecksumFlag { get; set; }

    /// <summary>FLG bit 2: the end mark is followed by a 4-byte content checksum.</summary>
    public bool ContentChecksumFlag { get; set; }

    /// <summary>Maximum block size, encoded in BD bits 6-4.</summary>
    public int BlockMaximumSize { get; set; }

    /// <summary>Optional 8-byte content size (FLG bit 3 is set when present).</summary>
    public long? ContentSize { get; set; }

    /// <summary>Optional 4-byte dictionary id (FLG bit 0 is set when present).</summary>
    public int? DictionaryId { get; set; }

    /// <summary>
    /// Default settings: linked blocks, block and content checksums, <c>Mem.K64</c> maximum block size,
    /// and no content size or dictionary id.
    /// </summary>
    /// <remarks>
    /// This is a single shared instance with public setters. Modifying it changes the defaults for every
    /// caller, so copy it instead of mutating it.
    /// </remarks>
    public static readonly Lz4FrameDescriptor Default = new()
    {
        BlockIndependenceFlag = false,
        BlockChecksumFlag = true,
        ContentChecksumFlag = true,
        BlockMaximumSize = Mem.K64,
        ContentSize = null,
        DictionaryId = null,
    };

    /// <summary>Parses a frame descriptor from the start of <paramref name="source"/>.</summary>
    /// <param name="source">Bytes immediately following the frame magic number.</param>
    /// <param name="result">The parsed descriptor when this returns true.</param>
    /// <param name="length">Number of descriptor bytes (including the header checksum) when this returns true.</param>
    /// <returns>False when <paramref name="source"/> does not yet contain the complete descriptor.</returns>
    /// <exception cref="InvalidDataException">
    /// Unsupported version, reserved bits set, header checksum mismatch, or invalid block maximum size.
    /// </exception>
    internal static bool TryRead(ReadOnlySpan<byte> source, [MaybeNullWhen(false)] out Lz4FrameDescriptor result, out int length)
    {
        length = 0;
        if (source.Length < 2)
        {
            result = null;
            return false;
        }

        var FLG = source[length++];
        var BD = source[length++];

        var version = (FLG >> 6) & 0x03;
        var blockIndependenceFlag = (FLG & (1 << 5)) > 0;
        var blockChecksumFlag = (FLG & (1 << 4)) > 0;
        var hasContentSize = (FLG & (1 << 3)) > 0;
        var contentChecksumFlag = (FLG & (1 << 2)) > 0;
        var hasDictionaryId = (FLG & (1 << 0)) > 0;

        if (version != Version)
        {
            throw new InvalidDataException();
        }

        // Reserved bits must be zero in this format version. A set bit signals a feature this code does not
        // understand, so refuse the frame rather than risk decoding it incorrectly.
        if ((FLG & ReservedFlgMask) != 0 || (BD & ReservedBdMask) != 0)
        {
            throw new InvalidDataException("Reserved bits are set in the frame descriptor");
        }

        var contentSize = 0UL;
        var dictionaryId = 0U;

        if (hasContentSize)
        {
            if (!BinaryPrimitives.TryReadUInt64LittleEndian(source[length..], out contentSize))
            {
                result = null;
                return false;
            }
            length += 8;
        }

        if (hasDictionaryId)
        {
            if (!BinaryPrimitives.TryReadUInt32LittleEndian(source[length..], out dictionaryId))
            {
                result = null;
                return false;
            }
            length += 4;
        }

        if (source.Length < length + 1)
        {
            result = null;
            return false;
        }

        // Header checksum: second byte of the xxHash32 of every descriptor byte before it.
        var digest = (byte)(XXH32.DigestOf(source[..length]) >> 8);
        var HC = source[length++];
        if (HC != digest)
        {
            throw new InvalidDataException();
        }

        result = new Lz4FrameDescriptor()
        {
            BlockIndependenceFlag = blockIndependenceFlag,
            BlockChecksumFlag = blockChecksumFlag,
            ContentSize = hasContentSize ? (long)contentSize : null,
            ContentChecksumFlag = contentChecksumFlag,
            DictionaryId = hasDictionaryId ? (int)dictionaryId : null,
            BlockMaximumSize = GetBlockMaximumSize((BD >> 4) & 0x07),
        };
        return true;
    }

    /// <summary>
    /// Serializes this descriptor, including the trailing header checksum, into <paramref name="destination"/>.
    /// </summary>
    /// <returns>Number of bytes written (at most <see cref="MaxSize"/>).</returns>
    /// <exception cref="ArgumentException"><see cref="BlockMaximumSize"/> is larger than <c>Mem.M4</c>.</exception>
    internal int Write(Span<byte> destination)
    {
        var length = 0;

        var FLG = Version << 6;
        if (BlockIndependenceFlag) FLG |= 1 << 5;
        if (BlockChecksumFlag) FLG |= 1 << 4;
        if (ContentSize.HasValue) FLG |= 1 << 3;
        if (ContentChecksumFlag) FLG |= 1 << 2;
        if (DictionaryId.HasValue) FLG |= 1 << 0;
        destination[length++] = (byte)FLG;

        // BD byte
        var BD = GetBlockMaximumSizeTableValue(BlockMaximumSize) << 4;
        destination[length++] = (byte)BD;

        // Content size (optional)
        if (ContentSize.HasValue)
        {
            BinaryPrimitives.WriteUInt64LittleEndian(destination[length..], (ulong)ContentSize.Value);
            length += 8;
        }

        // Dictionary id (optional)
        if (DictionaryId.HasValue)
        {
            BinaryPrimitives.WriteUInt32LittleEndian(destination[length..], (uint)DictionaryId.Value);
            length += 4;
        }

        // Header checksum
        destination[length] = (byte)(XXH32.DigestOf(destination[..length]) >> 8);
        length++;

        return length;
    }

    // Smallest BD table code (4-7) whose block size is at least blockSize.
    private static int GetBlockMaximumSizeTableValue(int blockSize) =>
        blockSize <= Mem.K64 ? 4 :
        blockSize <= Mem.K256 ? 5 :
        blockSize <= Mem.M1 ? 6 :
        blockSize <= Mem.M4 ? 7 :
        throw new ArgumentException("Invalid block size");

    // Block size for a BD table code; codes outside 4-7 are rejected as corrupt.
    private static int GetBlockMaximumSize(int tableValue) => tableValue switch
    {
        4 => Mem.K64,
        5 => Mem.K256,
        6 => Mem.M1,
        7 => Mem.M4,
        _ => throw new InvalidDataException("Invalid maximum block size")
    };
}
/// <summary>
/// Encodes data as a single LZ4 frame: header, data blocks, end mark and, when enabled, the content checksum.
/// </summary>
/// <remarks>
/// The LZ4 encoder and the pooled output block buffer are kept in fields. The next <c>Encode</c> call replaces
/// them, and <see cref="Dispose"/> releases them.
/// </remarks>
public class Lz4FrameEncoder : IDisposable
{
    private ILZ4Encoder? _encoder;
    private byte[]? _buffer;
    private readonly XXH32 _contentChecksum = new();

    // Running checksum of the uncompressed bytes loaded into the block that is currently being filled.
    private readonly XXH32 _blockChecksum = new();

    /// <summary>Encodes <paramref name="source"/> as one LZ4 frame and appends it to <paramref name="writer"/>.</summary>
    public void Encode(IBufferWriter<byte> writer, ReadOnlyMemory<byte> source, LZ4Level level = LZ4Level.L00_FAST, Lz4FrameDescriptor? frameDescriptor = null) => Encode(writer, new ReadOnlySequence<byte>(source), level, frameDescriptor);

    /// <summary>Encodes <paramref name="source"/> as one LZ4 frame and appends it to <paramref name="writer"/>.</summary>
    /// <param name="writer">Destination for the encoded frame.</param>
    /// <param name="source">Uncompressed input; it may span any number of segments.</param>
    /// <param name="level">Compression level passed to the LZ4 encoder.</param>
    /// <param name="frameDescriptor">Frame settings; <see cref="Lz4FrameDescriptor.Default"/> when null.</param>
    public void Encode(IBufferWriter<byte> writer, ReadOnlySequence<byte> source, LZ4Level level = LZ4Level.L00_FAST, Lz4FrameDescriptor? frameDescriptor = null)
    {
        var header = new Lz4FrameHeader(frameDescriptor ?? Lz4FrameDescriptor.Default);
        var descriptor = header.FrameDescriptor;

        Reset();
        _encoder = LZ4Encoder.Create(
            chaining: !descriptor.BlockIndependenceFlag,
            level,
            descriptor.BlockMaximumSize);
        _buffer = ArrayPool<byte>.Shared.Rent(LZ4Codec.MaximumOutputSize(descriptor.BlockMaximumSize));

        // Write header
        var span = writer.GetSpan(Lz4FrameHeader.MaxSize);
        var length = header.Write(span);
        writer.Advance(length);

        foreach (var segment in source)
        {
            var remaining = segment.Span;
            while (!remaining.IsEmpty)
            {
                var block = TopupAndEncode(remaining, out var loaded);

                // One block may be filled by several calls, even across segments, so the block checksum is
                // accumulated per load and finalized only when a block is emitted. This assumes, as the
                // original code did, that the bytes loaded by this call belong to the block produced by it.
                if (descriptor.BlockChecksumFlag)
                {
                    _blockChecksum.Update(remaining[..loaded]);
                }

                if (block.IsCompleted)
                {
                    WriteBlock(writer, WithBlockChecksum(descriptor, block));
                }

                remaining = remaining[loaded..];
            }

            if (descriptor.ContentChecksumFlag)
            {
                _contentChecksum.Update(segment.Span);
            }
        }

        // Emit whatever is still buffered as a final, possibly short, block.
        var last = FlushAndEncode();
        if (last.IsCompleted)
        {
            WriteBlock(writer, WithBlockChecksum(descriptor, last));
        }

        // End mark
        span = writer.GetSpan(4);
        BinaryPrimitives.WriteUInt32LittleEndian(span, 0);
        writer.Advance(4);

        if (descriptor.ContentChecksumFlag)
        {
            // Content checksum
            span = writer.GetSpan(4);
            BinaryPrimitives.WriteUInt32LittleEndian(span, _contentChecksum.Digest());
            writer.Advance(4);
        }
    }

    // Attaches the checksum accumulated for the block just produced (when block checksums are enabled) and
    // starts accumulating the next block.
    // NOTE(review): the LZ4 frame format specifies the block checksum over the stored (encoded) block bytes;
    // this encoder checksums the uncompressed block content, which Lz4FrameDecoder verifies against.
    private Lz4BlockInfo WithBlockChecksum(Lz4FrameDescriptor descriptor, Lz4BlockInfo block)
    {
        if (!descriptor.BlockChecksumFlag)
        {
            return block;
        }

        var checksum = _blockChecksum.Digest();
        _blockChecksum.Reset();
        return block with { BlockChecksum = checksum };
    }

    // Writes the block length word (high bit set for stored blocks), the payload and the optional checksum.
    private static void WriteBlock(IBufferWriter<byte> writer, Lz4BlockInfo blockInfo)
    {
        var block = blockInfo.Span;
        var blockLength = block.Length;

        var span = writer.GetSpan(4 + blockLength);
        BinaryPrimitives.WriteUInt32LittleEndian(span, (uint)blockLength | (blockInfo.Compressed ? 0 : 0x80000000));
        block.CopyTo(span[4..]);
        writer.Advance(4 + blockLength);

        if (blockInfo.BlockChecksum.HasValue)
        {
            span = writer.GetSpan(4);
            BinaryPrimitives.WriteUInt32LittleEndian(span, blockInfo.BlockChecksum.Value);
            writer.Advance(4);
        }
    }

    // Loads as much of source as the current block accepts. The returned block is non-empty only when the
    // encoder produced output into the block buffer.
    private Lz4BlockInfo TopupAndEncode(ReadOnlySpan<byte> source, out int loaded)
    {
        var action = _encoder!.TopupAndEncode(
            source,
            target: _buffer!,
            forceEncode: false,
            allowCopy: true,
            out loaded,
            out var encoded);

        return new Lz4BlockInfo(_buffer!, encoded, action == EncoderAction.Encoded);
    }

    // Encodes any bytes still buffered in the encoder into the block buffer.
    private Lz4BlockInfo FlushAndEncode()
    {
        var action = _encoder!.FlushAndEncode(
            target: _buffer!,
            allowCopy: true,
            out var encoded);

        return new Lz4BlockInfo(_buffer!, encoded, action == EncoderAction.Encoded);
    }

    // Clears both running checksums, disposes the LZ4 encoder and returns the block buffer to the pool.
    private void Reset()
    {
        _contentChecksum.Reset();
        _blockChecksum.Reset();
        if (_encoder is not null)
        {
            _encoder.Dispose();
            _encoder = null;
        }
        if (_buffer is not null)
        {
            ArrayPool<byte>.Shared.Return(_buffer);
            _buffer = null;
        }
    }

    /// <summary>Releases the LZ4 encoder and returns the pooled block buffer.</summary>
    public void Dispose() => Reset();
}
/// <summary>
/// The LZ4 frame header: a 4-byte little-endian magic number followed by the frame descriptor.
/// </summary>
internal class Lz4FrameHeader
{
    /// <summary>Upper bound on the serialized header size: the magic number plus the largest descriptor.</summary>
    public const int MaxSize = sizeof(uint) + Lz4FrameDescriptor.MaxSize;

    // Magic number that opens every LZ4 frame (stored little-endian).
    private const uint Magic = 0x184D2204;

    /// <summary>The descriptor that follows the magic number.</summary>
    public Lz4FrameDescriptor FrameDescriptor { get; }

    public Lz4FrameHeader(Lz4FrameDescriptor frameDescriptor) => FrameDescriptor = frameDescriptor;

    /// <summary>Attempts to parse a header from the start of <paramref name="source"/>.</summary>
    /// <param name="source">Candidate header bytes.</param>
    /// <param name="result">The parsed header when this returns true; null otherwise.</param>
    /// <param name="length">Header size in bytes when this returns true; 0 otherwise.</param>
    /// <returns>False when <paramref name="source"/> does not yet hold the complete header.</returns>
    /// <exception cref="InvalidDataException">The magic number is wrong, or the descriptor is invalid.</exception>
    public static bool TryRead(ReadOnlySpan<byte> source, [MaybeNullWhen(false)] out Lz4FrameHeader result, out int length)
    {
        result = null;
        length = 0;

        if (source.Length < sizeof(uint))
        {
            // Not even the magic number is available yet.
            return false;
        }

        if (BinaryPrimitives.ReadUInt32LittleEndian(source) != Magic)
        {
            throw new InvalidDataException();
        }

        if (!Lz4FrameDescriptor.TryRead(source[sizeof(uint)..], out var descriptor, out var descriptorLength))
        {
            return false;
        }

        result = new Lz4FrameHeader(descriptor);
        length = sizeof(uint) + descriptorLength;
        return true;
    }

    /// <summary>Serializes the magic number and descriptor into <paramref name="destination"/>.</summary>
    /// <returns>Number of bytes written.</returns>
    public int Write(Span<byte> destination)
    {
        BinaryPrimitives.WriteUInt32LittleEndian(destination, Magic);
        var descriptorLength = FrameDescriptor.Write(destination[sizeof(uint)..]);
        return sizeof(uint) + descriptorLength;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment