Skip to content

Instantly share code, notes, and snippets.

@ngbrown
Last active September 3, 2021 17:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ngbrown/1d8ca57dc3d47258ff6ee39d49f0a6eb to your computer and use it in GitHub Desktop.
Save ngbrown/1d8ca57dc3d47258ff6ee39d49f0a6eb to your computer and use it in GitHub Desktop.
Line-delimited JSON deserialization with .NET 5
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Demonstrates streaming deserialization of line-delimited JSON from a
/// <see cref="PipeReader"/> using <see cref="JsonSerializer"/>.
/// </summary>
class Program
{
    /// <summary>Bytes that terminate a line; CR and LF are each treated as a delimiter.</summary>
    private static readonly byte[] NewLineChars = { (byte)'\r', (byte)'\n' };

    /// <summary>Bytes trimmed from the start of a line when checking whether it is blank.</summary>
    private static readonly byte[] WhiteSpaceChars = { (byte)'\r', (byte)'\n', (byte)' ', (byte)'\t' };

    /// <summary>
    /// Entry point: deserializes three sample lines into <c>Foo</c> instances and prints each one.
    /// </summary>
    private static async Task Main()
    {
        JsonSerializerOptions jsonOptions = new(JsonSerializerDefaults.Web);
        var json = "{\"some\":\"thing1\"}\r\n{\"some\":\"thing2\"}\r\n{\"some\":\"thing3\"}";
        var contentStream = new MemoryStream(Encoding.UTF8.GetBytes(json));
        var pipeReader = PipeReader.Create(contentStream);
        try
        {
            await foreach (var foo in ReadItemsAsync<Foo>(pipeReader, jsonOptions))
            {
                Console.WriteLine($"foo: {foo.Some}");
            }
        }
        finally
        {
            // Completing the reader releases its buffers; with the default
            // PipeReader.Create options (leaveOpen: false) it also disposes contentStream.
            await pipeReader.CompleteAsync();
        }
    }

    /// <summary>
    /// Lazily reads line-delimited JSON from <paramref name="pipeReader"/> and yields one
    /// deserialized item per non-blank line.
    /// </summary>
    /// <typeparam name="TValue">Type each line is deserialized into.</typeparam>
    /// <param name="pipeReader">Source of UTF-8 bytes. The caller keeps ownership and is responsible for completing it.</param>
    /// <param name="jsonOptions">Serializer options, or <c>null</c> for the serializer defaults.</param>
    /// <param name="cancellationToken">Cancels the pending read; also bound by <c>WithCancellation</c> on the enumerable.</param>
    /// <returns>
    /// The deserialized items in input order. Blank lines and lines that deserialize to
    /// <c>null</c> are skipped.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="pipeReader"/> is <c>null</c> (raised when enumeration starts).</exception>
    /// <exception cref="JsonException">A line is not valid JSON for <typeparamref name="TValue"/>.</exception>
    private static async IAsyncEnumerable<TValue> ReadItemsAsync<TValue>(
        PipeReader pipeReader,
        JsonSerializerOptions jsonOptions = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (pipeReader is null)
        {
            throw new ArgumentNullException(nameof(pipeReader));
        }

        while (true)
        {
            ReadResult result = await pipeReader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            if (result.IsCanceled)
            {
                // CancelPendingRead was called: hand the buffer back untouched and stop.
                pipeReader.AdvanceTo(buffer.Start);
                yield break;
            }

            bool isCompleted = result.IsCompleted;
            SequencePosition consumed = buffer.Start;

            while (true)
            {
                var (value, hasValue, bufferExhausted) =
                    TryReadNextItem<TValue>(buffer, ref consumed, isCompleted, jsonOptions);

                // hasValue (not a null check) decides whether something was parsed, so
                // value-type TValue never yields a spurious default(TValue).
                if (hasValue && value is not null)
                {
                    yield return value;
                }

                if (bufferExhausted)
                {
                    // Consume the parsed lines and mark the rest as examined so the next
                    // ReadAsync waits for additional data instead of returning immediately.
                    pipeReader.AdvanceTo(consumed, buffer.End);
                    break;
                }
            }

            if (isCompleted)
            {
                yield break;
            }
        }
    }

    /// <summary>
    /// Attempts to parse the next non-blank line of <paramref name="sequence"/>, starting at
    /// <paramref name="sequencePosition"/>.
    /// </summary>
    /// <typeparam name="TValue">Type the line is deserialized into.</typeparam>
    /// <param name="sequence">The buffer returned by the current pipe read.</param>
    /// <param name="sequencePosition">
    /// On entry, where to resume parsing; on return, the position just past the last byte consumed.
    /// </param>
    /// <param name="isCompleted">
    /// <c>true</c> when no more data will arrive, so trailing bytes without a line terminator
    /// are treated as the final item.
    /// </param>
    /// <param name="jsonOptions">Serializer options, or <c>null</c> for the serializer defaults.</param>
    /// <returns>
    /// <c>Value</c> is meaningful only when <c>HasValue</c> is <c>true</c>.
    /// <c>BufferExhausted</c> is <c>true</c> when the caller must advance the pipe
    /// before calling again (no further complete line is available in this buffer).
    /// </returns>
    private static (TValue Value, bool HasValue, bool BufferExhausted) TryReadNextItem<TValue>(
        ReadOnlySequence<byte> sequence,
        ref SequencePosition sequencePosition,
        bool isCompleted,
        JsonSerializerOptions jsonOptions)
    {
        var reader = new SequenceReader<byte>(sequence.Slice(sequencePosition));

        while (!reader.End)
        {
            if (reader.TryReadToAny(out ReadOnlySpan<byte> itemBytes, NewLineChars, advancePastDelimiter: true))
            {
                sequencePosition = reader.Position;

                // Blank lines (including the empty span between '\r' and '\n') are skipped.
                if (itemBytes.TrimStart(WhiteSpaceChars).IsEmpty)
                {
                    continue;
                }

                return (JsonSerializer.Deserialize<TValue>(itemBytes, jsonOptions), true, false);
            }

            if (!isCompleted)
            {
                // Partial line: leave it unconsumed until more data arrives.
                break;
            }

            // End of input: whatever remains is the last, unterminated item.
            ReadOnlySequence<byte> remaining = sequence.Slice(reader.Position);
            ReadOnlySpan<byte> remainingSpan = remaining.IsSingleSegment
                ? remaining.First.Span
                : remaining.ToArray();
            reader.Advance(remaining.Length);
            sequencePosition = reader.Position;

            if (remainingSpan.TrimStart(WhiteSpaceChars).IsEmpty)
            {
                return (default, false, true);
            }

            return (JsonSerializer.Deserialize<TValue>(remainingSpan, jsonOptions), true, true);
        }

        // Nothing (more) to parse in this buffer; the PipeReader needs to read more.
        return (default, false, true);
    }
}
/// <summary>
/// Sample payload type: one instance is produced per JSON line.
/// </summary>
public class Foo
{
    /// <summary>Gets or sets the value bound to the <c>Some</c> property.</summary>
    public string Some { get; set; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment