Skip to content

Instantly share code, notes, and snippets.

@Erwinvandervalk
Created December 8, 2017 09:10
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Erwinvandervalk/b4bb238f7f093da23c18df31d315610b to your computer and use it in GitHub Desktop.
Save Erwinvandervalk/b4bb238f7f093da23c18df31d315610b to your computer and use it in GitHub Desktop.
Validate JSON while sending it over HTTP
/// <summary>
/// An <see cref="HttpContent"/> whose body is produced by an asynchronous callback
/// that writes directly to the outgoing stream.
/// </summary>
public class PushStreamContent : HttpContent
{
    private readonly Func<Stream, Task> _writeToStreamAsync;
    private readonly int _length;

    /// <summary>
    /// Creates an instance of the PushStreamContent.
    /// </summary>
    /// <param name="writeToStreamAsync">Callback that writes the content body to the supplied output stream.</param>
    /// <param name="length">
    /// (optional) The number of bytes the callback will write. Leave at -1 (or any negative value)
    /// when the length is not known up front.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="writeToStreamAsync"/> is null.</exception>
    public PushStreamContent(Func<Stream, Task> writeToStreamAsync, int length = -1)
    {
        if (writeToStreamAsync == null)
        {
            // Report the real parameter name so the exception points at the right argument.
            throw new ArgumentNullException(nameof(writeToStreamAsync));
        }

        _writeToStreamAsync = writeToStreamAsync;
        _length = length;
    }

    /// <summary>
    /// Performs the actual writing by handing the output stream to the callback.
    /// </summary>
    /// <param name="stream">The output stream.</param>
    /// <param name="context">Transport context (not used).</param>
    /// <returns>The task returned by the callback.</returns>
    protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
    {
        return _writeToStreamAsync(stream);
    }

    /// <summary>
    /// Reports the content length when one was supplied to the constructor.
    /// </summary>
    /// <param name="length">Receives the length passed to the constructor.</param>
    /// <returns>
    /// <c>true</c> when a non-negative length was supplied; <c>false</c> when the length is unknown,
    /// because there is no way to know how much the callback is going to write.
    /// </returns>
    protected override bool TryComputeLength(out long length)
    {
        length = _length;

        // Only claim a known length when the caller actually provided one.
        return _length >= 0;
    }
}
/// <summary>
/// Read-only wrapper around a source stream that, while being read, also writes every
/// byte it reads to a second (copy-to) stream.
/// </summary>
public class SplittingStream : Stream
{
    private readonly Stream _sourceStream;
    private readonly Stream _copyToStream;

    /// <summary>
    /// Creates an instance of the splitting stream.
    /// </summary>
    /// <param name="sourceStream">The stream containing the sourced data. Must be readable.</param>
    /// <param name="copyToStream">The stream that all bytes read will be written to. Must be writable.</param>
    /// <exception cref="ArgumentNullException">Either stream is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// The source stream is not readable or the copy-to stream is not writable.
    /// </exception>
    public SplittingStream(Stream sourceStream, Stream copyToStream)
    {
        _sourceStream = sourceStream ?? throw new ArgumentNullException(nameof(sourceStream));
        _copyToStream = copyToStream ?? throw new ArgumentNullException(nameof(copyToStream));

        if (!_sourceStream.CanRead)
        {
            // Name the parameter (not the private field) so the message matches the copy-to check below.
            throw new InvalidOperationException(
                $"Can't read from the {nameof(sourceStream)} stream of type {sourceStream.GetType()}. It does not allow reading.");
        }

        if (!_copyToStream.CanWrite)
        {
            throw new InvalidOperationException(
                $"Can't write to the {nameof(copyToStream)} stream of type {copyToStream.GetType()}. It does not allow writing.");
        }
    }

    /// <summary>
    /// Flushes both streams.
    /// </summary>
    public override void Flush()
    {
        _sourceStream.Flush();
        _copyToStream.Flush();
    }

    /// <summary>
    /// Reads data from the source stream. Every byte read is also written to the copy-to stream.
    /// </summary>
    /// <param name="buffer">Buffer to read data into.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing the data.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>The number of bytes read from the source stream.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        int read = _sourceStream.Read(buffer, offset, count);
        if (read > 0)
        {
            // The freshly read bytes live at [offset, offset + read), so copy from there and not from index 0.
            _copyToStream.Write(buffer, offset, read);
        }

        return read;
    }

    /// <summary>
    /// Asynchronously reads data from the source stream. Every byte read is also written
    /// (asynchronously) to the copy-to stream.
    /// </summary>
    /// <param name="buffer">Buffer to read data into.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing the data.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <param name="cancellationToken">Token passed on to both underlying streams.</param>
    /// <returns>The number of bytes read from the source stream.</returns>
    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        int read = await _sourceStream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
        if (read > 0)
        {
            await _copyToStream.WriteAsync(buffer, offset, read, cancellationToken).ConfigureAwait(false);
        }

        return read;
    }

    /// <summary>
    /// Seeking is not supported: bytes already written to the copy-to stream cannot be taken back.
    /// </summary>
    /// <param name="offset">Not used.</param>
    /// <param name="origin">Not used.</param>
    /// <returns>Never returns.</returns>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException("seeking is not supported");
    }

    /// <summary>
    /// Setting the length is not supported on this read-only wrapper.
    /// </summary>
    /// <param name="value">Not used.</param>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override void SetLength(long value)
    {
        throw new NotSupportedException("Setting the length of the source stream is not supported");
    }

    /// <summary>
    /// Writing is not supported: this stream only reads from the source and copies what it reads.
    /// </summary>
    /// <param name="buffer">Not used.</param>
    /// <param name="offset">Not used.</param>
    /// <param name="count">Not used.</param>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException("Writing to the source stream is not supported.");
    }

    /// <summary>
    /// Whether the source stream can be read (checked to be true at construction).
    /// </summary>
    public override bool CanRead => _sourceStream.CanRead;

    /// <summary>
    /// Returns false as seeking is not allowed.
    /// </summary>
    public override bool CanSeek => false;

    /// <summary>
    /// Returns false as writing is not allowed.
    /// </summary>
    public override bool CanWrite => false;

    /// <summary>
    /// Returns the length of the source stream.
    /// </summary>
    public override long Length => _sourceStream.Length;

    /// <summary>
    /// Returns the position in the source stream. Setting the position is not allowed.
    /// </summary>
    /// <exception cref="NotSupportedException">Thrown when the position is set.</exception>
    public override long Position
    {
        get => _sourceStream.Position;
        set => throw new NotSupportedException("Setting the position is not supported");
    }
}
// Build the PUT request. Its body is produced on demand: when the request is sent, the
// callback below pulls the incoming request stream through the JSON validator, and every
// byte the validator reads is forwarded to the outgoing stream.
var uploadRequest = new HttpRequestMessage(HttpMethod.Put, uploadUri);
uploadRequest.Content = new PushStreamContent(async outgoing =>
{
    // Reading through the splitting stream copies each byte read to the outgoing stream.
    var validatedInput = new SplittingStream(request.InputStream, outgoing);
    await ValidateJson(validatedInput, cancellationToken);
});

// Send the request and fail on any non-success status code.
var response = await new HttpClient().SendAsync(uploadRequest, cancellationToken);
response.EnsureSuccessStatusCode();
/// <summary>
/// Validates the JSON in <paramref name="stream"/> by reading it through to the end.
/// </summary>
/// <param name="stream">The stream containing the JSON text.</param>
/// <param name="ct">Token used to cancel the read.</param>
/// <returns>A task that completes when the whole stream has been read.</returns>
private Task ValidateJson(Stream stream, CancellationToken ct)
{
    // Overload for callers that have no id to pass; the id is not used by the validation.
    return ValidateJson(stream, null, ct);
}

/// <summary>
/// Validates the JSON in <paramref name="stream"/> by reading it through to the end.
/// Nothing is returned; the tokens are read and discarded.
/// NOTE(review): presumably the reader throws on malformed JSON, which is what makes
/// this a validation - confirm against the JSON library in use.
/// </summary>
/// <param name="stream">The stream containing the JSON text.</param>
/// <param name="id">Not used by this method; kept so existing callers keep compiling.</param>
/// <param name="ct">Token used to cancel the read.</param>
/// <returns>A task that completes when the whole stream has been read.</returns>
private async Task ValidateJson(Stream stream, string id, CancellationToken ct)
{
    var reader = new JsonTextReader(new StreamReader(stream));

    // Pull tokens until the reader reports there is nothing left to read.
    while (await reader.ReadAsync(ct))
    {
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment