Skip to content

Instantly share code, notes, and snippets.

@keimpema
Last active October 16, 2017 13:58
Show Gist options
  • Save keimpema/8552968b2da9c4bca2dc077a9e03d1cf to your computer and use it in GitHub Desktop.
Save keimpema/8552968b2da9c4bca2dc077a9e03d1cf to your computer and use it in GitHub Desktop.
PipeStream
using System;
using System.IO;
namespace Usenet.Util
{
public abstract class AbstractBaseStream : Stream
{
public override void Flush() {}
public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException();
public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
public override void SetLength(long value) => throw new NotImplementedException();
public override void Write(byte[] buffer, int offset, int count) => throw new NotImplementedException();
public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => throw new NotSupportedException();
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
}
}
using System;
using System.Collections.Concurrent;
using System.Threading;
using Microsoft.Extensions.Logging;
namespace Usenet.Util
{
public class PipeStream : AbstractBaseStream
{
private class Chunk
{
public byte[] Data { get; set; }
public int Offset { get; set; }
public int Length => Data.Length;
}
private static readonly ILogger log = Logger.Create<PipeStream>();
private readonly BlockingCollection<Chunk> chunks;
private Chunk currentChunk;
private long length;
private int readTimeout = Timeout.Infinite;
public PipeStream()
{
chunks = new BlockingCollection<Chunk>();
}
public override int Read(byte[] buffer, int offset, int count)
{
Guard.ThrowIfNull(buffer, nameof(buffer));
if (offset < 0 || offset >= buffer.Length)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}
if (count < 0 || offset + count > buffer.Length)
{
throw new ArgumentOutOfRangeException(nameof(count));
}
var total = 0;
while (count > 0)
{
if (currentChunk == null || currentChunk.Offset >= currentChunk.Length)
{
// need a new chunk
log.LogDebug("Taking chunk from pipe");
if (!chunks.TryTake(out currentChunk, ReadTimeout))
{
// no more chunks available
log.LogDebug("Pipe is empty");
return total;
}
log.LogDebug("Took chunk from pipe: {Size} bytes", currentChunk.Length);
}
int copyCount = Math.Min(count, currentChunk.Length - currentChunk.Offset);
log.LogDebug("Reading from current chunk: {Count} bytes", copyCount);
Buffer.BlockCopy(currentChunk.Data, currentChunk.Offset, buffer, offset, copyCount);
currentChunk.Offset += copyCount;
offset += copyCount;
total += copyCount;
count -= copyCount;
length -= copyCount;
}
return total;
}
public override void Write(byte[] buffer, int offset, int count)
{
Guard.ThrowIfNull(buffer, nameof(buffer));
if (offset < 0 || offset >= buffer.Length)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}
if (count < 0 || offset + count > buffer.Length)
{
throw new ArgumentOutOfRangeException(nameof(count));
}
var chunk = new Chunk {Data = new byte[count]};
log.LogDebug("Writing chunk to pipe: {Size} bytes", count);
Buffer.BlockCopy(buffer, offset, chunk.Data, 0, count);
chunks.Add(chunk);
length += count;
}
public override bool CanTimeout => true;
public override int ReadTimeout
{
get => readTimeout;
set => readTimeout = value < 0 ? Timeout.Infinite : value;
}
public override bool CanRead => true;
public override bool CanWrite => true;
public override long Length => length;
}
}
using System;
using System.Threading;
using System.Threading.Tasks;
using Usenet.Util;
using Xunit;
namespace UsenetTests.Util
{
public class PipeStreamTests
{
[Fact]
public void SingleWriteSingleRead()
{
var count = 10;
byte[] expected = GetBuffer(count);
var actual = new byte[count];
var pipe = new PipeStream();
pipe.Write(expected, 0, expected.Length);
int actualCount = pipe.Read(actual, 0, count);
Assert.Equal(count, actualCount);
Assert.Equal(expected, actual);
Assert.Equal(0, pipe.Length);
}
[Fact]
public void MultipleWritesSingleRead()
{
var count = 10;
byte[] expected = GetBuffer(count);
var actual = new byte[count];
var pipe = new PipeStream();
pipe.Write(expected, 0, 3);
pipe.Write(expected, 3, 3);
pipe.Write(expected, 6, 4);
int actualCount = pipe.Read(actual, 0, 10);
Assert.Equal(count, actualCount);
Assert.Equal(expected, actual);
Assert.Equal(0, pipe.Length);
}
[Fact]
public void MultipleWritesMultipleReads()
{
const int count = 10;
byte[] expected = GetBuffer(count);
var actual = new byte[count];
var pipe = new PipeStream();
pipe.Write(expected, 0, 3);
pipe.Write(expected, 3, 3);
pipe.Write(expected, 6, 4);
int actualCount = pipe.Read(actual, 0, 5);
actualCount += pipe.Read(actual, 5, 5);
Assert.Equal(count, actualCount);
Assert.Equal(expected, actual);
Assert.Equal(0, pipe.Length);
}
[Fact]
public void SingleWriteMultipleReads()
{
const int count = 10;
byte[] expected = GetBuffer(count);
var actual = new byte[count];
var pipe = new PipeStream();
pipe.Write(expected, 0, count);
int actualCount = pipe.Read(actual, 0, 3);
actualCount += pipe.Read(actual, 3, 3);
actualCount += pipe.Read(actual, 6, 4);
Assert.Equal(count, actualCount);
Assert.Equal(expected, actual);
Assert.Equal(0, pipe.Length);
}
[Fact]
public void ReadMoreThanWrittenShouldTimeout()
{
const int count = 10;
byte[] expected = GetBuffer(count);
var actual = new byte[count+1];
var pipe = new PipeStream {ReadTimeout = 0};
pipe.Write(expected, 0, count);
int actualCount = pipe.Read(actual, 0, actual.Length);
Assert.Equal(count, actualCount);
//Assert.Equal(expected, actual);
Assert.Equal(0, pipe.Length);
}
[Fact]
public void ReadShouldWaitForIncommingData()
{
const int count = 10;
byte[] expected = GetBuffer(count);
var actual = new byte[count];
var pipe = new PipeStream();
Task<int> readTask = Task.Run(() => pipe.Read(actual, 0, actual.Length));
pipe.Write(expected, 0, count);
Assert.Equal(count, readTask.Result);
Assert.Equal(expected, actual);
Assert.Equal(0, pipe.Length);
}
[Fact]
public void ReadShouldWaitForMultipleWrites()
{
const int count = 10;
byte[] expected = GetBuffer(count);
var actual = new byte[count];
var pipe = new PipeStream();
Task<int> readTask = Task.Run(() => pipe.Read(actual, 0, actual.Length));
pipe.Write(expected, 0, 3);
pipe.Write(expected, 3, 3);
pipe.Write(expected, 6, 4);
Assert.Equal(count, readTask.Result);
Assert.Equal(expected, actual);
Assert.Equal(0, pipe.Length);
}
[Fact]
public void ReadShouldTimeout()
{
const int count = 10;
var actual = new byte[count];
var pipe = new PipeStream {ReadTimeout = 0};
int actualCount = pipe.Read(actual, 0, actual.Length);
Assert.Equal(0, actualCount);
Assert.Equal(0, pipe.Length);
}
[Fact]
public void NegativeReadTimeoutShouldResultInInfinte()
{
var pipe = new PipeStream { ReadTimeout = -12345 };
Assert.Equal(Timeout.Infinite, pipe.ReadTimeout);
}
[Fact]
public void ZeroReadTimeoutShouldBeSetCorrectly()
{
const int expected = 0;
var pipe = new PipeStream { ReadTimeout = expected };
Assert.Equal(expected, pipe.ReadTimeout);
}
[Fact]
public void PositiveReadTimeoutShouldBeSetCorrectly()
{
const int expected = 1234;
var pipe = new PipeStream { ReadTimeout = expected };
Assert.Equal(expected, pipe.ReadTimeout);
}
private static byte[] GetBuffer(int count)
{
var random = new Random((int)DateTimeOffset.UtcNow.UtcTicks);
var buffer = new byte[count];
random.NextBytes(buffer);
return buffer;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment