Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DavidBrower/05e0a92faeeb8a0a3718 to your computer and use it in GitHub Desktop.
Save DavidBrower/05e0a92faeeb8a0a3718 to your computer and use it in GitHub Desktop.
Quick and dirty way to hook up Rx (Reactive Extensions) to an Event Store subscription. Emphasis on "quick" and "dirty".
/// <summary>
/// Extension methods that expose an Event Store subscription as an Rx stream.
/// </summary>
static class EventStoreConnectionExtensions
{
    /// <summary>
    /// Subscribes to all events on <paramref name="connection"/> and forwards every
    /// received event into a <see cref="Subject{T}"/>.
    /// </summary>
    /// <param name="connection">Connection on which the underlying subscription is opened.</param>
    /// <param name="resolveLinkTos">Passed through unchanged to the underlying <c>SubscribeToAll</c> call.</param>
    /// <returns>
    /// A task that completes, once the underlying subscription task has completed, with the
    /// subscription handle paired with the subject that carries its events. If the underlying
    /// task faults or is cancelled, reading its result throws and the returned task faults.
    /// </returns>
    /// <remarks>
    /// When the subscription-dropped callback fires, the subject is terminated with a
    /// <see cref="SubscriptionDroppedException"/>. The subject does not buffer: events pushed
    /// before an observer is attached are not replayed to it.
    /// An exception thrown synchronously by the underlying <c>SubscribeToAll</c> call propagates
    /// to the caller directly rather than through the returned task.
    /// </remarks>
    public static Task<EventStoreRxSubscription> SubscribeToAll(this EventStoreConnection connection, bool resolveLinkTos)
    {
        var subject = new Subject<ResolvedEvent>();

        // Chain a continuation onto the subscription task instead of starting a new task
        // that blocks a thread-pool thread in Wait() until the subscription is confirmed.
        return connection
            .SubscribeToAll(
                resolveLinkTos,
                subject.OnNext,
                () => subject.OnError(new SubscriptionDroppedException()))
            .ContinueWith<EventStoreRxSubscription>(
                subscribed => new EventStoreRxSubscription(subscribed.Result, subject));
    }
}
/// <summary>
/// Pairs an Event Store subscription handle with the subject that carries its events.
/// Both values are supplied at construction and are not changed afterwards by this class.
/// </summary>
internal class EventStoreRxSubscription
{
    private readonly EventStoreSubscription _subscription;
    private readonly Subject<ResolvedEvent> _observable;

    /// <summary>
    /// Stores the given subscription handle and subject.
    /// </summary>
    /// <param name="subscription">The underlying Event Store subscription.</param>
    /// <param name="observable">The subject exposed through <see cref="Observable"/>.</param>
    public EventStoreRxSubscription(EventStoreSubscription subscription, Subject<ResolvedEvent> observable)
    {
        _observable = observable;
        _subscription = subscription;
    }

    /// <summary>The subject passed to the constructor.</summary>
    public Subject<ResolvedEvent> Observable
    {
        get { return _observable; }
    }

    /// <summary>The subscription handle passed to the constructor.</summary>
    public EventStoreSubscription Subscription
    {
        get { return _subscription; }
    }
}
/// <summary>
/// Signals that an Event Store subscription was dropped.
/// </summary>
class SubscriptionDroppedException : Exception
{
    /// <summary>Creates the exception with the framework's default message.</summary>
    public SubscriptionDroppedException()
    {
    }

    /// <summary>Creates the exception with a caller-supplied message.</summary>
    /// <param name="message">Text describing why the subscription was dropped.</param>
    public SubscriptionDroppedException(string message)
        : base(message)
    {
    }

    /// <summary>Creates the exception with a message and the exception that caused it.</summary>
    /// <param name="message">Text describing why the subscription was dropped.</param>
    /// <param name="innerException">The underlying cause, exposed via <c>InnerException</c>.</param>
    public SubscriptionDroppedException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}
/// <summary>
/// Console demo: connects to an Event Store on the loopback address, subscribes to all
/// events through the Rx wrapper, and prints the stream id of each matching event.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point. Blocks until the subscription is established, then keeps running
    /// until a line is read from standard input.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var connection = EventStoreConnection.Create(ConnectionSettings.Default);
        var endPoint = new IPEndPoint(IPAddress.Loopback, 1113);
        connection.Connect(endPoint);

        // Reading Result blocks until the subscription task has completed.
        var rxSubscription = connection.SubscribeToAll(true).Result;

        // Keep only events of type "WoftamEvent" and deliver each one five seconds late.
        var delayedEvents = rxSubscription.Observable
            .Where(evt => evt.OriginalEvent.EventType == "WoftamEvent")
            .Delay(TimeSpan.FromSeconds(5));

        delayedEvents.Subscribe(evt => Console.WriteLine(evt.OriginalStreamId));

        // Keep the process alive so events can continue to arrive.
        Console.ReadLine();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment