Skip to content

Instantly share code, notes, and snippets.

@alexeyzimarev
Last active August 17, 2016 07:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexeyzimarev/2c763671aa392afa510522b77ef5d7df to your computer and use it in GitHub Desktop.
Save alexeyzimarev/2c763671aa392afa510522b77ef5d7df to your computer and use it in GitHub Desktop.
/// <summary>
/// Start/stop hook that attaches two console-logging subscriptions for
/// <c>GPSPositionWithTrackedObjectId</c> messages to the bus: one built by hand
/// through <see cref="MessageObserver{TMessage}"/>, the other through the bus'
/// own <c>AsObservable</c> extension.
/// </summary>
/// <remarks>
/// NOTE(review): <c>IRunOnStartAndStop</c> is presumably a host lifecycle
/// contract whose methods are invoked around bus start/stop - confirm against
/// the hosting code.
/// </remarks>
internal class SubscribeObserver : IRunOnStartAndStop
{
    private readonly IBus _bus;

    // Handles returned by Subscribe; kept so the subscriptions can be released on stop.
    private IDisposable _subscription1;
    private IDisposable _subscription2;

    /// <summary>Creates the hook for the given bus.</summary>
    /// <param name="bus">Bus the observables are attached to.</param>
    public SubscribeObserver(IBus bus)
    {
        _bus = bus;
    }

    /// <summary>
    /// Builds both observables and subscribes a console writer to each of them.
    /// </summary>
    public void OnBeforeStart()
    {
        // If start is invoked again without a stop in between, do not stack
        // a second pair of subscriptions on top of the first.
        ReleaseSubscriptions();

        var positions1 = Observable.Create<GPSPositionWithTrackedObjectId>(observer =>
            _bus.ConnectConsumeObserver(new MessageObserver<GPSPositionWithTrackedObjectId>(observer)));
        var positions2 = _bus.AsObservable<GPSPositionWithTrackedObjectId>();

        // An explicit onError handler is supplied because Subscribe(onNext) alone
        // rethrows the exception at the point where OnError is signalled.
        _subscription1 = positions1.Subscribe(
            x => Console.WriteLine("1: OnNext: {0}", x),
            ex => Console.WriteLine("1: OnError: {0}", ex));
        _subscription2 = positions2.Subscribe(
            x => Console.WriteLine("2: OnNext: {0}", x),
            ex => Console.WriteLine("2: OnError: {0}", ex));
    }

    /// <summary>Disposes the subscriptions created in <see cref="OnBeforeStart"/>.</summary>
    public void OnAfterStop()
    {
        ReleaseSubscriptions();
    }

    // Disposes and forgets both subscription handles; safe to call when none exist.
    private void ReleaseSubscriptions()
    {
        _subscription1?.Dispose();
        _subscription1 = null;

        _subscription2?.Dispose();
        _subscription2 = null;
    }
}
/// <summary>
/// <c>IConsumeObserver</c> implementation that relays consume notifications for
/// messages of type <typeparamref name="TMessage"/> to an <see cref="IObserver{T}"/>.
/// Notifications whose message is not a <typeparamref name="TMessage"/> are ignored.
/// </summary>
/// <typeparam name="TMessage">Message type forwarded to the wrapped observer.</typeparam>
public class MessageObserver<TMessage> : IConsumeObserver where TMessage : class
{
    private readonly IObserver<TMessage> _downstream;

    /// <summary>Wraps the observer that will receive the relayed notifications.</summary>
    /// <param name="observer">Receiver of <c>OnNext</c> / <c>OnError</c> calls.</param>
    public MessageObserver(IObserver<TMessage> observer)
    {
        _downstream = observer;
    }

    /// <summary>Does nothing; returns an already completed task.</summary>
    public Task PreConsume<T>(ConsumeContext<T> context) where T : class
    {
        return Task.FromResult(1);
    }

    /// <summary>
    /// Passes the context's message to the observer's <c>OnNext</c> when it is a
    /// <typeparamref name="TMessage"/>; otherwise does nothing.
    /// </summary>
    /// <returns>An already completed task.</returns>
    public Task PostConsume<T>(ConsumeContext<T> context) where T : class
    {
        var typed = context.Message as TMessage;
        if (typed == null)
        {
            return Task.FromResult(1);
        }

        _downstream.OnNext(typed);
        return Task.FromResult(1);
    }

    /// <summary>
    /// Passes <paramref name="exception"/> to the observer's <c>OnError</c> when the
    /// context's message is a <typeparamref name="TMessage"/>; otherwise does nothing.
    /// </summary>
    /// <returns>An already completed task.</returns>
    public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
    {
        // Only the message's type matters here; the message itself is not forwarded.
        if (context.Message is TMessage)
        {
            _downstream.OnError(exception);
        }

        return Task.FromResult(1);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment