// Gist ngbrown/1d8ca57dc3d47258ff6ee39d49f0a6eb (last active September 3, 2021 17:28).
// Line-delimited JSON deserializing with .NET 5.
// Note: GitHub flagged this file as possibly containing bidirectional Unicode text, which may be
// interpreted or compiled differently than it appears. Review it in an editor that reveals
// hidden Unicode characters.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Demonstrates streaming deserialization of line-delimited JSON (one JSON value per line)
/// read from a <see cref="PipeReader"/> with System.Text.Json.
/// </summary>
internal class Program
{
    // Line terminators: an item ends at the first '\r' or '\n'.
    private static readonly byte[] NewLineChars = { (byte)'\r', (byte)'\n' };

    // Bytes treated as insignificant when deciding whether a line is blank.
    private static readonly byte[] WhiteSpaceChars = { (byte)'\r', (byte)'\n', (byte)' ', (byte)'\t' };

    private static async Task Main()
    {
        JsonSerializerOptions jsonOptions = new(JsonSerializerDefaults.Web);
        var json = "{\"some\":\"thing1\"}\r\n{\"some\":\"thing2\"}\r\n{\"some\":\"thing3\"}";
        var contentStream = new MemoryStream(Encoding.UTF8.GetBytes(json));

        // With default StreamPipeReaderOptions (leaveOpen: false) the reader owns contentStream,
        // so completing the reader also disposes the stream.
        PipeReader pipeReader = PipeReader.Create(contentStream);
        try
        {
            await foreach (var foo in ReadItemsAsync<Foo>(pipeReader, jsonOptions))
            {
                Console.WriteLine($"foo: {foo.Some}");
            }
        }
        finally
        {
            await pipeReader.CompleteAsync();
        }
    }

    /// <summary>
    /// Reads line-delimited JSON from <paramref name="pipeReader"/>. Yields one deserialized
    /// <typeparamref name="TValue"/> per non-blank line until the pipe reports completion.
    /// </summary>
    /// <param name="pipeReader">Source of UTF-8 bytes. The caller remains responsible for completing it.</param>
    /// <param name="jsonOptions">Serializer options; <c>null</c> uses the serializer defaults.</param>
    /// <param name="cancellationToken">Cancels pending reads from the pipe.</param>
    /// <remarks>
    /// Blank lines, and lines whose JSON deserializes to <c>null</c>, are skipped.
    /// A malformed line surfaces as the exception thrown by <see cref="JsonSerializer"/>.
    /// That line is still consumed from the pipe.
    /// </remarks>
    private static async IAsyncEnumerable<TValue> ReadItemsAsync<TValue>(
        PipeReader pipeReader,
        JsonSerializerOptions jsonOptions = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        while (true)
        {
            ReadResult result = await pipeReader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;
            SequencePosition consumed = buffer.Start;
            try
            {
                // The bool result, not a null check on the value, decides whether an item was read.
                // This keeps value types from yielding a spurious default(TValue).
                while (TryReadNextItem(buffer, ref consumed, result.IsCompleted, jsonOptions, out TValue value))
                {
                    if (value is not null)
                    {
                        yield return value;
                    }
                }
            }
            finally
            {
                // Always return the buffer, even if deserialization throws or the consumer stops early,
                // so the reader can still be read from or completed. The whole buffer was examined,
                // so the next read waits for new data rather than returning the same bytes again.
                pipeReader.AdvanceTo(consumed, buffer.End);
            }

            if (result.IsCompleted)
            {
                yield break;
            }
        }
    }

    /// <summary>
    /// Tries to read and deserialize the next non-blank line of <paramref name="sequence"/>,
    /// starting at <paramref name="sequencePosition"/>.
    /// </summary>
    /// <param name="sequence">The bytes currently buffered by the pipe.</param>
    /// <param name="sequencePosition">
    /// On input, where to start reading. On output, the position just past everything consumed:
    /// terminated lines (including blank ones) and, when <paramref name="isCompleted"/> is set,
    /// the unterminated tail.
    /// </param>
    /// <param name="isCompleted">
    /// Whether the pipe has no more data, so an unterminated final line is a whole item.
    /// </param>
    /// <param name="jsonOptions">Serializer options; may be <c>null</c>.</param>
    /// <param name="value">The deserialized item when this returns <c>true</c>; otherwise <c>default</c>.</param>
    /// <returns>
    /// <c>true</c> if an item was read. <c>false</c> if <paramref name="sequence"/> holds no further
    /// complete item, either because more data is needed or because the input is exhausted.
    /// </returns>
    private static bool TryReadNextItem<TValue>(
        ReadOnlySequence<byte> sequence,
        ref SequencePosition sequencePosition,
        bool isCompleted,
        JsonSerializerOptions jsonOptions,
        out TValue value)
    {
        var reader = new SequenceReader<byte>(sequence.Slice(sequencePosition));

        // Consume terminated lines, skipping blank ones (such as the empty "line" between '\r' and '\n').
        while (reader.TryReadToAny(out ReadOnlySpan<byte> line, NewLineChars, advancePastDelimiter: true))
        {
            sequencePosition = reader.Position;
            if (!IsBlank(line))
            {
                value = JsonSerializer.Deserialize<TValue>(line, jsonOptions);
                return true;
            }
        }

        // Once the pipe is completed no terminator will arrive, so the unterminated tail is the last item.
        if (isCompleted && !reader.End)
        {
            ReadOnlySequence<byte> tail = sequence.Slice(reader.Position);
            sequencePosition = tail.End;
            ReadOnlySpan<byte> tailSpan = tail.IsSingleSegment ? tail.First.Span : tail.ToArray();
            if (!IsBlank(tailSpan))
            {
                value = JsonSerializer.Deserialize<TValue>(tailSpan, jsonOptions);
                return true;
            }
        }

        value = default;
        return false;
    }

    // True when the span is empty or contains only whitespace / line-terminator bytes.
    private static bool IsBlank(ReadOnlySpan<byte> bytes) => bytes.TrimStart(WhiteSpaceChars).IsEmpty;
}
/// <summary>
/// Sample payload: each JSON line in the demo input is an object with a single "some" string member.
/// </summary>
public class Foo
{
    /// <summary>Value of the "some" member; <c>null</c> until assigned.</summary>
    public string Some { get; set; }
}
// (End of gist. Sign up or sign in on GitHub to comment on this gist.)