Skip to content

Instantly share code, notes, and snippets.

@hanishi
Last active May 17, 2022 00:50
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save hanishi/7139122 to your computer and use it in GitHub Desktop.
Save hanishi/7139122 to your computer and use it in GitHub Desktop.
PipeStream with Rx that works!
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.Security.AccessControl;
using System.Security.Principal;
using System.Threading;
using System.Threading.Tasks;
namespace YourNameSpace
{
public static class PipeStreamObservable
{
// Access rules handed to every NamedPipeServerStream created by this class.
private static readonly PipeSecurity PipeSecurity;

// One BinaryFormatter per calling thread, used by Write<T>.
private static readonly ThreadLocal<BinaryFormatter> BinaryFormatter =
    new ThreadLocal<BinaryFormatter>(() => new BinaryFormatter());

// Builds the shared access rules once:
//   * the account running this process gets full control of the pipe;
//   * the well-known "World" SID gets read/write access.
static PipeStreamObservable()
{
    var currentUser = WindowsIdentity.GetCurrent().User;
    var world = new SecurityIdentifier(WellKnownSidType.WorldSid, null);

    var rules = new PipeSecurity();
    rules.AddAccessRule(
        new PipeAccessRule(currentUser, PipeAccessRights.FullControl, AccessControlType.Allow));
    rules.AddAccessRule(
        new PipeAccessRule(world, PipeAccessRights.ReadWrite, AccessControlType.Allow));

    PipeSecurity = rules;
}
/// <summary>
/// Serializes <paramref name="type"/> onto <paramref name="stream"/> with the
/// calling thread's binary formatter.
/// </summary>
/// <typeparam name="T">Static type of the value being sent.</typeparam>
/// <param name="stream">Pipe to write the serialized value to.</param>
/// <param name="type">The value to serialize (despite the name, this is the payload, not a Type).</param>
public static void Write<T>(this PipeStream stream, T type)
{
    var formatter = BinaryFormatter.Value;
    formatter.Serialize(stream, type);
}
/// <summary>
/// Creates a named-pipe server and an observable that yields every <typeparamref name="T"/>
/// deserialized from it.
/// </summary>
/// <remarks>
/// The pipe is created immediately, but the server only starts waiting for a client once the
/// returned observable is subscribed. The single pipe instance is shared by every subscription
/// and is disposed when a subscription terminates or is disposed.
/// </remarks>
/// <typeparam name="T">Type of the messages read from the pipe.</typeparam>
/// <param name="stream">Receives the created server pipe so the caller can write to / close it.</param>
/// <param name="pipeName">Name of the pipe to create.</param>
/// <param name="onConnected">Optional callback raised once a client has connected.</param>
/// <returns>An observable sequence of the messages received on the pipe.</returns>
public static IObservable<T> Create<T>(out NamedPipeServerStream stream, string pipeName,
    EventHandler onConnected = null)
{
    stream = new NamedPipeServerStream(pipeName, PipeDirection.InOut,
        NamedPipeServerStream.MaxAllowedServerInstances, PipeTransmissionMode.Byte,
        PipeOptions.Asynchronous, 4096, 4096, PipeSecurity);

    // An out parameter cannot be captured by a lambda, hence the local copy.
    var serverStream = stream;
    return CreateReadLoop<T>(
        serverStream,
        () => new FormatterIterator<T>(serverStream, new BinaryFormatter(), onConnected));
}

/// <summary>
/// Creates a named-pipe client and an observable that yields every <typeparamref name="T"/>
/// deserialized from it.
/// </summary>
/// <remarks>
/// The pipe object is created immediately, but the connection attempt only happens once the
/// returned observable is subscribed. The single pipe instance is shared by every subscription
/// and is disposed when a subscription terminates or is disposed.
/// </remarks>
/// <typeparam name="T">Type of the messages read from the pipe.</typeparam>
/// <param name="stream">Receives the created client pipe so the caller can write to / close it.</param>
/// <param name="server">Name of the machine hosting the pipe ("." for the local machine).</param>
/// <param name="pipeName">Name of the pipe to connect to.</param>
/// <param name="onConnected">Optional callback raised once the connection is established.</param>
/// <returns>An observable sequence of the messages received on the pipe.</returns>
public static IObservable<T> Create<T>(out NamedPipeClientStream stream,
    string server,
    string pipeName,
    EventHandler onConnected = null)
{
    stream = new NamedPipeClientStream(server, pipeName, PipeDirection.InOut,
        PipeOptions.Asynchronous);

    // An out parameter cannot be captured by a lambda, hence the local copy.
    var clientStream = stream;
    return CreateReadLoop<T>(
        clientStream,
        () => new FormatterIterator<T>(clientStream, new BinaryFormatter(), onConnected));
}

// Shared implementation of both Create overloads: on subscription, runs a recursive read loop
// on a thread supplied by NewThreadScheduler, forwarding each value to the subscriber and
// re-scheduling itself until the iterator reports completion or failure.
private static IObservable<T> CreateReadLoop<T>(PipeStream pipe, Func<IIterator<T>> createIterator)
{
    return Observable.Using(() => pipe, resource => Observable.Create<T>(o =>
    {
        var currentRead = new SerialDisposable();
        var loop = NewThreadScheduler.Default.Schedule(createIterator(), (state, self) =>
        {
            var observable = state.ReadNext();
            if (observable == null)
            {
                // A null observable is the iterator's way of saying it could not even start a read.
                o.OnError(new InvalidOperationException("Scheduling aborted."));
                return;
            }

            currentRead.Disposable = observable.Subscribe(
                value =>
                {
                    // Queue the next read before handing the value to the subscriber.
                    self(state);
                    o.OnNext(value);
                },
                ex =>
                {
                    currentRead.Dispose();
                    o.OnError(ex);
                    resource.Close();
                },
                () =>
                {
                    currentRead.Dispose();
                    o.OnCompleted();
                    resource.Close();
                });
        });

        // Unsubscribing must tear down both the scheduled loop and the read in flight.
        IDisposable subscription = new CompositeDisposable(loop, currentRead);
        return subscription;
    }));
}
// Reads one value of type T from `input` into `output`.
// The callers in this file treat a `true` return as "value read" and `false` as "end of sequence".
private delegate bool StreamHandler<T>(Stream input, out T output);
/// <summary>
/// A source that is pulled one step at a time: each call to <see cref="ReadNext"/> returns an
/// observable representing the next read.
/// </summary>
public interface IIterator<out T>
{
/// <summary>
/// Starts the next read. The read loops in <c>PipeStreamObservable.Create</c> treat a
/// <c>null</c> return value as an aborted read and report an error to their subscriber.
/// </summary>
IObservable<T> ReadNext();
}
// Base class for the states a pipe reader can be in (waiting for a connection, ready to read).
// Each state supplies its own ReadNext implementation.
private abstract class ReaderState<T> : IIterator<T>
{
public abstract IObservable<T> ReadNext();
}
// Reader state used once the pipe is connected: every ReadNext() produces an observable that,
// when subscribed, performs a single blocking read through the handler.
private class ReadReadyState<T> : ReaderState<T>
{
    private readonly PipeStream _stream;
    private readonly StreamHandler<T> _handler;

    internal ReadReadyState(PipeStream stream, StreamHandler<T> handler)
    {
        _stream = stream;
        _handler = handler;
    }

    // Returns an observable that signals exactly one notification per subscription:
    //   OnNext      - the handler read a value;
    //   OnCompleted - the handler reported end-of-data, or the pipe is no longer connected;
    //   OnError     - anything threw while reading or while delivering the value.
    public override IObservable<T> ReadNext()
    {
        return Observable.Create<T>(o =>
        {
            try
            {
                T value;
                if (_stream.IsConnected && _handler(_stream, out value))
                {
                    o.OnNext(value);
                }
                else
                {
                    // Previously a disconnected pipe produced no notification at all, which left
                    // the subscriber (and anything waiting on its termination) hanging forever.
                    o.OnCompleted();
                }
            }
            catch (Exception e)
            {
                o.OnError(e);
            }

            // The read runs synchronously inside Subscribe, so there is nothing to cancel.
            return Disposable.Empty;
        });
    }
}
// Iterator over a server pipe. Starts out waiting for a client; after the first successful
// WaitForConnection it switches permanently to ReadReadyState.
private class ServerStreamReader<T> : IIterator<T>
{
    private ReaderState<T> _currentState;
    private readonly NamedPipeServerStream _stream;
    private readonly StreamHandler<T> _handler;

    internal ServerStreamReader(NamedPipeServerStream stream, StreamHandler<T> handler, EventHandler onConnected)
    {
        _stream = stream;
        _handler = handler;
        _currentState = new ConnectionWaitState(this, onConnected);
    }

    // Initial state: blocks until a client connects, raises the connected callback, then hands
    // over to ReadReadyState and performs the first read.
    private class ConnectionWaitState : ReaderState<T>
    {
        private readonly ServerStreamReader<T> _parent;
        private readonly EventHandler _onConnected;

        internal ConnectionWaitState(ServerStreamReader<T> parent, EventHandler onConnected)
        {
            _parent = parent;
            _onConnected = onConnected;
        }

        public override IObservable<T> ReadNext()
        {
            try
            {
                _parent._stream.WaitForConnection();
                if (_onConnected != null)
                    _onConnected(this, EventArgs.Empty);
                _parent._currentState = new ReadReadyState<T>(_parent._stream, _parent._handler);
                return _parent._currentState.ReadNext();
            }
            catch (Exception e)
            {
                // Surface the real failure (closed pipe, I/O error, throwing callback) to the
                // subscriber instead of swallowing it and returning null.
                return Observable.Throw<T>(e);
            }
        }
    }

    // Delegates to whichever state the reader is currently in.
    public IObservable<T> ReadNext()
    {
        return _currentState.ReadNext();
    }
}
// Iterator over a client pipe. Starts out unconnected; after the first successful Connect it
// switches permanently to ReadReadyState.
private class ClientStreamReader<T> : IIterator<T>
{
    // How long the first ReadNext() waits for the server before giving up.
    private const int ConnectTimeoutMilliseconds = 5000;

    private ReaderState<T> _currentState;
    private readonly NamedPipeClientStream _stream;
    private readonly StreamHandler<T> _handler;

    internal ClientStreamReader(NamedPipeClientStream stream, StreamHandler<T> handler, EventHandler onConnected)
    {
        _stream = stream;
        _handler = handler;
        _currentState = new ConnectionWaitState(this, onConnected);
    }

    // Initial state: connects to the server, raises the connected callback, then hands over to
    // ReadReadyState and performs the first read.
    private class ConnectionWaitState : ReaderState<T>
    {
        private readonly ClientStreamReader<T> _parent;
        private readonly EventHandler _onConnected;

        internal ConnectionWaitState(ClientStreamReader<T> parent, EventHandler onConnected)
        {
            _parent = parent;
            _onConnected = onConnected;
        }

        public override IObservable<T> ReadNext()
        {
            try
            {
                _parent._stream.Connect(ConnectTimeoutMilliseconds);
                if (_onConnected != null)
                    _onConnected(this, EventArgs.Empty);
                _parent._currentState = new ReadReadyState<T>(_parent._stream, _parent._handler);
                return _parent._currentState.ReadNext();
            }
            catch (Exception e)
            {
                // Surface the real failure (timeout, closed pipe, throwing callback) to the
                // subscriber instead of swallowing it and returning null.
                return Observable.Throw<T>(e);
            }
        }
    }

    // Delegates to whichever state the reader is currently in.
    public IObservable<T> ReadNext()
    {
        return _currentState.ReadNext();
    }
}
// Iterator that reads pipe messages by deserializing them with the supplied IFormatter.
// It wraps a ServerStreamReader or a ClientStreamReader depending on the pipe type passed in.
private sealed class FormatterIterator<T> : IIterator<T>
{
    private readonly IIterator<T> _iterator;
    private readonly IFormatter _formatter;

    // Reads from a server pipe.
    public FormatterIterator(NamedPipeServerStream source, IFormatter formatter, EventHandler onConnected)
    {
        _formatter = formatter;
        _iterator = new ServerStreamReader<T>(source, DeserializeWithFormatter, onConnected);
    }

    // Reads from a client pipe.
    public FormatterIterator(NamedPipeClientStream source, IFormatter formatter, EventHandler onConnected)
    {
        _formatter = formatter;
        _iterator = new ClientStreamReader<T>(source, DeserializeWithFormatter, onConnected);
    }

    // Forwards to the wrapped stream reader.
    public IObservable<T> ReadNext()
    {
        return _iterator.ReadNext();
    }

    // StreamHandler implementation: true plus the value on success; false plus default(T) when
    // deserialization (or the cast to T) throws for any reason.
    private bool DeserializeWithFormatter(Stream stream, out T value)
    {
        value = default(T);
        try
        {
            value = (T) _formatter.Deserialize(stream);
        }
        catch (Exception)
        {
            return false;
        }
        return true;
    }
}
}
}
namespace YourNameSpace
{
/// <summary>
/// An observer of pipe messages that is also given the pipe itself and told when the
/// connection has been established.
/// </summary>
public interface IPipeStreamObserver<in T> : IObserver<T>
{
/// <summary>The pipe this observer is attached to; assigned by IpcClient / IpcServer.</summary>
PipeStream PipeStream { get; set; }
/// <summary>Called by IpcClient / IpcServer after the pipe connection has been made.</summary>
void OnConnected();
}
/// <summary>
/// Connects an <see cref="IPipeStreamObserver{T}"/> to a named pipe server.
/// </summary>
/// <typeparam name="T">Type of the messages received from the server.</typeparam>
public class IpcClient<T>
{
    /// <summary>Observer that receives the messages and connection notifications.</summary>
    public IPipeStreamObserver<T> Observer { get; set; }

    // Signalled when the connection attempt has finished, successfully or not.
    private readonly AutoResetEvent _auto = new AutoResetEvent(false);
    private readonly string _server;
    private readonly string _name;

    /// <param name="server">Machine hosting the pipe ("." for the local machine).</param>
    /// <param name="name">Name of the pipe.</param>
    /// <param name="observer">Observer to attach to the connection.</param>
    public IpcClient(string server, string name, IPipeStreamObserver<T> observer)
    {
        Observer = observer;
        _server = server;
        _name = name;
    }

    /// <summary>
    /// Creates the client pipe, subscribes <see cref="Observer"/> to it and blocks until the
    /// connection attempt has either succeeded or failed.
    /// </summary>
    /// <returns>The client pipe; disposing it closes the connection.</returns>
    /// <exception cref="InvalidOperationException"><see cref="Observer"/> is null.</exception>
    public IDisposable Create()
    {
        // Bind to one observer for the whole connection, and fail before any pipe is created.
        var observer = Observer;
        if (observer == null)
            throw new InvalidOperationException("Observer must be set before calling Create().");

        // A previous connection that failed after connecting leaves the event signalled;
        // clear it so this call really waits for its own connection attempt.
        _auto.Reset();

        NamedPipeClientStream pipe;
        var observable = PipeStreamObservable.Create<T>(out pipe, _server, _name, (sender, args) => _auto.Set());
        observer.PipeStream = pipe;
        observable.Subscribe(
            observer.OnNext,
            ex =>
            {
                try
                {
                    observer.OnError(ex);
                }
                finally
                {
                    // Always release the pipe and the waiting caller, even if the observer throws.
                    pipe.Close();
                    _auto.Set();
                }
            },
            () =>
            {
                try
                {
                    observer.OnCompleted();
                }
                finally
                {
                    pipe.Close();
                }
            });

        _auto.WaitOne();
        if (pipe.IsConnected)
            observer.OnConnected();
        return pipe;
    }
}
/// <summary>
/// Named pipe server that accepts any number of clients, creating one observer instance per
/// connection.
/// </summary>
/// <typeparam name="T">Type of the messages received from clients.</typeparam>
public class IpcServer<T>
{
    // Written by Start/Stop and read by listener tasks on other threads.
    private volatile bool _running;

    // Signalled when a client connects, or when Stop() wants to wake the pending listener.
    private readonly AutoResetEvent _auto = new AutoResetEvent(false);

    // Connected pipes; also used as the monitor that Stop waits on until it is empty.
    private readonly List<PipeStream> _pipes = new List<PipeStream>();
    private readonly string _name;
    private Task _task;

    /// <param name="name">Name of the pipe that clients connect to.</param>
    public IpcServer(string name)
    {
        _name = name;
        _task = Task.Factory.StartNew(() => { });
    }

    /// <summary>
    /// Creates one pipe instance, blocks until a client connects to it (or the server is
    /// stopped), and on connection schedules itself again to accept the next client.
    /// </summary>
    /// <typeparam name="TPipeStreamObserver">Observer type instantiated for the connection.</typeparam>
    /// <param name="task">Task the next listener is chained onto.</param>
    public void IpcServerPipeCreate<TPipeStreamObserver>(Task task)
        where TPipeStreamObserver : IPipeStreamObserver<T>, new()
    {
        NamedPipeServerStream pipe;
        var observable = PipeStreamObservable.Create<T>(out pipe, _name, (sender, args) => _auto.Set());
        var observer = new TPipeStreamObserver { PipeStream = pipe };

        // Set (under the _pipes lock) once the connection has ended, so a connection that ends
        // before it was registered below is never added to the list afterwards.
        var terminated = false;
        Action release = () =>
        {
            pipe.Close();
            lock (_pipes)
            {
                terminated = true;
                _pipes.Remove(pipe);
                Monitor.PulseAll(_pipes); // wake a Stop() waiting for the list to drain
            }
        };

        observable.Subscribe(
            observer.OnNext,
            ex =>
            {
                // Clean up even when the observer itself throws.
                try { observer.OnError(ex); }
                finally { release(); }
            },
            () =>
            {
                try { observer.OnCompleted(); }
                finally { release(); }
            });

        _auto.WaitOne();

        // Take one consistent snapshot of _running for both the registration and the branch.
        bool accepted;
        lock (_pipes)
        {
            accepted = _running;
            if (accepted && !terminated)
                _pipes.Add(pipe);
        }

        if (accepted)
        {
            task.ContinueWith(IpcServerPipeCreate<TPipeStreamObserver>);
            observer.OnConnected();
        }
        else
        {
            pipe.Close();
        }
    }

    /// <summary>Starts accepting clients.</summary>
    /// <typeparam name="TPipeStreamObserver">Observer type instantiated per connection.</typeparam>
    /// <returns>The task running the first listener; it completes once the first client has
    /// connected or the server has been stopped.</returns>
    public Task Start<TPipeStreamObserver>() where TPipeStreamObserver : IPipeStreamObserver<T>, new()
    {
        _running = true;
        _task = _task.ContinueWith(IpcServerPipeCreate<TPipeStreamObserver>);
        return _task;
    }

    /// <summary>
    /// Requests shutdown: stops accepting clients and closes every connected pipe. The clean-up
    /// runs on a background task, so this method may return before it has finished.
    /// </summary>
    public void Stop()
    {
        _running = false;

        // Wake the listener that is blocked waiting for a client. Without this, stopping a
        // server that never had a client would wait forever for the first listener task.
        _auto.Set();

        _task.ContinueWith(CloseAllAndWait);
    }

    // Closes all registered pipes, then blocks until each of them has been removed from the
    // list by its own termination handler.
    private void CloseAllAndWait(Task task)
    {
        PipeStream[] open;
        lock (_pipes)
        {
            _running = false;
            open = _pipes.ToArray();
        }

        // Close outside the lock so termination handlers can take it to remove their pipe.
        foreach (var pipe in open)
            pipe.Close();

        lock (_pipes)
        {
            while (_pipes.Count > 0)
                Monitor.Wait(_pipes);
        }
    }
}
}
@hanishi
Copy link
Author

hanishi commented Oct 24, 2013

Here is how it is used:

// Example server-side observer: appends " World!" to each message, prints it and sends it back.
class ServerObserver : IPipeStreamObserver<string>
{
    // Pipe connected to the client; assigned by IpcServer.
    public PipeStream PipeStream { get; set; }

    // Nothing to do when a client connects.
    public void OnConnected()
    {
    }

    public void OnNext(string value)
    {
        var reply = value + " World!";
        Console.WriteLine(reply);

        PipeStream.Write(reply);
    }

    // Errors are deliberately ignored in this example.
    public void OnError(Exception error)
    {
    }

    public void OnCompleted()
    {
    }
}

// Example client-side observer: sends "Hello " on connect and again after every reply,
// printing each reply (and any error) to the console.
class ClientObserver : IPipeStreamObserver<string>
{
    // Pipe connected to the server; assigned by IpcClient.
    public PipeStream PipeStream { get; set; }

    // Kick off the exchange as soon as the connection is up.
    public void OnConnected()
    {
        SendGreeting();
    }

    public void OnNext(string value)
    {
        Console.WriteLine(value);

        SendGreeting();
    }

    public void OnError(Exception error)
    {
        Console.WriteLine(error);
    }

    public void OnCompleted()
    {
    }

    private void SendGreeting()
    {
        PipeStream.Write("Hello ");
    }
}

// Example driver: starts a server, runs three clients one after another (each lives until
// Enter is pressed), then stops the server.
class Program
{
    static void Main(string[] args)
    {
        var server = new IpcServer<string>("test");
        server.Start<ServerObserver>();

        var disposedMessages = new[]
        {
            "Client1 disposed!",
            "Client2 disposed!",
            "Client3 disposed!"
        };

        foreach (var disposedMessage in disposedMessages)
        {
            var client = new IpcClient<string>(".", "test", new ClientObserver()).Create();

            Console.ReadLine();
            client.Dispose();
            Console.WriteLine(disposedMessage);
        }

        Console.ReadLine();
        server.Stop();
        Console.WriteLine("Server stopped!");
        Console.ReadLine();
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment