Last active
March 30, 2020 03:31
-
-
Save wipiano/9a4b6dddc0be4e8386933882d401814e to your computer and use it in GitHub Desktop.
lazy msgpack deserializer to read large object array
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static class MessagePackSerializerEx | |
{ | |
public static IEnumerable<T> LazyDeserializeArray<T>(ReadOnlyMemory<byte> bytes, | |
MessagePackSerializerOptions? options = null) | |
{ | |
options ??= MessagePackSerializer.DefaultOptions; | |
if (options.Compression != MessagePackCompression.None) | |
{ | |
throw new InvalidOperationException("lz4 圧縮には対応できません"); | |
} | |
var formatter = options.Resolver.GetFormatterWithVerify<T>(); | |
var length = ReadArrayHeader(); | |
for (var i = 0; i < length; i++) | |
{ | |
yield return DeserializeItem(); | |
} | |
int ReadArrayHeader() | |
{ | |
var reader = new MessagePackReader(bytes); | |
var length = reader.ReadArrayHeader(); | |
bytes = bytes.Slice((int) reader.Consumed); | |
return length; | |
} | |
T DeserializeItem() | |
{ | |
var reader = new MessagePackReader(bytes); | |
var item = formatter.Deserialize(ref reader, options); | |
bytes = bytes.Slice((int) reader.Consumed); | |
return item; | |
} | |
} | |
public static IEnumerable<T> LazyDeserializeArray<T>(ReadOnlySequence<byte> bytes, | |
MessagePackSerializerOptions? options = null) | |
{ | |
options ??= MessagePackSerializer.DefaultOptions; | |
if (options.Compression != MessagePackCompression.None) | |
{ | |
throw new InvalidOperationException("lz4 圧縮には対応できません"); | |
} | |
var formatter = options.Resolver.GetFormatterWithVerify<T>(); | |
var length = ReadArrayHeader(); | |
for (var i = 0; i < length; i++) | |
{ | |
yield return DeserializeItem(); | |
} | |
int ReadArrayHeader() | |
{ | |
var reader = new MessagePackReader(bytes); | |
var length = reader.ReadArrayHeader(); | |
bytes = bytes.Slice((int) reader.Consumed); | |
return length; | |
} | |
T DeserializeItem() | |
{ | |
var reader = new MessagePackReader(bytes); | |
var item = formatter.Deserialize(ref reader, options); | |
bytes = bytes.Slice((int) reader.Consumed); | |
return item; | |
} | |
} | |
public static async IAsyncEnumerable<T> LazyDeserializeArrayAsync<T>(Stream stream, | |
MessagePackSerializerOptions? options = null) | |
{ | |
var pipe = new Pipe(); | |
try | |
{ | |
var writer = pipe.Writer; | |
const int minimumBufferSize = 512; | |
while (true) | |
{ | |
// Allocate at least 512 bytes from the PipeWriter | |
Memory<byte> memory = writer.GetMemory(minimumBufferSize); | |
int bytesRead = await stream.ReadAsync(memory); | |
if (bytesRead == 0) | |
{ | |
break; | |
} | |
// Tell the PipeWriter how much was read from the Stream | |
writer.Advance(bytesRead); | |
} | |
// Tell the PipeReader that there's no more data coming | |
writer.Complete(); | |
var sequence = (await pipe.Reader.ReadAsync()).Buffer; | |
foreach (var item in LazyDeserializeArray<T>(sequence, options)) | |
{ | |
yield return item; | |
} | |
} | |
finally | |
{ | |
pipe.Writer.Complete(); | |
pipe.Reader.Complete(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment