Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Asynchronous Utf8JsonReader parser
using System;
using Serilog;
using System.IO;
using System.Threading.Tasks;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Buffers;
using System.Net;
namespace MessageClient
{
class JsonRequest : HttpApiRequest, IDisposable
{
public JsonRequest(Requests.BaseRequest request)
: base(request)
{
}
readonly ref struct JsonNode
{
public readonly JsonTokenType TokenType;
public readonly ReadOnlySpan<byte> Value;
public JsonNode(JsonTokenType tokenType, ReadOnlySpan<byte> value)
{
TokenType = tokenType;
Value = value;
}
}
enum ReadResult
{
Done,
Found,
ReadMore
}
private ReadResult GetProperty(string property, ref Utf8JsonReader reader, out JsonNode node)
{
var utf8PropertyName = DefaultEncoding.GetBytes(property);
bool keyFound = false;
while (reader.Read())
{
if (!keyFound && reader.TokenType == JsonTokenType.StartObject)
{
continue;
}
if (reader.TokenType == JsonTokenType.PropertyName)
{
var propertyName = reader.ValueSpan;
if (propertyName.SequenceEqual(utf8PropertyName))
{
keyFound = true;
}
}
if (keyFound)
{
node = new JsonNode(reader.TokenType, reader.ValueSpan);
return ReadResult.Found;
}
}
node = default;
return ReadResult.ReadMore;
}
struct OurState
{
public JsonReaderState InnerState;
public long BytesConsumed;
public bool GotProperty;
}
/// <returns><c>true</c> to indicate need to read more, <c>false</c> to indicate abort.</returns>
private ReadResult Parse<T>(ReadOnlySequence<byte> source, ref OurState state, string property, ref T result,
JsonSerializerOptions options = null)
{
long consumed = state.BytesConsumed;
source = source.Slice(state.BytesConsumed);
var reader = new Utf8JsonReader(source, false, state.InnerState);
if (state.GotProperty == false)
{
var readResult = GetProperty(property, ref reader, out var success);
state.InnerState = reader.CurrentState;
state.BytesConsumed = consumed + state.InnerState.BytesConsumed;
if (readResult == ReadResult.ReadMore)
{
return readResult;
}
state.GotProperty = true;
}
// Try skipping in a new context so our state isn't affected.
// This tells us if we have the entire payload available to consume.
var skipper = reader;
if (!skipper.TrySkip())
{
return ReadResult.ReadMore;
}
result = JsonSerializer.ReadValue<T>(ref reader, options);
state.GotProperty = false;
state.InnerState = reader.CurrentState;
state.BytesConsumed = consumed + state.InnerState.BytesConsumed;
return ReadResult.Done;
}
private async Task<R> ParseStreamAsync<R>(Stream jsonStream)
{
var readerState = new JsonReaderState(new JsonReaderOptions()
{
AllowTrailingCommas = true,
CommentHandling = JsonCommentHandling.Skip,
//MaxDepth = 1
});
var state = new OurState
{
GotProperty = false,
InnerState = readerState
};
bool? success = null;
using (var adapter = new StreamSequence(jsonStream))
{
while (await adapter.ReadMoreAsync())
{
if (success == null)
{
bool value = false;
switch (Parse<bool>(adapter.Sequence, ref state, "success", ref value))
{
case ReadResult.ReadMore:
continue;
case ReadResult.Done:
success = value;
break;
case ReadResult.Found:
throw new Exception("Unexpected value!");
}
}
if (!success.Value)
{
string error = null;
switch (Parse<string>(adapter.Sequence, ref state, "payload", ref error))
{
case ReadResult.ReadMore:
continue;
case ReadResult.Done:
throw new ApiException(ApiCode.GenericError, error);
case ReadResult.Found:
throw new Exception("Unexpected value!");
}
}
R payload = default;
switch (Parse<R>(adapter.Sequence, ref state, "payload", ref payload))
{
case ReadResult.ReadMore:
continue;
case ReadResult.Done:
return payload;
case ReadResult.Found:
throw new Exception("Unexpected value!");
}
}
}
throw new Exception("Did not receive a valid response from the server!");
}
protected override async Task RequestHandlerAsync(HttpWebRequest request)
{
using (var requestStream = await request.GetRequestStreamAsync())
{
await JsonSerializer.WriteAsync(_request, _request.GetType(), requestStream);
}
}
protected override async Task ResponseHandlerAsync<R>(HttpWebResponse response, Func<R, Task> resultHandlerAsync)
{
R result = default;
using (var responseStream = response.GetResponseStream())
{
try
{
result = await ParseStreamAsync<R>(responseStream);
}
catch (JsonException ex)
{
Log.Error(ex, "Error deserializing response as JSON");
throw new ApiException(ApiCode.ServerError, ex.Message);
}
await resultHandlerAsync(result);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment