Skip to content

Instantly share code, notes, and snippets.

@0x53A
Created July 27, 2013 09:13
Show Gist options
  • Save 0x53A/6094342 to your computer and use it in GitHub Desktop.
Save 0x53A/6094342 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace xxx
{
/// <summary>
/// Chains a sequence of stream-to-stream filters so that the output of each
/// filter becomes the input of the next one. Every filter except the last
/// runs on its own background thread; the last filter runs on the thread
/// that calls <see cref="Run"/>. Adjacent filters are connected through
/// <c>BlockingStream</c> instances owned by this pipeline.
/// </summary>
public class StreamPipeline : IDisposable
{
    // Private snapshot of the filters, in execution order.
    private readonly Action<Stream, Stream>[] _filters;

    // One connector stream between each pair of adjacent filters.
    // Set to null by Dispose, which is how Run detects a disposed pipeline.
    private List<BlockingStream> _blockingStreams;

    /// <summary>
    /// Creates a pipeline from the given filters. Each filter receives the
    /// stream it reads from as its first argument and the stream it writes
    /// to as its second argument.
    /// </summary>
    /// <param name="filters">Filters in execution order; at least one, none null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="filters"/> is null.</exception>
    /// <exception cref="ArgumentException">
    /// <paramref name="filters"/> is empty or contains a null entry.
    /// </exception>
    public StreamPipeline(params Action<Stream, Stream>[] filters)
    {
        if (filters == null) throw new ArgumentNullException("filters");

        // Validate and keep a private copy so that later changes to the
        // caller's array cannot slip an unvalidated entry into the pipeline.
        Action<Stream, Stream>[] snapshot = (Action<Stream, Stream>[])filters.Clone();
        if (snapshot.Length == 0)
            throw new ArgumentException("At least one filter is required.", "filters");
        if (Array.IndexOf(snapshot, null) >= 0)
            throw new ArgumentException("Filters must not contain null entries.", "filters");

        _filters = snapshot;

        // N filters need N - 1 connectors between them.
        int connectorCount = snapshot.Length - 1;
        _blockingStreams = new List<BlockingStream>(connectorCount);
        for (int i = 0; i < connectorCount; i++)
        {
            _blockingStreams.Add(new BlockingStream());
        }
    }

    /// <summary>
    /// Executes the pipeline: the first filter reads from
    /// <paramref name="input"/>, the last filter writes to
    /// <paramref name="output"/>, and the call returns once the last filter
    /// has returned and all background stages have finished.
    /// </summary>
    /// <param name="input">Readable stream consumed by the first filter.</param>
    /// <param name="output">Writable stream filled by the last filter.</param>
    /// <exception cref="ObjectDisposedException">The pipeline has been disposed.</exception>
    /// <exception cref="ArgumentNullException">A stream argument is null.</exception>
    /// <exception cref="ArgumentException">
    /// <paramref name="input"/> is not readable or <paramref name="output"/> is not writable.
    /// </exception>
    /// <exception cref="AggregateException">
    /// One or more background filters threw; the inner exceptions are the
    /// original failures. An exception thrown by the last filter propagates
    /// unchanged instead.
    /// </exception>
    public void Run(Stream input, Stream output)
    {
        List<BlockingStream> connectors = _blockingStreams;
        if (connectors == null)
            throw new ObjectDisposedException(GetType().Name);
        if (input == null) throw new ArgumentNullException("input");
        if (!input.CanRead)
            throw new ArgumentException("The input stream must be readable.", "input");
        if (output == null) throw new ArgumentNullException("output");
        if (!output.CanWrite)
            throw new ArgumentException("The output stream must be writable.", "output");

        int lastIndex = _filters.Length - 1;
        List<Exception> failures = new List<Exception>();
        List<Thread> workers = new List<Thread>(lastIndex);

        for (int i = 0; i < lastIndex; i++)
        {
            // Locals are declared inside the loop so that each delegate
            // captures its own stage's values.
            Action<Stream, Stream> filter = _filters[i];
            Stream stageInput = i == 0 ? input : connectors[i - 1];
            BlockingStream stageOutput = connectors[i];

            ThreadStart stage = delegate
            {
                try
                {
                    filter(stageInput, stageOutput);
                }
                catch (Exception ex)
                {
                    // An exception escaping a thread's entry point would
                    // take the whole process down; record it so Run can
                    // report it to the caller instead.
                    lock (failures)
                    {
                        failures.Add(ex);
                    }
                }
                finally
                {
                    // Always tell the next stage that no more data is
                    // coming, including when this filter failed.
                    stageOutput.SetEndOfStream();
                }
            };

            Thread worker = new Thread(stage);
            worker.IsBackground = true;
            worker.Start();
            workers.Add(worker);
        }

        // The final stage runs synchronously on the calling thread and
        // writes straight to the caller's output stream.
        Stream finalInput = lastIndex == 0 ? input : connectors[lastIndex - 1];
        _filters[lastIndex](finalInput, output);

        // Wait for the background stages so none of them is still using the
        // caller's input stream after Run returns.
        // NOTE(review): this waits indefinitely if an upstream filter never
        // returns - confirm that is acceptable for the filters in use.
        foreach (Thread worker in workers)
        {
            worker.Join();
        }

        if (failures.Count > 0)
            throw new AggregateException(failures);
    }

    /// <summary>
    /// Disposes the connector streams owned by the pipeline. After this call
    /// <see cref="Run"/> throws <see cref="ObjectDisposedException"/>.
    /// Calling it more than once has no further effect.
    /// </summary>
    public void Dispose()
    {
        List<BlockingStream> connectors = _blockingStreams;
        if (connectors == null)
            return;

        // Clear the field first so the pipeline is marked as disposed even
        // if disposing one of the streams throws.
        _blockingStreams = null;
        foreach (BlockingStream stream in connectors)
        {
            stream.Dispose();
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment