Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexeyzimarev/42fc0e9cf723cb47517b to your computer and use it in GitHub Desktop.
Save alexeyzimarev/42fc0e9cf723cb47517b to your computer and use it in GitHub Desktop.
/// <summary>
/// Forwards every event received from a GetEventStore "subscribe to all" catch-up
/// subscription to an <see cref="IEventBus"/> and stores the position of the last event
/// handled, so that a new subscription is started from the stored position.
/// </summary>
public class GetEventStoreEventDispatcher
{
    private readonly IEventBus _eventBus;
    private readonly EventStoreConnection _connection;
    private readonly IPersistGetEventStorePosition _positionRepository;

    // Written by StartDispatching/StopDispatching and read inside the subscription-dropped
    // callback. The callback is invoked by the client library (presumably on another
    // thread), so the flag is volatile to make the latest value visible there.
    private volatile bool _stopRequested;

    // The currently active subscription; replaced every time the subscription is recovered.
    private EventStoreAllCatchUpSubscription _subscription;

    /// <summary>
    /// Creates a dispatcher over the given connection, bus and position store.
    /// </summary>
    /// <param name="connection">Connection used to create the catch-up subscription.</param>
    /// <param name="eventBus">Bus every received event is published to.</param>
    /// <param name="positionRepository">Store for the position of the last processed event.</param>
    /// <exception cref="ArgumentNullException">Any of the arguments is <c>null</c>.</exception>
    public GetEventStoreEventDispatcher(EventStoreConnection connection, IEventBus eventBus, IPersistGetEventStorePosition positionRepository)
    {
        if (connection == null) throw new ArgumentNullException("connection");
        if (eventBus == null) throw new ArgumentNullException("eventBus");
        if (positionRepository == null) throw new ArgumentNullException("positionRepository");

        _connection = connection;
        _eventBus = eventBus;
        _positionRepository = positionRepository;
    }

    /// <summary>
    /// Clears any earlier stop request and subscribes from the last persisted position.
    /// </summary>
    public void StartDispatching()
    {
        _stopRequested = false;
        RecoverSubscription();
    }

    /// <summary>
    /// Flags the dispatcher as stopped, so that a dropped subscription is not recreated,
    /// and stops the active subscription if there is one.
    /// </summary>
    public void StopDispatching()
    {
        _stopRequested = true;

        // Read the field once: RecoverSubscription may replace it while we are stopping.
        var subscription = _subscription;
        if (subscription != null)
        {
            subscription.Stop(TimeSpan.FromSeconds(2));
        }
    }

    /// <summary>
    /// Subscription-dropped callback: resubscribes unless a stop was requested.
    /// The <paramref name="reason"/> and <paramref name="error"/> are not inspected.
    /// </summary>
    private void HandleSubscriptionDropped(EventStoreCatchUpSubscription subscription, string reason, Exception error)
    {
        if (_stopRequested)
        {
            return;
        }

        RecoverSubscription();
    }

    /// <summary>
    /// Creates a new subscription starting from the position returned by the position
    /// store and remembers it as the active subscription.
    /// </summary>
    private void RecoverSubscription()
    {
        _subscription = _connection.SubscribeToAllFrom(
            _positionRepository.GetLastProcessedPosition(),
            false,
            HandleNewEvent,
            HandleSubscriptionDropped);
    }

    /// <summary>
    /// Event-appeared callback: publishes the event to the bus and then persists its position.
    /// </summary>
    /// <exception cref="ArgumentException">The event carries no original position.</exception>
    private void HandleNewEvent(EventStoreCatchUpSubscription subscription, ResolvedEvent @event)
    {
        // Validate before publishing: an event without a position cannot be checkpointed,
        // so it must not be dispatched and then fail half-way through.
        if (!@event.OriginalPosition.HasValue)
        {
            throw new ArgumentException("ResolvedEvent didn't come off a subscription to all (has no position).");
        }

        _eventBus.Publish(ProcessRawEvent(@event));
        _positionRepository.PersistLastPositionProcessed(@event.OriginalPosition.Value);
    }

    /// <summary>
    /// Converts the raw event into the object that is published on the bus.
    /// Currently returns the <see cref="ResolvedEvent"/> itself (boxed) without deserializing it.
    /// </summary>
    private static object ProcessRawEvent(ResolvedEvent rawEvent)
    {
        //NOTE: Normally you'd deserialize here... however, in the interests of not wanting to write
        // a load of events etc, we're just going to return the ResolvedEvent. Original is below.
        return rawEvent;
        //if (rawEvent.OriginalEvent.Metadata.Length > 0 && rawEvent.OriginalEvent.Data.Length > 0)
        // return DeserializeEvent(rawEvent.OriginalEvent.Metadata, rawEvent.OriginalEvent.Data);
        //return null;
    }

    /// <summary>
    /// Deserializes the event from the raw GetEventStore event to my event.
    /// Took this from a gist that James Nugent posted on the GetEventStore forums.
    /// Not called at the moment; see the note in <see cref="ProcessRawEvent"/>.
    /// </summary>
    /// <param name="metadata">UTF-8 encoded JSON whose "EventClrTypeName" property names the CLR type of the event.</param>
    /// <param name="data">UTF-8 encoded JSON body of the event.</param>
    /// <returns>The result of deserializing <paramref name="data"/> as the type named in the metadata.</returns>
    private static object DeserializeEvent(byte[] metadata, byte[] data)
    {
        var eventClrTypeName = JObject.Parse(Encoding.UTF8.GetString(metadata)).Property("EventClrTypeName").Value;
        return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), Type.GetType((string)eventClrTypeName));
    }
}
/// <summary>
/// Console host: connects to an Event Store node on the loopback address and dispatches
/// events to a fake bus until a line is read from standard input.
/// </summary>
public class Program
{
    /// <summary>
    /// Wires up the bus, position store, connection and dispatcher, runs until Enter is
    /// pressed, then stops the dispatcher and closes the connection.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    public static void Main(string[] args)
    {
        var bus = new FakeEventBus();
        var positionStore = new InMemoryPositionStore();

        // NOTE(review): ErrorOccurred is not defined in the visible part of this file;
        // it is assumed to be declared elsewhere - confirm.
        var connectionSettings = ConnectionSettings.Create()
            .UseConsoleLogger()
            .KeepReconnecting()
            .KeepRetrying()
            .OnErrorOccurred(ErrorOccurred);

        var connection = EventStoreConnection.Create(connectionSettings);
        var dispatcher = new GetEventStoreEventDispatcher(connection, bus, positionStore);

        connection.Connect(new IPEndPoint(IPAddress.Loopback, 1113));
        try
        {
            dispatcher.StartDispatching();
            Console.ReadLine();
        }
        finally
        {
            // Shut down in reverse order of start-up so the subscription is stopped
            // before the connection it was created on is closed.
            dispatcher.StopDispatching();
            connection.Close();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment