Skip to content

Instantly share code, notes, and snippets.

@wipiano
Last active March 30, 2020 03:31
Show Gist options
  • Save wipiano/9a4b6dddc0be4e8386933882d401814e to your computer and use it in GitHub Desktop.
Save wipiano/9a4b6dddc0be4e8386933882d401814e to your computer and use it in GitHub Desktop.
Lazy MessagePack deserializer for reading large arrays of objects
/// <summary>
/// Helpers that deserialize a MessagePack array one element at a time instead of
/// materializing the whole array, so large object arrays can be streamed to the caller.
/// </summary>
public static class MessagePackSerializerEx
{
    /// <summary>
    /// Lazily deserializes the elements of a MessagePack array held in contiguous memory.
    /// </summary>
    /// <typeparam name="T">Element type of the array.</typeparam>
    /// <param name="bytes">Buffer that starts with a MessagePack array header.</param>
    /// <param name="options">Serializer options; <see cref="MessagePackSerializer.DefaultOptions"/> when null.</param>
    /// <returns>A sequence that deserializes one element per iteration step.</returns>
    /// <exception cref="InvalidOperationException">The options request compression.</exception>
    public static IEnumerable<T> LazyDeserializeArray<T>(ReadOnlyMemory<byte> bytes,
        MessagePackSerializerOptions? options = null)
    {
        // Wrap the memory in a single-segment sequence (no copy) so that both overloads
        // share one implementation.
        return LazyDeserializeArray<T>(new ReadOnlySequence<byte>(bytes), options);
    }

    /// <summary>
    /// Lazily deserializes the elements of a MessagePack array held in a (possibly multi-segment) sequence.
    /// </summary>
    /// <typeparam name="T">Element type of the array.</typeparam>
    /// <param name="bytes">Sequence that starts with a MessagePack array header.</param>
    /// <param name="options">Serializer options; <see cref="MessagePackSerializer.DefaultOptions"/> when null.</param>
    /// <returns>A sequence that deserializes one element per iteration step.</returns>
    /// <exception cref="InvalidOperationException">The options request compression.</exception>
    public static IEnumerable<T> LazyDeserializeArray<T>(ReadOnlySequence<byte> bytes,
        MessagePackSerializerOptions? options = null)
    {
        // Validation lives in this non-iterator wrapper so that unsupported options are
        // reported at the call site rather than on the first MoveNext().
        var effectiveOptions = RequireUncompressed(options);
        return DeserializeArrayCore<T>(bytes, effectiveOptions);
    }

    /// <summary>
    /// Reads <paramref name="stream"/> to its end and then lazily deserializes the elements of the
    /// MessagePack array it contains.
    /// </summary>
    /// <remarks>
    /// The whole stream is buffered in a <see cref="Pipe"/> before the first element is yielded;
    /// only the deserialization of the individual elements is lazy.
    /// </remarks>
    /// <typeparam name="T">Element type of the array.</typeparam>
    /// <param name="stream">Stream positioned at a MessagePack array header.</param>
    /// <param name="options">Serializer options; <see cref="MessagePackSerializer.DefaultOptions"/> when null.</param>
    /// <returns>An asynchronous sequence of the deserialized elements.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The options request compression.</exception>
    public static async IAsyncEnumerable<T> LazyDeserializeArrayAsync<T>(Stream stream,
        MessagePackSerializerOptions? options = null)
    {
        if (stream is null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        // Reject unsupported options before spending time and memory on reading the stream.
        var effectiveOptions = RequireUncompressed(options);

        var pipe = new Pipe();
        try
        {
            var writer = pipe.Writer;
            const int minimumBufferSize = 512;
            while (true)
            {
                // Allocate at least 512 bytes from the PipeWriter
                Memory<byte> memory = writer.GetMemory(minimumBufferSize);
                int bytesRead = await stream.ReadAsync(memory).ConfigureAwait(false);
                if (bytesRead == 0)
                {
                    break;
                }

                // Tell the PipeWriter how much was read from the Stream
                writer.Advance(bytesRead);
            }

            // Tell the PipeReader that there's no more data coming
            writer.Complete();

            var readResult = await pipe.Reader.ReadAsync().ConfigureAwait(false);
            foreach (var item in DeserializeArrayCore<T>(readResult.Buffer, effectiveOptions))
            {
                yield return item;
            }
        }
        finally
        {
            // Runs on normal completion, on failure, and when the consumer stops enumerating early.
            pipe.Writer.Complete();
            pipe.Reader.Complete();
        }
    }

    /// <summary>
    /// Applies the default options when none are given and rejects compressed payloads,
    /// which this class does not read element by element.
    /// </summary>
    /// <param name="options">Caller-supplied options, or null.</param>
    /// <returns>The non-null options to deserialize with.</returns>
    /// <exception cref="InvalidOperationException">The options request compression.</exception>
    private static MessagePackSerializerOptions RequireUncompressed(MessagePackSerializerOptions? options)
    {
        options ??= MessagePackSerializer.DefaultOptions;
        if (options.Compression != MessagePackCompression.None)
        {
            // Message text (Japanese): "LZ4 compression is not supported".
            throw new InvalidOperationException("lz4 圧縮には対応できません");
        }

        return options;
    }

    /// <summary>
    /// Iterator shared by all public entry points: reads the array header, then deserializes
    /// one element per iteration step from the not-yet-consumed tail of the sequence.
    /// </summary>
    /// <param name="remaining">Sequence that starts with the array header.</param>
    /// <param name="options">Already validated, non-null options.</param>
    private static IEnumerable<T> DeserializeArrayCore<T>(ReadOnlySequence<byte> remaining,
        MessagePackSerializerOptions options)
    {
        var formatter = options.Resolver.GetFormatterWithVerify<T>();
        var elementCount = ReadHeader();
        for (var i = 0; i < elementCount; i++)
        {
            yield return ReadElement();
        }

        // MessagePackReader is a ref struct and cannot be a local of an iterator method,
        // so every read happens inside a local function that creates a fresh reader over
        // the remaining bytes and then drops what it consumed.
        int ReadHeader()
        {
            var reader = new MessagePackReader(remaining);
            var headerLength = reader.ReadArrayHeader();
            // Consumed is a long; slice with it directly so sequences > 2 GiB do not overflow.
            remaining = remaining.Slice(reader.Consumed);
            return headerLength;
        }

        T ReadElement()
        {
            var reader = new MessagePackReader(remaining);
            var element = formatter.Deserialize(ref reader, options);
            remaining = remaining.Slice(reader.Consumed);
            return element;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment