Skip to content

Instantly share code, notes, and snippets.

@jen20
Created March 4, 2013 14:52
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save jen20/5082745 to your computer and use it in GitHub Desktop.
Save jen20/5082745 to your computer and use it in GitHub Desktop.
Quick and dirty way to hook up RX to an Event Store subscription. Emphasis on "quick" and "dirty".
static class EventStoreConnectionExtensions
{
public static Task<EventStoreRxSubscription> SubscribeToAll(this EventStoreConnection connection, bool resolveLinkTos)
{
return Task<EventStoreRxSubscription>.Factory.StartNew(() => {
var subject = new Subject<ResolvedEvent>();
var subscriptionTask = connection.SubscribeToAll(resolveLinkTos, subject.OnNext, () => subject.OnError(new SubscriptionDroppedException()));
subscriptionTask.Wait();
return new EventStoreRxSubscription(subscriptionTask.Result, subject);
});
}
}
internal class EventStoreRxSubscription
{
public Subject<ResolvedEvent> Observable { get; private set; }
public EventStoreSubscription Subscription { get; private set; }
public EventStoreRxSubscription(EventStoreSubscription subscription, Subject<ResolvedEvent> observable)
{
Subscription = subscription;
Observable = observable;
}
}
class SubscriptionDroppedException : Exception {}
class Program
{
static void Main(string[] args)
{
var connection = EventStoreConnection.Create(ConnectionSettings.Default);
connection.Connect(new IPEndPoint(IPAddress.Loopback, 1113));
var subscriptionTask = connection.SubscribeToAll(true);
subscriptionTask.Wait();
var subscription = subscriptionTask.Result;
var observable =
subscription.Observable.Where(x => x.OriginalEvent.EventType == "WoftamEvent").Delay(new TimeSpan(0, 0, 0, 5));
observable.Subscribe(t => Console.WriteLine(t.OriginalStreamId));
Console.ReadLine();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment