// Created October 28, 2015 14:02
using System;
using System.CodeDom;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Embedded;
using EventStore.ClientAPI.SystemData;
using EventStore.Core;
using EventStore.Core.Bus;
using EventStore.Core.Messages;
namespace ReplicaSpike
class Program
/// <summary>
/// Entry point. Starts an embedded node on the first port, kicks off a background task
/// that writes events to that node, and then sets up replication from the node that
/// listens on the second port.
/// </summary>
/// <param name="args">
/// <c>args[0]</c> is the external TCP port for this node; <c>args[1]</c> is the port of
/// the node to replicate from. Both must be integers.
/// </param>
static void Main(string[] args)
{
    int port;
    int otherport;

    // Validate up front so a missing or non-numeric argument yields a readable message
    // and a non-zero exit code rather than an IndexOutOfRangeException/FormatException.
    if (args == null || args.Length < 2
        || !int.TryParse(args[0], out port)
        || !int.TryParse(args[1], out otherport))
    {
        Console.Error.WriteLine("Expected two integer arguments: <port> <otherport>");
        Environment.ExitCode = 1;
        return;
    }

    var node = StartEmbeddedEventStore(port);
    Console.WriteLine("Node up. waiting");

    Console.WriteLine("Setting up thread to write later.");
    // Fire-and-forget: the writer runs concurrently with the replication set-up below.
    Task.Factory.StartNew(() => WriteEvents(node, port));

    Console.WriteLine("Setting up replication");
    ReplicateFrom(node, otherport);
}
/// <summary>
/// Appends 500 events, one at a time, to the stream named "foo-" followed by
/// <paramref name="myPort"/>, using an embedded connection to <paramref name="node"/>.
/// Each append is waited on before the next one is issued.
/// </summary>
/// <param name="node">The in-process node to write to.</param>
/// <param name="myPort">This node's port; used in the stream name and stamped into each event.</param>
private static void WriteEvents(ClusterVNode node, int myPort)
{
    var settings = ConnectionSettings.Create().LimitRetriesForOperationTo(3);
    var streamName = "foo-" + myPort;

    // NOTE(review): no ConnectAsync call is visible on this connection — confirm whether
    // the embedded connection needs to be opened before appending.
    using (var conn = EmbeddedEventStoreConnection.Create(node, settings))
    {
        var written = 0;
        while (written < 500)
        {
            Console.WriteLine("writing event to me");
            conn.AppendToStreamAsync(streamName, ExpectedVersion.Any, BuildEvent(myPort)).Wait();
            written++;
        }
    }
}
/// <summary>
/// Creates a new "foo" event with a fresh id, a 500-byte zero-filled body, and metadata
/// holding <paramref name="port"/> as UTF-8 text (the form TryReadPortFromMetadata parses).
/// </summary>
/// <param name="port">Port of the node that originates the event.</param>
/// <returns>The event to append.</returns>
private static EventData BuildEvent(int port)
{
    var eventId = Guid.NewGuid();
    // NOTE(review): the event is flagged with 'true' (presumably isJson) although the
    // body is 500 zero bytes — confirm that is acceptable for this spike.
    var body = new byte[500];
    var metadata = Encoding.UTF8.GetBytes(port.ToString());
    return new EventData(eventId, "foo", true, body, metadata);
}
/// <summary>
/// Subscribes to all events on the node at loopback:<paramref name="otherport"/> and, for
/// each event whose metadata parses to <paramref name="otherport"/>, appends it to the
/// local <paramref name="node"/> under the same stream id. Other events are only logged.
/// </summary>
/// <param name="node">The in-process node that receives the replicated events.</param>
/// <param name="otherport">Port of the remote node to subscribe to; also the metadata value that selects which events are copied.</param>
private static void ReplicateFrom(ClusterVNode node, int otherport)
Console.WriteLine("Attempting to connect");
//load up initial checksum from stream or even local checkpoint (local checkpoint may be better)
// Embedded connection to this process's own node; used below for the replicated appends.
using (var localConnection = EmbeddedEventStoreConnection.Create(node, "local connection"))
// Connection to the other node on the loopback address with default settings.
using (var connection = EventStoreConnection.Create(ConnectionSettings.Create()
.Build(), new IPEndPoint(IPAddress.Loopback, otherport)))
// NOTE(review): no ConnectAsync call on either connection is visible in this copy, yet
// success is reported on the next line — confirm the connections are actually opened.
Console.WriteLine("Successfully conencted to localhost:{0}", otherport);
Console.WriteLine("Setting up subscription");
// A null checkpoint is passed, so no saved position is supplied (the comment above notes
// that a stored checkpoint should eventually be loaded). The 'false' is presumably
// resolveLinkTos — TODO confirm against the client API.
var sub = connection.SubscribeToAllFrom(null, false,
// Callback invoked for each event delivered by the subscription.
(s, ev) =>
var port = TryReadPortFromMetadata(ev.OriginalEvent.Metadata);
// Only events stamped with the other node's port are copied; presumably this keeps
// events that did not originate on that node from being written back — verify.
if (port == otherport)
Console.WriteLine("writing event {0}/{1}", ev.OriginalStreamId, ev.OriginalEventNumber);
// Expected version is the source event number minus one, i.e. the event is expected to
// land at the same position in the local stream as it has in the source stream.
localConnection.AppendToStreamAsync(ev.OriginalStreamId, ev.OriginalEventNumber -1,
new UserCredentials("admin", "changeit"),
// NOTE(review): this constructor call is cut off after its first argument in this copy;
// the remaining arguments, the end of the append call and the 'else' that should guard
// the next line are not present. Restore them from the original before building.
new EventData(ev.OriginalEvent.EventId,
Console.WriteLine("Not writing {0}/{1}", ev.OriginalStreamId, ev.OriginalEventNumber);
// Callback that logs when the subscription reports "live started".
(s) => Console.WriteLine("live started`"),
// Callback that logs the values passed when the subscription is dropped.
(s, e, ex) =>
Console.WriteLine("sub dropped {0} {1}", e, ex);
// Credentials used for the subscription itself.
new UserCredentials("admin", "changeit"));
/// <summary>
/// Reads a port number from event metadata that holds the port as UTF-8 decimal text.
/// </summary>
/// <param name="metadata">Raw event metadata; may be null.</param>
/// <returns>
/// The parsed port, or 0 when <paramref name="metadata"/> is null or does not contain a
/// valid 32-bit integer.
/// </returns>
private static int TryReadPortFromMetadata(byte[] metadata)
{
    if (metadata == null)
    {
        return 0;
    }

    // TryParse rejects malformed or out-of-range text without throwing, so there is no
    // need for a blanket catch that could also hide unrelated failures.
    int port;
    return int.TryParse(Encoding.UTF8.GetString(metadata), out port) ? port : 0;
}
/// <summary>
/// Configures an embedded single-node Event Store whose external TCP endpoint is
/// loopback:<paramref name="port"/>, then blocks for up to 60 seconds until a
/// UserManagementServiceInitialized message sets the start signal.
/// </summary>
/// <param name="port">External TCP port for the embedded node.</param>
/// <returns>The embedded node once the start signal has been observed.</returns>
/// <exception cref="TimeoutException">The start signal was not set within 60 seconds.</exception>
static ClusterVNode StartEmbeddedEventStore(int port)
// NOTE(review): this builder chain has no terminating call or semicolon in this copy;
// the line that finishes building the node appears to be missing — restore it.
var embeddedEventStore = EmbeddedVNodeBuilder.AsSingleNode()
.WithExternalTcpOn(new IPEndPoint(IPAddress.Loopback, port))
// The three endpoints below all use IPAddress.None with port 1234 — presumably
// placeholders because only the external TCP endpoint is used here; confirm.
.WithInternalTcpOn(new IPEndPoint(IPAddress.None, 1234))
.WithInternalHttpOn(new IPEndPoint(IPAddress.None, 1234))
.WithExternalHttpOn(new IPEndPoint(IPAddress.None, 1234))
// Signal that the handler below sets once the node reports user management is initialized.
var startedEvent = new ManualResetEventSlim(false);
// NOTE(review): the line below is the tail of a call (note the extra closing parenthesis)
// whose opening line is missing — presumably a bus subscription on the node. No call
// that starts the node is visible either; confirm against the original.
new AdHocHandler<UserManagementMessage.UserManagementServiceInitialized>(m => startedEvent.Set()));
Console.WriteLine("Waiting on node.");
// Wait returns false when the 60000 ms timeout elapses without the signal being set.
if (!startedEvent.Wait(60000))
throw new TimeoutException("Embedded Event Store has not started in 60 seconds.");
return embeddedEventStore;
