Skip to content

Instantly share code, notes, and snippets.

@zabrowarnyrafal
Created March 12, 2018 07:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zabrowarnyrafal/aca820db14ac54b5f1ce54954e068a0f to your computer and use it in GitHub Desktop.
Save zabrowarnyrafal/aca820db14ac54b5f1ce54954e068a0f to your computer and use it in GitHub Desktop.
ES subscription ver1
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.SerilogAdapter;
using Serilog;
namespace TestESSubs
{
public class ConnectionFactory
{
public IEventStoreConnection Create()
{
return EventStoreConnection
.Create(
ConfigurationManager.ConnectionStrings["eventStore"].ConnectionString,
ConnectionSettings
.Create()
.EnableVerboseLogging()
.UseSerilog(Log.Logger)
);
}
}
}
using System;
using System.Threading.Tasks;
using Serilog;
using Serilog.Core;
using TestESSubs.ES;
namespace TestESSubs
{
class Program
{
static void Main(string[] args)
{
Logging.Configure();
Log.Logger.Information("Starting...\n");
Subscribe();
ConsoleKeyInfo consoleKey;
do
{
while (!Console.KeyAvailable)
Task.Delay(100).Wait();
consoleKey = Console.ReadKey();
} while (consoleKey.Key != ConsoleKey.Escape);
}
private static void Subscribe()
{
var s = new Subscription();
s.Connect().Wait();
s.Start().Wait();
}
}
}
using EventStore.ClientAPI;
using System;
using System.Threading.Tasks;
using Serilog.Context;
using Serilog;
namespace TestESSubs
{
public class Subscription
{
private static Serilog.ILogger _log = Log.ForContext<Subscription>();
private readonly IEventStoreConnection _eventStoreConnection;
public Subscription()
{
_eventStoreConnection = new ConnectionFactory().Create();
}
public async Task Connect()
{
await _eventStoreConnection.ConnectAsync();
}
public async Task Start()
{
_log.Debug("[START] Subscription to DummyStream-1 starting");
try
{
var settings = new CatchUpSubscriptionSettings(
maxLiveQueueSize: 1000,
readBatchSize: 100,
verboseLogging: true,
resolveLinkTos: true
);
_eventStoreConnection
.SubscribeToStreamFrom(
stream:"DummyStream-1",
lastCheckpoint:null,
settings:settings,
eventAppeared: OnEventAppeared,
liveProcessingStarted: OnLiveProcessingStarted,
subscriptionDropped: OnSubscriptionDrop
);
_log.Debug("[START] Subscription to DummyStream-1 subscribed");
}
catch (Exception e)
{
_log.Error("[START] Subscription to DummyStream-1 failed to start");
throw;
}
}
private void OnEventAppeared(EventStoreCatchUpSubscription subscription, ResolvedEvent @event)
{
using (LogContext.PushProperty("eventType", @event.Event?.EventType))
{
_log.Debug("Subscription to DummyStream-1 received event");
}
}
private void OnLiveProcessingStarted(EventStoreCatchUpSubscription subscription)
{
_log.Debug("Subscription to DummyStream-1 started live processing events");
}
private void OnSubscriptionDrop(EventStoreCatchUpSubscription subscription, SubscriptionDropReason reason, Exception exception)
{
using (LogContext.PushProperty("exception", exception))
{
_log.Warning("[STOP] Subscription DummyStream-1 dropped with reason {reason}", reason);
}
subscription.Stop();
switch (reason)
{
case SubscriptionDropReason.UserInitiated:
break;
default:
_log.Warning("[STOP] Subscription to DummyStream-1 resurrecting");
Start();
break;
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment