Skip to content

Instantly share code, notes, and snippets.

@trbngr
Created June 20, 2015 00:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save trbngr/46dea593e2e180d3bf8e to your computer and use it in GitHub Desktop.
Save trbngr/46dea593e2e180d3bf8e to your computer and use it in GitHub Desktop.
public abstract class EventStoreSubscriptionView : PersistentView
{
private readonly IDeserializer _deserializer;
private readonly Serializer _serializer;
private readonly IPEndPoint _tcpEndPoint;
private readonly ConnectionSettings _connectionSettings;
private IActorRef _self;
protected EventStoreSubscriptionView()
{
var serialization = Context.System.Serialization;
_serializer = serialization.FindSerializerForType(typeof(IPersistentRepresentation));
var extension = EventStorePersistence.Instance.Apply(Context.System);
var settings = extension.JournalSettings;
var address = Dns.GetHostAddresses(settings.Host).First();
_deserializer = settings.Deserializer;
_tcpEndPoint = new IPEndPoint(address, settings.TcpPort);
_connectionSettings = settings.ConnectionSettingsFactory.Create();
}
private void EventAppeard(EventStoreSubscription subscription, ResolvedEvent @event)
{
var representation = _deserializer.GetRepresentation(_serializer, @event.OriginalEvent);
var payload = representation.Payload;
if (payload != null)
{
//ho lee fuk! sum ting wong!
//No ActorContext
_self.Tell(payload, Sender);
}
}
public override bool IsAutoUpdate { get { return false; } }
protected override void PreStart()
{
base.PreStart();
_self = Self;
AsyncContext.Run(async () =>
{
var connection = EventStoreConnection.Create(_connectionSettings, _tcpEndPoint);
await connection.ConnectAsync();
await connection.SubscribeToStreamAsync(PersistenceId, false, EventAppeard);
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment