Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Validate JSON while sending it over HTTP
/// <summary>
/// HTTP content whose body is produced by an asynchronous callback that writes
/// directly to the output stream handed over at serialization time.
/// </summary>
public class PushStreamContent : HttpContent
{
    private readonly Func<Stream, Task> _writeToStreamAsync;
    private readonly int _length;

    /// <summary>
    /// Creates an instance of the PushStreamContent.
    /// </summary>
    /// <param name="writeToStreamAsync">Callback that writes the body to the supplied output stream.</param>
    /// <param name="length">
    /// (optional) The exact number of bytes the callback will write. Leave at -1 (the default)
    /// when the length is not known up front. When a non-negative value is supplied it is
    /// reported as the content length, so the callback must write exactly that many bytes.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="writeToStreamAsync"/> is null.</exception>
    public PushStreamContent(Func<Stream, Task> writeToStreamAsync, int length = -1)
    {
        if (writeToStreamAsync == null)
        {
            // Report the real parameter name so the exception points at the right argument.
            throw new ArgumentNullException(nameof(writeToStreamAsync));
        }

        _writeToStreamAsync = writeToStreamAsync;
        _length = length;
    }

    /// <summary>
    /// Performs the actual writing by invoking the callback with the output stream.
    /// </summary>
    /// <param name="stream">The output stream.</param>
    /// <param name="context">Transport context (not used).</param>
    /// <returns>The task returned by the callback.</returns>
    protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
    {
        return _writeToStreamAsync(stream);
    }

    /// <summary>
    /// Reports the length of the content when one was supplied to the constructor.
    /// </summary>
    /// <param name="length">Receives the length passed to the constructor (negative when unknown).</param>
    /// <returns>
    /// true when a non-negative length was supplied; false otherwise, because there is
    /// no way to know how much the callback is going to write.
    /// </returns>
    protected override bool TryComputeLength(out long length)
    {
        length = _length;
        return _length >= 0;
    }
}
/// <summary>
/// Read-only wrapper around a source stream: every byte read through this wrapper
/// is also written to the copy-to stream.
/// </summary>
public class SplittingStream : Stream
{
    private readonly Stream _sourceStream;
    private readonly Stream _copyToStream;

    /// <summary>
    /// Creates an instance of the splitting stream.
    /// </summary>
    /// <param name="sourceStream">The stream containing the sourced data.</param>
    /// <param name="copyToStream">The stream that all bytes read will be written to.</param>
    /// <exception cref="ArgumentNullException">Either stream is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// The source stream is not readable or the copy-to stream is not writable.
    /// </exception>
    public SplittingStream(Stream sourceStream, Stream copyToStream)
    {
        _sourceStream = sourceStream ?? throw new ArgumentNullException(nameof(sourceStream));
        _copyToStream = copyToStream ?? throw new ArgumentNullException(nameof(copyToStream));

        if (!_sourceStream.CanRead)
        {
            throw new InvalidOperationException(
                $"Can't read from the {nameof(_sourceStream)} stream of type {_sourceStream.GetType()}. It does not allow reading.");
        }

        if (!_copyToStream.CanWrite)
        {
            throw new InvalidOperationException(
                $"Can't write to the {nameof(copyToStream)} stream of type {copyToStream.GetType()}. It does not allow writing.");
        }
    }

    /// <summary>
    /// Flushes both streams.
    /// </summary>
    public override void Flush()
    {
        _sourceStream.Flush();
        _copyToStream.Flush();
    }

    /// <summary>
    /// Reads data from the source stream. Any byte read is also written to the copy-to stream.
    /// </summary>
    /// <param name="buffer">Buffer to read data into.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which the read data is stored.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>The number of bytes read from the source stream.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        int read = _sourceStream.Read(buffer, offset, count);
        if (read > 0)
        {
            // The freshly read bytes live at [offset, offset + read), not at the start of the buffer.
            _copyToStream.Write(buffer, offset, read);
        }

        return read;
    }

    /// <summary>
    /// Asynchronously reads data from the source stream. Any byte read is also written,
    /// asynchronously, to the copy-to stream.
    /// </summary>
    /// <param name="buffer">Buffer to read data into.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which the read data is stored.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <param name="cancellationToken">Token passed on to both the read and the write.</param>
    /// <returns>The number of bytes read from the source stream.</returns>
    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        int read = await _sourceStream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
        if (read > 0)
        {
            await _copyToStream.WriteAsync(buffer, offset, read, cancellationToken).ConfigureAwait(false);
        }

        return read;
    }

    /// <summary>
    /// Seeking is not supported: bytes already forwarded to the copy-to stream cannot be taken back.
    /// </summary>
    /// <param name="offset">Ignored.</param>
    /// <param name="origin">Ignored.</param>
    /// <returns>Never returns.</returns>
    /// <exception cref="NotSupportedException">Always.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException("seeking is not supported");
    }

    /// <summary>
    /// Setting the length is not supported.
    /// </summary>
    /// <param name="value">Ignored.</param>
    /// <exception cref="NotSupportedException">Always.</exception>
    public override void SetLength(long value)
    {
        throw new NotSupportedException("Setting the length of the source stream is not supported");
    }

    /// <summary>
    /// Writing is not supported; this wrapper only forwards bytes that are read from the source.
    /// </summary>
    /// <param name="buffer">Ignored.</param>
    /// <param name="offset">Ignored.</param>
    /// <param name="count">Ignored.</param>
    /// <exception cref="NotSupportedException">Always.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException("Writing to the source stream is not supported.");
    }

    /// <summary>
    /// Whether the source stream can be read (checked to be true at construction).
    /// </summary>
    public override bool CanRead => _sourceStream.CanRead;

    /// <summary>
    /// Returns false as seeking is not allowed.
    /// </summary>
    public override bool CanSeek => false;

    /// <summary>
    /// Returns false as writing is not allowed.
    /// </summary>
    public override bool CanWrite => false;

    /// <summary>
    /// Returns the length of the source stream.
    /// </summary>
    public override long Length => _sourceStream.Length;

    /// <summary>
    /// Returns the position in the source stream. Setting the position is not allowed.
    /// </summary>
    /// <exception cref="NotSupportedException">When set.</exception>
    public override long Position
    {
        get => _sourceStream.Position;
        set => throw new NotSupportedException("Setting the position is not supported");
    }
}
// Build the PUT request. The body is produced on demand: when the content is
// serialized, the incoming request stream is validated and forwarded at the same time.
var uploadRequest = new HttpRequestMessage(HttpMethod.Put, uploadUri);
uploadRequest.Content = new PushStreamContent(async outputStream =>
{
    // Each byte the validator reads from the input is also written to the
    // outgoing stream by the SplittingStream wrapper.
    await ValidateJson(new SplittingStream(request.InputStream, outputStream), cancellationToken);
});

// Send the request and require a success status code.
// NOTE(review): a new HttpClient is created for this call and never disposed;
// consider reusing a shared instance if this runs per request.
var response = await new HttpClient().SendAsync(uploadRequest, cancellationToken);
response.EnsureSuccessStatusCode();
/// <summary>
/// Validates the JSON in <paramref name="stream"/> by reading it token by token
/// until the reader reports that there is nothing left to read.
/// </summary>
/// <param name="stream">The stream containing the JSON text.</param>
/// <param name="id">Not used by the validation itself.</param>
/// <param name="ct">Cancellation token passed to every asynchronous read.</param>
/// <returns>A task that completes once the whole input has been read.</returns>
/// <remarks>
/// Presumably the reader throws on malformed JSON, which is what makes reading to
/// the end a validation - confirm against the JSON library's documentation.
/// </remarks>
private async Task ValidateJson(Stream stream, string id, CancellationToken ct)
{
    var reader = new JsonTextReader(new StreamReader(stream));
    while (await reader.ReadAsync(ct))
    {
        // Nothing to do per token; the tokens are read only to walk the whole document.
    }
}

/// <summary>
/// Validates the JSON in <paramref name="stream"/> when the caller has no identifier to pass.
/// </summary>
/// <param name="stream">The stream containing the JSON text.</param>
/// <param name="ct">Cancellation token passed to every asynchronous read.</param>
/// <returns>A task that completes once the whole input has been read.</returns>
private Task ValidateJson(Stream stream, CancellationToken ct)
{
    // The identifier is not used by the validation, so no value is needed here.
    return ValidateJson(stream, null, ct);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.