@ladeak
Last active February 15, 2023 16:46
ArrayBufferWriters

Pooling IBufferWriter

In the .NET 7 era, some high-performance APIs offer overloads for dealing with raw data via IBufferWriter<T>. IBufferWriter<T> is a contract for buffered writing. High-performance APIs typically offer an IBufferWriter<T> overload of their methods alongside byte[], Stream or Memory<T> overloads. However, not every API offers all of these.

IBufferWriter<T> offers three methods:

  • GetMemory() and GetSpan() return a piece of writable memory of at least sizeHint elements (a non-empty one when sizeHint is 0).
  • Advance() notifies the buffer writer that a given number of items has been written.
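To make the contract concrete, here is a minimal sketch of a producer that writes through GetSpan() and Advance(). The WriteUInt32s helper is hypothetical, and ArrayBufferWriter<byte> is used only as a convenient IBufferWriter<byte> implementation; any other implementation could be passed instead.

```csharp
using System;
using System.Buffers;
using System.Buffers.Binary;

// Hypothetical producer: it depends only on the IBufferWriter<byte> contract.
static void WriteUInt32s(IBufferWriter<byte> writer, ReadOnlySpan<uint> values)
{
    foreach (uint value in values)
    {
        // Ask for at least 4 writable bytes; the writer may hand out more.
        Span<byte> span = writer.GetSpan(sizeof(uint));
        BinaryPrimitives.WriteUInt32LittleEndian(span, value);
        // Commit exactly the bytes that were written.
        writer.Advance(sizeof(uint));
    }
}

var buffer = new ArrayBufferWriter<byte>();
WriteUInt32s(buffer, new uint[] { 1, 2, 3 });
Console.WriteLine(buffer.WrittenCount); // 12
```

The producer never sees the underlying array, which is what lets the implementations discussed below be swapped in without changing the calling code.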

.NET 7 offers two implementations of IBufferWriter<T>: Pipe (via PipeWriter) and ArrayBufferWriter<T>. Pipes help solve the problem of parsing high-performance streaming data. ArrayBufferWriter<T> offers an array-based buffering solution by implementing IBufferWriter<T>.

Problem

In this post I use an IBufferWriter<T>-based API of protobuf-net, and explore the approaches one may take to provide an IBufferWriter<T> implementation to its Serialize method.

Solutions

Pipelines

While a Pipe is extremely powerful (and my default go-to type), I ruled out using it for the following reasons:

  • In this case we do not require parallel reading and writing of data.
  • In this case we do not require parsing data (for example, by delimiters).

ArrayBufferWriter

The second built-in type uses an array internally to buffer the written data. Every time a user of this type requests memory to write to, it checks (in .NET 7) whether there is enough space left in the already allocated array. If there is, the remainder of the array is used for writing the additional data. When there is not enough space, a new, larger array is allocated and the existing data is copied over. The downside of this type is the additional cost of re-allocating the array and copying the already written data. If the same size of data is written over and over again, an instance of ArrayBufferWriter<T> may be cached and cleared between iterations. In this case, the cost of extending a too-small array is paid only once, on the very first iteration.
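A minimal sketch of the caching approach, assuming a single-threaded loop where every iteration writes a payload of the same (illustrative) 100,000-byte size: the writer is created once and cleared between iterations.

```csharp
using System;
using System.Buffers;

// One cached writer reused across iterations (assumption: single-threaded use).
var writer = new ArrayBufferWriter<byte>();
int capacityAfterFirstIteration = 0;

for (int iteration = 0; iteration < 3; iteration++)
{
    // Simulate a payload of the same size on every iteration.
    Span<byte> span = writer.GetSpan(100_000);
    span.Slice(0, 100_000).Fill((byte)iteration);
    writer.Advance(100_000);

    // ... consume writer.WrittenSpan here ...

    // Clear resets WrittenCount but keeps the grown array, so later
    // iterations do not pay for growing and copying again.
    writer.Clear();
    if (iteration == 0)
        capacityAfterFirstIteration = writer.Capacity;
}

Console.WriteLine(writer.WrittenCount);                          // 0
Console.WriteLine(writer.Capacity == capacityAfterFirstIteration); // True
```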

PooledArrayBufferWriter

In case of variable data sizes, the above approach might not work:

  • If a single iteration requires a significantly larger array, all following iterations hold onto this large array as long as the instance is cached. The application has no way to return the unused memory to the OS.
  • If ArrayBufferWriter<T> is used without caching a single instance, new arrays may have to be allocated and expanded multiple times per iteration, which diminishes the performance benefits.
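The second case can be observed with a sketch like the following, which writes 1 MiB in 4 KiB chunks (illustrative sizes) into a fresh ArrayBufferWriter<byte> and counts how often its capacity changes. The exact number of resizes depends on the runtime's growth strategy, so it is printed rather than assumed.

```csharp
using System;
using System.Buffers;

// A fresh, uncached writer: count how often the underlying array is
// replaced while writing 1 MiB in 4 KiB chunks.
var writer = new ArrayBufferWriter<byte>();
int resizes = 0;
int lastCapacity = writer.Capacity;

for (int written = 0; written < 1024 * 1024; written += 4096)
{
    Span<byte> span = writer.GetSpan(4096);
    span.Slice(0, 4096).Fill(0xAB);
    writer.Advance(4096);

    if (writer.Capacity != lastCapacity)
    {
        // Each resize allocated a new array and copied the data written so far.
        resizes++;
        lastCapacity = writer.Capacity;
    }
}

Console.WriteLine($"{writer.WrittenCount} bytes, {resizes} resizes");
```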

A reasonable alternative is to implement a custom IBufferWriter<T>, similar to ArrayBufferWriter<T>, that rents its internal arrays from an ArrayPool<T>. For example, a non-production-ready sketch could look like this:

```csharp
using System;
using System.Buffers;

public sealed class PooledArrayBufferWriter<T> : IBufferWriter<T>
{
    private T[] _buffer;
    private int _index;

    // Some members excluded for brevity (for example the constructor and Clear).

    public int FreeCapacity => _buffer.Length - _index;

    public void Advance(int count)
    {
        if (count < 0 || _index > _buffer.Length - count)
            throw new ArgumentException(null, nameof(count));
        _index += count;
    }

    public Memory<T> GetMemory(int sizeHint = 0)
    {
        CheckAndResizeBuffer(sizeHint);
        return _buffer.AsMemory(_index);
    }

    public Span<T> GetSpan(int sizeHint = 0)
    {
        CheckAndResizeBuffer(sizeHint);
        return _buffer.AsSpan(_index);
    }

    private void CheckAndResizeBuffer(int sizeHint)
    {
        if (sizeHint < 0)
            throw new ArgumentException(null, nameof(sizeHint));

        // IBufferWriter<T> must return a non-empty buffer when sizeHint is 0.
        if (sizeHint == 0)
            sizeHint = 1;

        if (sizeHint > FreeCapacity)
        {
            // Grow by the larger of the requested size and the current length.
            int currentLength = _buffer.Length;
            int growBy = Math.Max(sizeHint, currentLength);
            int newSize = currentLength + growBy;

            // Rent a larger array, copy the written data, return the old array.
            var temp = ArrayPool<T>.Shared.Rent(newSize);
            Array.Copy(_buffer, temp, _index);
            ArrayPool<T>.Shared.Return(_buffer);
            _buffer = temp;
        }
    }
}
```

Note that a more complete example is available on GitHub.

The above example is based on the implementation principles of ArrayBufferWriter<T>. Every time a new memory segment is requested, the underlying array is checked for enough free space for the new segment. If there is enough space, the remaining part of the array is returned. If not, a new array is rented from the shared array pool and the data is copied into the new, larger array. The old array is then returned to the pool. Note that the full implementation of this type also has a Clear method, which returns the last rented array to the pool to avoid referencing excessively large arrays after usage.
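The resize step can be tried in isolation. The sketch below pulls it out into a hypothetical Grow helper working on byte arrays; it also shows that ArrayPool<T>.Rent may return an array larger than the requested length, which is why the writer tracks the written count separately from the array length.

```csharp
using System;
using System.Buffers;

// Hypothetical helper mirroring the resize step of PooledArrayBufferWriter<T>:
// rent a larger array, copy the written prefix, return the old array to the pool.
static byte[] Grow(byte[] current, int written, int sizeHint)
{
    int growBy = Math.Max(sizeHint, current.Length);
    byte[] larger = ArrayPool<byte>.Shared.Rent(current.Length + growBy);
    Array.Copy(current, larger, written);   // only the written part is copied
    ArrayPool<byte>.Shared.Return(current); // the old array goes back to the pool
    return larger;
}

byte[] buffer = ArrayPool<byte>.Shared.Rent(16);
int writtenCount = 3;
buffer[0] = 1;
buffer[1] = 2;
buffer[2] = 3;

buffer = Grow(buffer, writtenCount, sizeHint: 64);

// Rent may hand out a larger array than requested; the written prefix survives the copy.
int newLength = buffer.Length;
int sum = buffer[0] + buffer[1] + buffer[2];
ArrayPool<byte>.Shared.Return(buffer);

Console.WriteLine(newLength >= 16 + 64); // True
Console.WriteLine(sum);                  // 6
```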

The downside of PooledArrayBufferWriter<T> is that it still incurs an allocation: implemented as a class, it needs to be instantiated, and implemented as a struct, it would be boxed when passed as an IBufferWriter<T>. As a class, one can use an ObjectPool to pool the instances of PooledArrayBufferWriter<T>. Using an object pool is also beneficial from the API usage point of view: the developer does not need to clear the buffer explicitly, but can rely on the buffer being cleared when the object is returned to the pool. For more details, please see a reference implementation on GitHub.

ChainedArrayBufferWriter

One downside of PooledArrayBufferWriter<T> is that it allocates a contiguous memory segment. For larger data sizes this means that ArrayPool<T> hands out arrays that live on the LOH (Large Object Heap), which by default holds arrays of 85,000 bytes or more. It also still incurs a performance penalty for copying the existing data into the newly rented array every time an expansion is required.

One way to deal with these problems is to allocate multiple smaller buffers that are chained together like the elements of a linked list. In .NET 7 this idea manifests in ReadOnlySequence<T> and ReadOnlySequenceSegment<T>. Unfortunately, ReadOnlySequenceSegment<T> is abstract and there is no public implementation we can use directly; the documentation shows how to create one, called BufferSegment. However, deriving from ReadOnlySequenceSegment<T> results in a less optimal implementation in certain places due to casting, as its Next property is typed as the base class. Hence, I have implemented the very same concepts as ReadOnlySequenceSegment<T>, but without deriving from it.
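For comparison, a minimal sketch of the documented pattern: a hypothetical BufferSegment deriving from ReadOnlySequenceSegment<byte>, used to build a two-segment ReadOnlySequence<byte> without copying. Walking the chain through the Next property requires a cast back to the derived type, because Next is declared as ReadOnlySequenceSegment<T>.

```csharp
using System;
using System.Buffers;

var first = new BufferSegment(new byte[] { 1, 2, 3 });
BufferSegment last = first.Append(new byte[] { 4, 5 });

// The sequence spans both segments; neither array is copied.
var sequence = new ReadOnlySequence<byte>(first, 0, last, last.Memory.Length);
Console.WriteLine(sequence.Length);          // 5
Console.WriteLine(sequence.IsSingleSegment); // False

// Next is typed as the base class, so reaching our own type needs a cast.
var second = (BufferSegment?)first.Next;
Console.WriteLine(second!.RunningIndex);     // 3

// Hypothetical segment type, modeled on the BufferSegment sample in the docs.
sealed class BufferSegment : ReadOnlySequenceSegment<byte>
{
    public BufferSegment(ReadOnlyMemory<byte> memory) => Memory = memory;

    public BufferSegment Append(ReadOnlyMemory<byte> memory)
    {
        // The new segment starts where this one ends in the overall sequence.
        var segment = new BufferSegment(memory)
        {
            RunningIndex = RunningIndex + Memory.Length
        };
        Next = segment;
        return segment;
    }
}
```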

The benefit of a linked list of buffers is that expanding it is cheap: the already written segments do not need to be copied. However, the implementation is more complex, as it has to deal with a list of buffers, for example to determine the length of all data written. It also incurs a copy of all the buffers when the user needs the written data as a single contiguous block of memory. If contiguous memory is not required, the built-in Pipe could be a reasonable alternative.
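The trade-off can be sketched without any of the types above. The following assumes a simplified chain represented as a List of (array, written count) pairs, a hypothetical stand-in for linked segments rather than the implementation on GitHub: appending a chunk never copies earlier ones, while producing a contiguous array needs one pass to sum the lengths and one pass to copy every chunk.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a chain of buffers: each chunk is an array plus
// the number of bytes written into it.
var chunks = new List<(byte[] Buffer, int Written)>
{
    (new byte[8] { 1, 2, 3, 4, 5, 6, 7, 8 }, 8),
    (new byte[8] { 9, 10, 11, 0, 0, 0, 0, 0 }, 3),
};

// Growing the chain only appends; existing chunks are never copied.
chunks.Add((new byte[8] { 12, 0, 0, 0, 0, 0, 0, 0 }, 1));

// Producing one contiguous array: sum the lengths, then copy every chunk.
static byte[] ToContiguous(List<(byte[] Buffer, int Written)> chain)
{
    int length = 0;
    foreach (var (_, written) in chain)
        length += written;

    var result = new byte[length];
    Span<byte> destination = result;
    foreach (var (buffer, written) in chain)
    {
        buffer.AsSpan(0, written).CopyTo(destination);
        destination = destination.Slice(written);
    }
    return result;
}

byte[] contiguous = ToContiguous(chunks);
Console.WriteLine(string.Join(",", contiguous)); // 1,2,3,4,5,6,7,8,9,10,11,12
```

The ChainedArrayBufferWriter<T> on GitHub avoids the first pass by keeping a running index on each segment, but the final copy remains.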

A sample implementation of ChainedArrayBufferWriter<T> may be found on GitHub.

Performance Comparison

After these examples, one may be curious how these designs perform compared to each other. The benchmarks focus on a single aspect of using IBufferWriter<T>: each one uses protobuf-net to serialize an object into the buffer, then asks for all the written data as a single contiguous memory segment (an array). I used BenchmarkDotNet to run the benchmarks. Each buffer writer was evaluated with serialized object sizes of 358, 132358, 264358, 396358, 528358, 2112358 and 4224358 bytes. The benchmarks report the mean execution time and the allocated memory. The results are grouped by implementation and, within each group, ordered by the size of the serialized object.

|             Method | Size (bytes) |           Mean |        Error |        StdDev |      Gen0 |      Gen1 |      Gen2 |  Allocated |
|------------------- |-------------:|---------------:|-------------:|--------------:|----------:|----------:|----------:|-----------:|
|  PooledArrayBuffer |          358 |       361.4 ns |      5.84 ns |       5.47 ns |         - |         - |         - |          - |
|  PooledArrayBuffer |      132,358 |    39,853.4 ns |    355.29 ns |     314.95 ns |         - |         - |         - |          - |
|  PooledArrayBuffer |      264,358 |    78,841.6 ns |    732.90 ns |     685.55 ns |         - |         - |         - |          - |
|  PooledArrayBuffer |      396,358 |   113,260.2 ns |  1,030.14 ns |     913.19 ns |         - |         - |         - |          - |
|  PooledArrayBuffer |      528,358 |   164,544.2 ns |  1,414.23 ns |   1,322.88 ns |         - |         - |         - |          - |
|  PooledArrayBuffer |    2,112,358 |   676,185.8 ns |  7,965.81 ns |   7,451.23 ns |         - |         - |         - |        1 B |
|  PooledArrayBuffer |    4,224,358 | 1,425,548.9 ns | 14,450.31 ns |  12,809.82 ns |         - |         - |         - |        2 B |
| ChainedArrayBuffer |          358 |       384.0 ns |      2.80 ns |       2.62 ns |    0.0062 |         - |         - |       40 B |
| ChainedArrayBuffer |      132,358 |    37,598.7 ns |    294.54 ns |     261.11 ns |    0.0610 |         - |         - |      680 B |
| ChainedArrayBuffer |      264,358 |    77,457.7 ns |  1,037.65 ns |     919.85 ns |    0.1221 |         - |         - |     1320 B |
| ChainedArrayBuffer |      396,358 |   113,977.4 ns |  1,004.22 ns |     890.22 ns |    0.2441 |         - |         - |     1960 B |
| ChainedArrayBuffer |      528,358 |   152,751.8 ns |  1,871.77 ns |   1,750.85 ns |    0.2441 |         - |         - |     2600 B |
| ChainedArrayBuffer |    2,112,358 |   830,330.1 ns | 14,205.19 ns |  13,287.54 ns |  220.7031 |  166.9922 |    6.8359 |  1341413 B |
| ChainedArrayBuffer |    4,224,358 | 1,827,050.1 ns | 33,722.53 ns |  31,544.08 ns |  554.6875 |  488.2813 |    3.9063 |  3471420 B |
|  ArrayBufferWriter |          358 |       394.5 ns |      3.13 ns |       2.45 ns |    0.1721 |    0.0005 |         - |     1080 B |
|  ArrayBufferWriter |      132,358 |   156,500.7 ns |  1,867.37 ns |   1,559.34 ns |  124.7559 |  124.7559 |  124.7559 |   523553 B |
|  ArrayBufferWriter |      264,358 |   313,453.6 ns |  5,335.13 ns |   4,729.45 ns |  285.6445 |  285.6445 |  285.6445 |  1047920 B |
|  ArrayBufferWriter |      396,358 |   350,806.9 ns |  4,685.24 ns |   4,382.58 ns |  285.6445 |  285.6445 |  285.6445 |  1047920 B |
|  ArrayBufferWriter |      528,358 |   705,190.0 ns |  5,952.46 ns |   5,567.94 ns |  499.0234 |  499.0234 |  499.0234 |  2096591 B |
|  ArrayBufferWriter |    2,112,358 | 2,442,252.4 ns | 48,314.51 ns | 117,604.18 ns |  867.1875 |  851.5625 |  851.5625 |  8388230 B |
|  ArrayBufferWriter |    4,224,358 | 4,973,964.9 ns | 98,848.09 ns | 272,256.35 ns | 1007.8125 |  992.1875 |  992.1875 | 16776886 B |
|       MemoryStream |          358 |       352.7 ns |      3.26 ns |       2.89 ns |    0.0710 |         - |         - |      448 B |
|       MemoryStream |      132,358 |   148,912.9 ns |  2,709.65 ns |   2,402.03 ns |  117.4316 |  117.4316 |  117.4316 |   522103 B |
|       MemoryStream |      264,358 |   364,776.3 ns |  3,860.01 ns |   3,610.66 ns |  249.5117 |  249.5117 |  249.5117 |  1044968 B |
|       MemoryStream |      396,358 |   434,665.2 ns |  3,650.78 ns |   3,236.32 ns |  249.5117 |  249.5117 |  249.5117 |  1044967 B |
|       MemoryStream |      528,358 |   747,143.2 ns |  7,508.13 ns |   7,023.11 ns |  499.0234 |  499.0234 |  499.0234 |  2090664 B |
|       MemoryStream |    2,112,358 | 2,139,474.9 ns | 66,161.07 ns | 195,077.41 ns | 1339.8438 | 1339.8438 | 1339.8438 |  8364251 B |
|       MemoryStream |    4,224,358 | 5,242,795.6 ns | 38,359.31 ns |  32,031.76 ns | 1007.8125 |  992.1875 |  992.1875 | 16728123 B |

These results show interesting aspects of the given test workloads. First, PooledArrayBufferWriter<T> is generally a reasonable approach from both a throughput and an allocation point of view. Both ArrayBufferWriter<T> and MemoryStream are viable solutions for smaller input objects, but for larger ones their allocations increase significantly. ChainedArrayBufferWriter<T> allocates some memory, because the segment objects of the chain are allocated on the heap. Its throughput is close to that of PooledArrayBufferWriter<T>. However, for input objects of 2112358 and 4224358 bytes, its allocated memory explodes. As it turns out, my implementation uses ArrayPool<T>.Shared to rent buffers, which has a limit on the maximum number of array instances it can hold; arrays returned beyond that limit are not retained, so later rents allocate new ones. To fine-tune such an implementation, one can create a custom array pool with ArrayPool<T>.Create(maxArrayLength, maxArraysPerBucket) and specify a custom maxArraysPerBucket value. However, such fine-tuning requires a good understanding of the input data and of how the IBufferWriter<T>s are used.
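A minimal sketch of that fine-tuning, with illustrative values for maxArrayLength and maxArraysPerBucket (assumptions, not tuned numbers): a dedicated pool keeps returned arrays and hands the same instance out again for a request from the same size bucket.

```csharp
using System;
using System.Buffers;

// Hypothetical tuning values: pool arrays up to 4 MiB, keep up to 64 arrays per size bucket.
ArrayPool<byte> pool = ArrayPool<byte>.Create(maxArrayLength: 4 * 1024 * 1024, maxArraysPerBucket: 64);

byte[] first = pool.Rent(8192);
pool.Return(first);

// The returned array was retained by this pool, so the next same-sized rent reuses it.
byte[] second = pool.Rent(8192);
Console.WriteLine(ReferenceEquals(first, second)); // True
pool.Return(second);
```

Using such a pool in a chained writer would mean passing the pool into the writer instead of referring to ArrayPool<T>.Shared directly.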

```csharp
using System;
using System.Buffers;
using Microsoft.Extensions.ObjectPool;

public class ChainedPolicy : IPooledObjectPolicy<ChainedArrayBufferWriter<byte>>
{
    public ChainedArrayBufferWriter<byte> Create()
    {
        return new ChainedArrayBufferWriter<byte>();
    }

    public bool Return(ChainedArrayBufferWriter<byte> obj)
    {
        // Give the rented arrays back to the ArrayPool before the writer is pooled again.
        obj.Clear();
        return true;
    }
}

// One buffer of the chain; mirrors the concepts of ReadOnlySequenceSegment<T> without deriving from it.
internal class MemorySegment<T>
{
    private T[] _buffer;
    private int _index;

    public MemorySegment(T[] buffer)
    {
        _buffer = buffer;
        _index = 0;
        RunningIndex = 0;
        NextSegment = null;
    }

    // Number of items written into all preceding segments.
    internal int RunningIndex { get; private set; }
    internal T[] Buffer => _buffer;
    internal int Index => _index;
    internal int FreeCapacity => _buffer.Length - _index;
    internal Memory<T> WritableMemory => _buffer.AsMemory(_index);
    internal Span<T> WritableSpan => _buffer.AsSpan(_index);
    internal ReadOnlySpan<T> Span => _buffer.AsSpan(0, _index);
    internal MemorySegment<T>? NextSegment { get; private set; }

    public void Advance(int count)
    {
        if (count < 0)
            throw new ArgumentException(null, nameof(count));
        if (_index > _buffer.Length - count)
            ThrowInvalidOperationException_AdvancedTooFar();
        _index += count;
    }

    // Links a new segment after this one; its RunningIndex continues from this segment.
    public MemorySegment<T> Append(T[] nextSegment)
    {
        var segment = new MemorySegment<T>(nextSegment)
        {
            RunningIndex = RunningIndex + _index
        };
        NextSegment = segment;
        return segment;
    }

    // Detaches the rest of the chain (used when the writer is cleared).
    internal void Unlink() => NextSegment = null;

    private static void ThrowInvalidOperationException_AdvancedTooFar() => throw new InvalidOperationException();
}

public sealed class ChainedArrayBufferWriter<T> : IBufferWriter<T>, IDisposable
{
    private const int DefaultInitialBufferSize = 4096 * 2;

    // Empty sentinel segment, one per writer. A shared static sentinel would be
    // mutated by Append and link the chains of different writers together.
    private readonly MemorySegment<T> _head = new MemorySegment<T>(Array.Empty<T>());
    private MemorySegment<T> _tail;

    public ChainedArrayBufferWriter()
    {
        _tail = _head;
    }

    // Items in all previous segments plus the items written into the tail.
    public int Length => _tail.RunningIndex + _tail.Index;

    public void CopyTo(Span<T> destination)
    {
        if (Length > destination.Length)
            throw new ArgumentException(null, nameof(destination));
        var current = _head.NextSegment;
        while (current != null)
        {
            current.Span.CopyTo(destination);
            destination = destination.Slice(current.Index);
            current = current.NextSegment;
        }
    }

    public T[] ToArray()
    {
        var destination = new T[Length];
        CopyTo(destination.AsSpan());
        return destination;
    }

    public void Clear()
    {
        var current = _head.NextSegment;
        while (current != null)
        {
            ArrayPool<T>.Shared.Return(current.Buffer);
            current = current.NextSegment;
        }
        _head.Unlink();
        _tail = _head;
    }

    public void Advance(int count) => _tail.Advance(count);

    public Memory<T> GetMemory(int sizeHint = 0)
    {
        CheckAndExpandBuffer(sizeHint);
        return _tail.WritableMemory;
    }

    public Span<T> GetSpan(int sizeHint = 0)
    {
        CheckAndExpandBuffer(sizeHint);
        return _tail.WritableSpan;
    }

    private void CheckAndExpandBuffer(int sizeHint)
    {
        if (sizeHint < 0)
            throw new ArgumentException(null, nameof(sizeHint));

        // IBufferWriter<T> must return a non-empty buffer when sizeHint is 0.
        if (sizeHint == 0)
            sizeHint = 1;

        if (sizeHint > _tail.FreeCapacity)
        {
            // Append a new pooled segment instead of copying the data written so far.
            sizeHint = Math.Max(sizeHint, DefaultInitialBufferSize);
            var temp = ArrayPool<T>.Shared.Rent(sizeHint);
            _tail = _tail.Append(temp);
        }
    }

    public void Dispose() => Clear();
}
```

```csharp
using System;
using System.Buffers;
using Microsoft.Extensions.ObjectPool;

public class Policy : IPooledObjectPolicy<PooledArrayBufferWriter<byte>>
{
    public PooledArrayBufferWriter<byte> Create()
    {
        return new PooledArrayBufferWriter<byte>();
    }

    public bool Return(PooledArrayBufferWriter<byte> obj)
    {
        // Return the (possibly large) rented array before the writer is pooled again.
        obj.Clear();
        return true;
    }
}

public sealed class PooledArrayBufferWriter<T> : IBufferWriter<T>
{
    private const int DefaultInitialBufferSize = 4096 * 2;
    private T[] _buffer;
    private int _index;

    public PooledArrayBufferWriter()
    {
        _buffer = ArrayPool<T>.Shared.Rent(DefaultInitialBufferSize);
        _index = 0;
    }

    public ReadOnlyMemory<T> WrittenMemory => _buffer.AsMemory(0, _index);
    public ReadOnlySpan<T> WrittenSpan => _buffer.AsSpan(0, _index);
    public int WrittenCount => _index;
    public int Capacity => _buffer.Length;
    public int FreeCapacity => _buffer.Length - _index;

    // Returns the current array to the pool and starts over with a default-sized one.
    public void Clear()
    {
        ArrayPool<T>.Shared.Return(_buffer);
        _buffer = ArrayPool<T>.Shared.Rent(DefaultInitialBufferSize);
        _index = 0;
    }

    public void Advance(int count)
    {
        if (count < 0)
            throw new ArgumentException(null, nameof(count));
        if (_index > _buffer.Length - count)
            ThrowInvalidOperationException_AdvancedTooFar();
        _index += count;
    }

    public Memory<T> GetMemory(int sizeHint = 0)
    {
        CheckAndResizeBuffer(sizeHint);
        return _buffer.AsMemory(_index);
    }

    public Span<T> GetSpan(int sizeHint = 0)
    {
        CheckAndResizeBuffer(sizeHint);
        return _buffer.AsSpan(_index);
    }

    private void CheckAndResizeBuffer(int sizeHint)
    {
        if (sizeHint < 0)
            throw new ArgumentException(null, nameof(sizeHint));

        // IBufferWriter<T> must return a non-empty buffer when sizeHint is 0.
        if (sizeHint == 0)
        {
            sizeHint = 1;
        }

        if (sizeHint > FreeCapacity)
        {
            // Grow by the larger of the requested size and the current length,
            // copy the written data and return the old array to the pool.
            int currentLength = _buffer.Length;
            int growBy = Math.Max(sizeHint, currentLength);
            int newSize = currentLength + growBy;
            var temp = ArrayPool<T>.Shared.Rent(newSize);
            Array.Copy(_buffer, temp, _index);
            ArrayPool<T>.Shared.Return(_buffer);
            _buffer = temp;
        }
    }

    private static void ThrowInvalidOperationException_AdvancedTooFar() => throw new InvalidOperationException();
}
```