Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save ahjohannessen/b8c7aff495a8207591fd to your computer and use it in GitHub Desktop.
Save ahjohannessen/b8c7aff495a8207591fd to your computer and use it in GitHub Desktop.
using System;
using System.CodeDom;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Embedded;
using EventStore.ClientAPI.SystemData;
using EventStore.Core;
using EventStore.Core.Bus;
using EventStore.Core.Messages;
namespace ReplicaSpike
{
class Program
{
/// <summary>
/// Entry point. Starts an embedded node on the first port, kicks off a background
/// writer that appends events to the local node, and then replicates events from
/// the node listening on the second port.
/// </summary>
/// <param name="args">
/// <c>args[0]</c>: external TCP port for the local embedded node.
/// <c>args[1]</c>: external TCP port of the node to replicate from (on loopback).
/// </param>
static void Main(string[] args)
{
    int port;
    int otherport;

    // Validate up front: a missing or non-numeric argument used to crash with an
    // IndexOutOfRangeException / FormatException instead of telling the user what to pass.
    if (args == null
        || args.Length < 2
        || !int.TryParse(args[0], out port)
        || !int.TryParse(args[1], out otherport))
    {
        Console.Error.WriteLine("Usage: ReplicaSpike <port> <otherport>");
        Environment.ExitCode = 1;
        return;
    }

    var node = StartEmbeddedEventStore(port);
    Console.WriteLine("Node up. waiting");

    // NOTE(review): presumably gives the other process time to bring its node up
    // before we try to connect to it - confirm whether 10 s is still needed.
    Thread.Sleep(10000);

    Console.WriteLine("Setting up thread to write later.");

    // The writer blocks for minutes (sleep + synchronous waits), so ask for a dedicated
    // thread rather than tying up a pool thread, and surface a failure instead of
    // letting the faulted task go unobserved.
    Task.Factory
        .StartNew(() => WriteEvents(node, port), TaskCreationOptions.LongRunning)
        .ContinueWith(
            t => Console.WriteLine("Writer failed: {0}", t.Exception),
            TaskContinuationOptions.OnlyOnFaulted);

    Console.WriteLine("Setting up replication");

    // Blocks until the user presses Enter (ReplicateFrom reads a line itself).
    ReplicateFrom(node, otherport);

    // Second Enter keeps the process (and the embedded node) alive after replication stops.
    Console.ReadLine();
}
/// <summary>
/// Appends 500 events to the local stream <c>"foo-" + myPort</c>, one per second,
/// through an embedded connection to <paramref name="node"/>.
/// </summary>
/// <param name="node">The embedded node to write to.</param>
/// <param name="myPort">Port used in the stream name and passed to <c>BuildEvent</c>.</param>
private static void WriteEvents(ClusterVNode node, int myPort)
{
    var settings = ConnectionSettings.Create().LimitRetriesForOperationTo(3);

    using (var connection = EmbeddedEventStoreConnection.Create(node, settings))
    {
        var remaining = 500;
        while (remaining-- > 0)
        {
            // Pace the writes: one event per second.
            Thread.Sleep(1000);
            Console.WriteLine("writing event to me");

            var streamName = "foo-" + myPort;
            var append = connection.AppendToStreamAsync(streamName, ExpectedVersion.Any, BuildEvent(myPort));

            // Block until the write completes so events are appended strictly one at a time.
            append.Wait();
        }
    }
}
/// <summary>
/// Creates a new event of type "foo" with a fresh id, a 500-byte zero-filled payload
/// and the given port (as UTF-8 text) in the metadata.
/// </summary>
/// <param name="port">Port number written to the event metadata.</param>
/// <returns>The event to append.</returns>
private static EventData BuildEvent(int port)
{
    var eventId = Guid.NewGuid();
    var payload = new byte[500];

    // The metadata carries the originating port; the replicator reads it back
    // via TryReadPortFromMetadata to decide whether to copy the event.
    var metadata = Encoding.UTF8.GetBytes(port.ToString());

    return new EventData(eventId, "foo", true, payload, metadata);
}
/// <summary>
/// Subscribes to all events of the node listening on <c>localhost:otherport</c> and copies
/// every event whose metadata parses to <paramref name="otherport"/> into the local embedded
/// node, keeping stream id, event id, type, data and metadata. Blocks until a line is read
/// from the console, then stops the subscription and closes both connections.
/// </summary>
/// <param name="node">The local embedded node that receives the replicated events.</param>
/// <param name="otherport">External TCP port (on loopback) of the node to replicate from.</param>
private static void ReplicateFrom(ClusterVNode node, int otherport)
{
    Console.WriteLine("Attempting to connect");

    // TODO: load the initial checkpoint from a stream or from a local checkpoint (the local
    // checkpoint may be better) instead of always subscribing from the start (null below).

    // One credentials instance shared by the subscription and the local writes.
    var credentials = new UserCredentials("admin", "changeit");

    using (var localConnection = EmbeddedEventStoreConnection.Create(node, "local connection"))
    using (var connection = EventStoreConnection.Create(
        ConnectionSettings.Create()
            .KeepReconnecting()
            .KeepRetrying()
            .Build(),
        new IPEndPoint(IPAddress.Loopback, otherport)))
    {
        connection.ConnectAsync().Wait();
        Console.WriteLine("Successfully connected to localhost:{0}", otherport);
        Console.WriteLine("Setting up subscription");

        var sub = connection.SubscribeToAllFrom(
            null,
            false,
            (s, ev) =>
            {
                // Only events stamped with the remote node's port are copied; anything else
                // (including events whose metadata is not a number) is skipped.
                var port = TryReadPortFromMetadata(ev.OriginalEvent.Metadata);
                if (port != otherport)
                {
                    Console.WriteLine("Not writing {0}/{1}", ev.OriginalStreamId, ev.OriginalEventNumber);
                    return;
                }

                Console.WriteLine("writing event {0}/{1}", ev.OriginalStreamId, ev.OriginalEventNumber);

                // Expected version is "the event just before this one", so the local stream
                // must mirror the remote stream's numbering. The wait is deliberate: the
                // callback is synchronous, and blocking keeps the writes in order.
                localConnection.AppendToStreamAsync(
                    ev.OriginalStreamId,
                    ev.OriginalEventNumber - 1,
                    credentials,
                    new EventData(
                        ev.OriginalEvent.EventId,
                        ev.OriginalEvent.EventType,
                        ev.OriginalEvent.IsJson,
                        ev.OriginalEvent.Data,
                        ev.OriginalEvent.Metadata)).Wait();
            },
            s => Console.WriteLine("live started"),
            (s, e, ex) => Console.WriteLine("sub dropped {0} {1}", e, ex),
            credentials);

        Console.WriteLine("waiting.");
        Console.ReadLine();

        // Stop the subscription before the connections are disposed so that no callback
        // is still appending to the local connection while it is being torn down.
        try
        {
            sub.Stop(TimeSpan.FromSeconds(5));
        }
        catch (TimeoutException)
        {
            Console.WriteLine("subscription did not stop in time");
        }
    }
}
/// <summary>
/// Interprets event metadata as a UTF-8 encoded integer port number.
/// </summary>
/// <param name="metadata">Raw event metadata; may be <c>null</c> or empty.</param>
/// <returns>
/// The parsed port, or <c>0</c> when <paramref name="metadata"/> is <c>null</c> or does not
/// contain a valid 32-bit integer.
/// </returns>
private static int TryReadPortFromMetadata(byte[] metadata)
{
    if (metadata == null)
    {
        return 0;
    }

    var text = Encoding.UTF8.GetString(metadata);

    // TryParse instead of Parse + catch-all: this runs for every event in the subscription,
    // and most metadata is not a number, so throwing per event is needlessly expensive and
    // the blanket catch could hide unrelated failures. Invariant culture because the
    // metadata is machine-written, not user-facing text.
    int port;
    var parsed = int.TryParse(
        text,
        System.Globalization.NumberStyles.Integer,
        System.Globalization.CultureInfo.InvariantCulture,
        out port);

    return parsed ? port : 0;
}
/// <summary>
/// Builds and starts a single-node, in-memory embedded Event Store with projections set to
/// <c>ProjectionsMode.None</c> and its external TCP endpoint on loopback at
/// <paramref name="port"/>, then waits for the node to signal that user management is initialized.
/// </summary>
/// <param name="port">External TCP port for the node (bound to the loopback address).</param>
/// <returns>The started node.</returns>
/// <exception cref="TimeoutException">The initialization message did not arrive within 60 seconds.</exception>
static ClusterVNode StartEmbeddedEventStore(int port)
{
    var builder = EmbeddedVNodeBuilder.AsSingleNode()
        .RunInMemory()
        .RunProjections(ProjectionsMode.None);

    // Only the external TCP endpoint gets a real address. The other three are all given
    // IPAddress.None:1234 - NOTE(review): presumably so they are never reachable; confirm.
    builder = builder
        .WithExternalTcpOn(new IPEndPoint(IPAddress.Loopback, port))
        .WithInternalTcpOn(new IPEndPoint(IPAddress.None, 1234))
        .WithInternalHttpOn(new IPEndPoint(IPAddress.None, 1234))
        .WithExternalHttpOn(new IPEndPoint(IPAddress.None, 1234));

    var vnode = builder.Build();

    // Subscribe before starting so the initialization message cannot be missed.
    var ready = new ManualResetEventSlim(false);
    var onInitialized =
        new AdHocHandler<UserManagementMessage.UserManagementServiceInitialized>(_ => ready.Set());
    vnode.MainBus.Subscribe(onInitialized);

    vnode.Start();
    Console.WriteLine("Waiting on node.");

    var started = ready.Wait(60000);
    if (started)
    {
        return vnode;
    }

    throw new TimeoutException("Embedded Event Store has not started in 60 seconds.");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment