Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save jen20/5395928 to your computer and use it in GitHub Desktop.
Save jen20/5395928 to your computer and use it in GitHub Desktop.
public class GetEventStoreEventDispatcher
{
private readonly IEventBus _eventBus;
private readonly EventStoreConnection _connection;
private bool _stopRequested;
private EventStoreAllCatchUpSubscription _subscription;
private readonly IPersistGetEventStorePosition _positionRepository;
public GetEventStoreEventDispatcher(EventStoreConnection connection, IEventBus eventBus, IPersistGetEventStorePosition positionRepository)
{
if (connection == null) throw new ArgumentNullException("connection");
if (eventBus == null) throw new ArgumentNullException("eventBus");
if (positionRepository == null) throw new ArgumentNullException("positionRepository");
_connection = connection;
_eventBus = eventBus;
_positionRepository = positionRepository;
}
public void StartDispatching()
{
_stopRequested = false;
RecoverSubscription();
}
public void StopDispatching()
{
_stopRequested = true;
if (_subscription != null)
_subscription.Stop(TimeSpan.FromSeconds(2));
}
private void HandleSubscriptionDropped(EventStoreCatchUpSubscription subscription, string reason, Exception error)
{
if (_stopRequested)
return;
RecoverSubscription();
}
private void RecoverSubscription()
{
_subscription = _connection.SubscribeToAllFrom(_positionRepository.GetLastProcessedPosition(), false, HandleNewEvent, HandleSubscriptionDropped);
}
private void HandleNewEvent(EventStoreCatchUpSubscription subscription, ResolvedEvent @event)
{
_eventBus.Publish(ProcessRawEvent(@event));
if (!@event.OriginalPosition.HasValue)
throw new ArgumentException("ResolvedEvent didn't come off a subscription to all (has no position).");
_positionRepository.PersistLastPositionProcessed(@event.OriginalPosition.Value);
}
private static object ProcessRawEvent(ResolvedEvent rawEvent)
{
//NOTE: Normally you'd deserialize here... however, in the interests of not wanting to write
// a load of events etc, we're just going to return the ResolvedEvent. Original is below.
return rawEvent;
//if (rawEvent.OriginalEvent.Metadata.Length > 0 && rawEvent.OriginalEvent.Data.Length > 0)
// return DeserializeEvent(rawEvent.OriginalEvent.Metadata, rawEvent.OriginalEvent.Data);
//return null;
}
/// <summary>
/// Deserializes the event from the raw GetEventStore event to my event.
/// Took this from a gist that James Nugent posted on the GetEventStore forumns.
/// </summary>
/// <param name="metadata"></param>
/// <param name="data"></param>
/// <returns></returns>
private static object DeserializeEvent(byte[] metadata, byte[] data)
{
var eventClrTypeName = JObject.Parse(Encoding.UTF8.GetString(metadata)).Property("EventClrTypeName").Value;
return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), Type.GetType((string)eventClrTypeName));
}
}
public class Program
{
public static void Main(string[] args)
{
var bus = new FakeEventBus();
var positionStore = new InMemoryPositionStore();
var connectionSettings = ConnectionSettings.Create()
.UseConsoleLogger()
.KeepReconnecting()
.KeepRetrying()
.OnErrorOccurred(ErrorOccurred);
var connection = EventStoreConnection.Create(connectionSettings);
var dispatcher = new GetEventStoreEventDispatcher(connection, bus, positionStore);
connection.Connect(new IPEndPoint(IPAddress.Loopback, 1113));
dispatcher.StartDispatching();
Console.ReadLine();
}
}
@jen20
Copy link
Author

jen20 commented Apr 16, 2013

Edited to remove internal queueing and to take EventStoreConnection as a dependency.

@jen20
Copy link
Author

jen20 commented Apr 16, 2013

Also note that this doesn't guarantee exactly once messaging or anything like that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment