Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save voronoipotato/3e5f61146860a5c18dbad7fc4c8f4cc2 to your computer and use it in GitHub Desktop.
Save voronoipotato/3e5f61146860a5c18dbad7fc4c8f4cc2 to your computer and use it in GitHub Desktop.
Quick and dirty way to hook up RX to an Event Store subscription. Emphasis on "quick" and "dirty".
static class EventStoreConnectionExtensions
{
public static Task<EventStoreRxSubscription> SubscribeToAll(this EventStoreConnection connection, bool resolveLinkTos)
{
return Task<EventStoreRxSubscription>.Factory.StartNew(() => {
var subject = new Subject<ResolvedEvent>();
var subscriptionTask = connection.SubscribeToAll(resolveLinkTos, subject.OnNext, () => subject.OnError(new SubscriptionDroppedException()));
subscriptionTask.Wait();
return new EventStoreRxSubscription(subscriptionTask.Result, subject);
});
}
}
internal class EventStoreRxSubscription
{
public Subject<ResolvedEvent> Observable { get; private set; }
public EventStoreSubscription Subscription { get; private set; }
public EventStoreRxSubscription(EventStoreSubscription subscription, Subject<ResolvedEvent> observable)
{
Subscription = subscription;
Observable = observable;
}
}
class SubscriptionDroppedException : Exception {}
class Program
{
static void Main(string[] args)
{
var connection = EventStoreConnection.Create(ConnectionSettings.Default);
connection.Connect(new IPEndPoint(IPAddress.Loopback, 1113));
var subscriptionTask = connection.SubscribeToAll(true);
subscriptionTask.Wait();
var subscription = subscriptionTask.Result;
var observable =
subscription.Observable.Where(x => x.OriginalEvent.EventType == "WoftamEvent").Delay(new TimeSpan(0, 0, 0, 5));
observable.Subscribe(t => Console.WriteLine(t.OriginalStreamId));
Console.ReadLine();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment