Skip to content

Instantly share code, notes, and snippets.

@hifi
Last active January 4, 2017 18:09
Show Gist options
  • Save hifi/3882f07f9064ce87e1a43d27a83888b3 to your computer and use it in GitHub Desktop.
Save hifi/3882f07f9064ce87e1a43d27a83888b3 to your computer and use it in GitHub Desktop.
An idea for stream splitting
using System;
using System.IO;
using System.Threading;
namespace Renci.SshNet.Common
{
internal class Pipe
{
/// <summary>
/// A single chunk of bytes held in the pipe, together with a read cursor.
/// </summary>
/// <remarks>
/// The entry keeps a private copy of the bytes it was created from, so the
/// caller is free to reuse or overwrite its own buffer once the constructor
/// has returned.
/// </remarks>
class PipeEntry
{
    private readonly byte[] _data;
    private readonly int _length;
    private int _position;

    /// <summary>
    /// Gets or sets the entry linked after this one, or <c>null</c> if none has been attached.
    /// </summary>
    public PipeEntry Next { get; set; }

    /// <summary>
    /// Creates an entry holding <paramref name="count"/> bytes copied from
    /// <paramref name="data"/> starting at <paramref name="offset"/>.
    /// </summary>
    /// <param name="data">The source buffer.</param>
    /// <param name="offset">Index in <paramref name="data"/> of the first byte to keep.</param>
    /// <param name="count">Number of bytes to keep.</param>
    /// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The requested range does not fit inside <paramref name="data"/>.</exception>
    public PipeEntry(byte[] data, int offset, int count)
    {
        if (data == null)
            throw new ArgumentNullException("data");
        if (offset < 0)
            throw new ArgumentOutOfRangeException("offset");
        if (count < 0)
            throw new ArgumentOutOfRangeException("count");
        if (count > data.Length - offset)
            throw new ArgumentException("The sum of offset and count is greater than the buffer length.");

        // Copy the slice instead of aliasing the caller's array. The cursor then
        // runs from 0 to count, which also removes the offset/length mix-up that
        // made entries with a non-zero offset yield too few bytes.
        _data = new byte[count];
        Buffer.BlockCopy(data, offset, _data, 0, count);
        _position = 0;
        _length = count;
    }

    /// <summary>
    /// Copies up to <paramref name="count"/> unread bytes into <paramref name="dst"/>
    /// and advances the read cursor past them.
    /// </summary>
    /// <param name="dst">The destination buffer.</param>
    /// <param name="offset">Index in <paramref name="dst"/> at which to start writing.</param>
    /// <param name="count">Maximum number of bytes to copy.</param>
    /// <returns>The number of bytes copied; 0 when the entry has been fully consumed.</returns>
    public int Read(byte[] dst, int offset, int count)
    {
        var bytesAvailable = _length - _position;
        var bytesToCopy = Math.Min(count, bytesAvailable);

        Buffer.BlockCopy(_data, _position, dst, offset, bytesToCopy);
        _position += bytesToCopy;

        return bytesToCopy;
    }

    /// <summary>
    /// Gets a value indicating whether every byte of this entry has been read.
    /// </summary>
    public bool IsEmpty
    {
        get { return _position == _length; }
    }
}
/// <summary>
/// A blocking first-in/first-out queue of byte chunks, kept as a singly linked
/// list of <see cref="PipeEntry"/> objects and guarded by a single monitor.
/// </summary>
class ByteQueue
{
    private readonly object _lock = new object();
    private bool _isClosed;

    // Head and tail of the linked list; both are null while the queue holds no entries.
    private PipeEntry _first;
    private PipeEntry _last;

    /// <summary>
    /// Marks the queue as closed and wakes every thread waiting in <see cref="Dequeue"/>.
    /// Entries already queued can still be read; later <see cref="Enqueue"/> calls are ignored.
    /// </summary>
    public void Close()
    {
        lock (_lock)
        {
            _isClosed = true;
            Monitor.PulseAll(_lock);
        }
    }

    /// <summary>
    /// Appends a chunk to the tail of the queue and wakes waiting readers.
    /// Does nothing when the queue is closed or <paramref name="count"/> is zero.
    /// </summary>
    /// <param name="buffer">The buffer holding the bytes to queue.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to queue.</param>
    /// <param name="count">Number of bytes to queue.</param>
    public void Enqueue(byte[] buffer, int offset, int count)
    {
        lock (_lock)
        {
            if (_isClosed)
                return;

            // An empty chunk carries no data for a reader, so do not link it in.
            if (count == 0)
                return;

            var entry = new PipeEntry(buffer, offset, count);

            if (_last != null)
            {
                _last.Next = entry;
            }
            _last = entry;

            if (_first == null)
            {
                _first = entry;
            }

            Monitor.PulseAll(_lock);
        }
    }

    /// <summary>
    /// Copies queued bytes into <paramref name="buffer"/>, waiting for more data
    /// until <paramref name="count"/> bytes have been copied or the queue has been
    /// closed and emptied.
    /// </summary>
    /// <param name="buffer">The destination buffer.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which to start writing.</param>
    /// <param name="count">Number of bytes requested.</param>
    /// <returns>
    /// The number of bytes copied. This is less than <paramref name="count"/> only
    /// when the queue was closed before enough data arrived.
    /// </returns>
    public int Dequeue(byte[] buffer, int offset, int count)
    {
        lock (_lock)
        {
            var totalBytesRead = 0;

            while (count > 0)
            {
                // Wait for a writer to add data or to close the queue.
                while (_first == null && !_isClosed)
                    Monitor.Wait(_lock);

                // Closed and drained: hand back whatever was collected so far.
                if (_first == null)
                {
                    return totalBytesRead;
                }

                var bytesRead = _first.Read(buffer, offset, count);

                if (_first.IsEmpty)
                {
                    _first = _first.Next;

                    // Drop the tail reference once the list is empty so the consumed
                    // entry (and the bytes it holds) can be garbage collected.
                    if (_first == null)
                    {
                        _last = null;
                    }
                }

                count -= bytesRead;
                totalBytesRead += bytesRead;
                offset += bytesRead;
            }

            return totalBytesRead;
        }
    }
}
/// <summary>
/// One end of a pipe: a forward-only, non-seekable stream that either writes
/// into or reads from a shared <see cref="ByteQueue"/>.
/// </summary>
/// <remarks>
/// Disposing (or closing) the write end closes the queue; readers can still
/// drain what was already queued, after which reads return 0. Disposing the
/// read end does not touch the queue.
/// </remarks>
class PipeStream : Stream
{
    /// <summary>
    /// Selects which direction a <see cref="PipeStream"/> supports.
    /// </summary>
    public enum Type
    {
        /// <summary>The stream can only be read from.</summary>
        Read,

        /// <summary>The stream can only be written to.</summary>
        Write
    }

    private readonly ByteQueue _queue;
    private readonly bool _isReadable;
    private readonly bool _isWritable;
    private bool _isDisposed;

    /// <summary>
    /// Creates one end of a pipe on top of <paramref name="queue"/>.
    /// </summary>
    /// <param name="queue">The queue shared with the other end of the pipe.</param>
    /// <param name="type"><see cref="Type.Read"/> for the reading end; any other value creates the writing end.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queue"/> is <c>null</c>.</exception>
    public PipeStream(ByteQueue queue, Type type)
    {
        if (queue == null)
            throw new ArgumentNullException("queue");

        _queue = queue;
        _isReadable = type == Type.Read;
        _isWritable = !_isReadable;
    }

    /// <summary>
    /// Does nothing; written data is handed to the queue immediately.
    /// </summary>
    public override void Flush()
    {
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Reads bytes from the queue into <paramref name="buffer"/>. The call blocks
    /// until <paramref name="count"/> bytes were read or the queue has been closed
    /// and emptied.
    /// </summary>
    /// <param name="buffer">The destination buffer.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>The number of bytes read; 0 once the queue is closed and empty.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The range does not fit inside <paramref name="buffer"/>.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="NotSupportedException">This is the writing end of the pipe.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        CheckBufferArguments(buffer, offset, count);

        if (_isDisposed)
            throw CreateObjectDisposedException();
        if (!_isReadable)
            throw new NotSupportedException("This stream is for writing only.");

        return _queue.Dequeue(buffer, offset, count);
    }

    /// <summary>
    /// Appends bytes from <paramref name="buffer"/> to the queue.
    /// </summary>
    /// <param name="buffer">The buffer holding the data to write.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to write.</param>
    /// <param name="count">Number of bytes to write.</param>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The range does not fit inside <paramref name="buffer"/>.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="NotSupportedException">This is the reading end of the pipe.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        CheckBufferArguments(buffer, offset, count);

        if (_isDisposed)
            throw CreateObjectDisposedException();
        if (!_isWritable)
            throw new NotSupportedException("This pipe is for reading only.");

        _queue.Enqueue(buffer, offset, count);
    }

    /// <summary>Gets a value indicating whether this is the reading end of the pipe.</summary>
    public override bool CanRead
    {
        get { return _isReadable; }
    }

    /// <summary>Gets <c>false</c>; the stream cannot seek.</summary>
    public override bool CanSeek
    {
        get { return false; }
    }

    /// <summary>Gets a value indicating whether this is the writing end of the pipe.</summary>
    public override bool CanWrite
    {
        get { return _isWritable; }
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Length
    {
        get
        {
            throw new NotSupportedException();
        }
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Position
    {
        get
        {
            throw new NotSupportedException();
        }
        set
        {
            throw new NotSupportedException();
        }
    }

    /// <summary>
    /// Releases the stream. When this is the writing end, the shared queue is
    /// closed so that readers stop waiting once the queued data is consumed.
    /// </summary>
    /// <remarks>
    /// Close() is intentionally not overridden: the inherited implementation
    /// forwards to this method, so Close(), Dispose() and using-blocks all end
    /// up here and mark the stream as disposed.
    /// </remarks>
    /// <param name="disposing"><c>true</c> when called from Dispose()/Close().</param>
    protected override void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            _isDisposed = true;

            if (disposing && _isWritable)
            {
                _queue.Close();
            }
        }

        base.Dispose(disposing);
    }

    /// <summary>
    /// Validates a buffer/offset/count triple without risking integer overflow.
    /// </summary>
    private static void CheckBufferArguments(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException("buffer");
        if (offset < 0)
            throw new ArgumentOutOfRangeException("offset", "offset or count is negative.");
        if (count < 0)
            throw new ArgumentOutOfRangeException("count", "offset or count is negative.");
        // Written as a subtraction because offset + count can wrap around for large values.
        if (count > buffer.Length - offset)
            throw new ArgumentException("The sum of offset and count is greater than the buffer length.");
    }

    private ObjectDisposedException CreateObjectDisposedException()
    {
        return new ObjectDisposedException(GetType().FullName);
    }
}
/// <summary>
/// Gets the writable end of the pipe. Data written to this stream is queued
/// for <see cref="OutputStream"/>.
/// </summary>
public Stream InputStream { get; private set; }
/// <summary>
/// Gets the readable end of the pipe. Reading from this stream returns the
/// data that was written to <see cref="InputStream"/>.
/// </summary>
public Stream OutputStream { get; private set; }
/// <summary>
/// Creates a pipe whose two ends share one byte queue: a write-only
/// <see cref="InputStream"/> and a read-only <see cref="OutputStream"/>.
/// </summary>
public Pipe()
{
    var sharedQueue = new ByteQueue();

    OutputStream = new PipeStream(sharedQueue, PipeStream.Type.Read);
    InputStream = new PipeStream(sharedQueue, PipeStream.Type.Write);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment