Skip to content

Instantly share code, notes, and snippets.

@vvreutskiy
Created September 2, 2016 14:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vvreutskiy/2d91e4003d256e6f1708dd7065b33b12 to your computer and use it in GitHub Desktop.
Save vvreutskiy/2d91e4003d256e6f1708dd7065b33b12 to your computer and use it in GitHub Desktop.
Multi-Node TestKit (MNTK) Cluster Client/Shard Experiment
using System;
using System.Collections.Immutable;
using System.Diagnostics;
using Akka.Actor;
using Akka.Cluster.Sharding;
using Akka.Cluster.TestKit;
using Akka.Cluster.Tools.Client;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Configuration;
using Akka.Event;
using Akka.Remote.TestKit;
using FluentAssertions;
namespace Akka.Experiment
{
/// <summary>
/// Multi-node configuration for the cluster client experiment.
/// Registers the three roles (<c>client</c>, <c>seed</c>, <c>worker</c>) and builds the
/// configuration shared by every node.
/// </summary>
public class ExperimentClusterSpecConfig : MultiNodeConfig
{
    // HOCON overrides applied on every node: DEBUG logging, the cluster actor-ref provider,
    // cluster client / receptionist timing settings and actor debug switches.
    private const string CommonHocon = @"
akka.loglevel = DEBUG
akka.stdout-loglevel = DEBUG
akka.suppress-json-serializer-warning = on
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.client.heartbeat-interval = 1s
akka.cluster.client.acceptable-heartbeat-pause = 3s
akka.cluster.client.refresh-contacts-interval = 1s
# number-of-contacts must be >= 4 because we shutdown all but one in the end
akka.cluster.client.receptionist.number-of-contacts = 4
akka.cluster.client.receptionist.heartbeat-interval = 10s
akka.cluster.client.receptionist.acceptable-heartbeat-pause = 10s
akka.cluster.client.receptionist.failure-detection-interval = 1s
akka.test.filter-leeway = 10s
akka.actor.debug.receive = on
akka.actor.debug.autoreceive = on
akka.actor.debug.lifecycle = on
akka.actor.debug.event-stream = off
akka.actor.debug.unhandled = on
";

    /// <summary>Role registered under the name <c>client</c>.</summary>
    public RoleName Client { get; }

    /// <summary>Role registered under the name <c>seed</c>.</summary>
    public RoleName Seed { get; }

    /// <summary>Role registered under the name <c>worker</c>.</summary>
    public RoleName Worker { get; }

    /// <summary>
    /// Registers the roles, assembles the common configuration and switches the test transport on.
    /// </summary>
    public ExperimentClusterSpecConfig()
    {
        // Registration order is kept as client, seed, worker.
        Client = Role("client");
        Seed = Role("seed");
        Worker = Role("worker");

        // Explicit overrides win; the receptionist and pub-sub defaults fill in the rest.
        var overrides = ConfigurationFactory.ParseString(CommonHocon);
        var withReceptionistDefaults = overrides.WithFallback(ClusterClientReceptionist.DefaultConfig());
        CommonConfig = withReceptionistDefaults.WithFallback(DistributedPubSub.DefaultConfig());

        TestTransport = true;
    }
}
/// <summary>
/// Multi-node experiment: a seed node forms a cluster, a worker node joins it and registers an
/// <see cref="EchoService"/> with the cluster client receptionist, and a client node sends a
/// message to that service through a <see cref="ClusterClient"/> and waits for the reply.
/// </summary>
public abstract class ExperimentClusterSpec : MultiNodeClusterSpec
{
    private readonly ExperimentClusterSpecConfig _config;

    /// <summary>Creates the spec with a fresh <see cref="ExperimentClusterSpecConfig"/>.</summary>
    public ExperimentClusterSpec() : this(new ExperimentClusterSpecConfig())
    {
    }

    /// <summary>Creates the spec with the supplied configuration.</summary>
    /// <param name="config">Role and configuration definitions; also passed to the base class.</param>
    public ExperimentClusterSpec(ExperimentClusterSpecConfig config) : base(config)
    {
        _config = config;
    }

    /// <summary>
    /// Runs the experiment. Every node executes this method; the <c>RunOn</c> blocks select the
    /// role-specific work and the barriers keep the three phases (seed, worker, client) in order.
    /// </summary>
    [MultiNodeFact]
    public void ExperimentClusterSpecs()
    {
        Log.Info("Test started");

        // NOTE(review): deliberate experiment aid - requests a debugger on every node process.
        // Remove before running unattended.
        Debugger.Launch();

        RunOn(() =>
        {
            Console.WriteLine("seed starting");
            // The seed has to join itself first; otherwise there is no cluster for the
            // other nodes' JoinSeedNodes calls to join.
            Cluster.Join(Node(_config.Seed).Address);
            ClusterClientReceptionist.Get(Sys);
            Console.WriteLine("seed started");
        }, _config.Seed);
        EnterBarrier("seed_started");

        RunOn(() =>
        {
            Console.WriteLine("worker starting");
            Cluster.JoinSeedNodes(new[] { Node(_config.Seed).Address });
            var receptionist = ClusterClientReceptionist.Get(Sys);
            var service = Sys.ActorOf<EchoService>("echoService");
            receptionist.RegisterService(service);
            Console.WriteLine("worker started");
        }, _config.Worker);

        // Make the barrier name true: wait until seed and worker see each other as Up.
        RunOn(() => AwaitMembersUp(2), _config.Seed, _config.Worker);
        EnterBarrier("worker_connected_to_seed");

        RunOn(() =>
        {
            Console.WriteLine("client starting");
            Cluster.JoinSeedNodes(new[] { Node(_config.Seed).Address });

            // The seed node's receptionist is the client's only initial contact point.
            var actorPath = Node(_config.Seed) / "system" / "receptionist";
            Console.WriteLine("ClusterClient Initial contact: {0}", actorPath);
            var initialContacts = new[] { actorPath }.ToImmutableHashSet();
            var props = ClusterClient.Props(ClusterClientSettings.Create(Sys).WithInitialContacts(initialContacts));
            var clusterClient = Sys.ActorOf(props, "client1");

            // Name the test actor as sender explicitly so the echo reply is routed to ExpectMsg
            // without relying on the ambient implicit sender.
            clusterClient.Tell(new ClusterClient.Send("/user/echoService", "hello from client"), TestActor);
            ExpectMsg<string>(s => { Console.WriteLine("Received message: {0}", s); });
            Console.WriteLine("client started");
        }, _config.Client);
        EnterBarrier("client_connected");

        Log.Info("Test finished");
    }

    /// <summary>
    /// Actor that logs every message it receives and answers the sender with a string
    /// containing the message and this actor's path.
    /// </summary>
    public class EchoService : ReceiveActor
    {
        // Resolved once per actor instance instead of once per message.
        private readonly ILoggingAdapter _log = Context.GetLogger();

        /// <summary>Installs the catch-all handler.</summary>
        public EchoService()
        {
            ReceiveAny(m =>
            {
                _log.Info("{0} got message {1} from {2} ", Self.Path, m, Sender.Path);
                Sender.Tell($"Reply {m} from {Self.Path}");
            });
        }
    }
}
/// <summary>
/// Concrete subclass of <see cref="ExperimentClusterSpec"/> that adds no members of its own.
/// </summary>
/// <remarks>
/// NOTE(review): presumably the entry point for the first test node - confirm against the
/// multi-node test runner conventions.
/// </remarks>
public class ExperimentClusterSpecNode1
    : ExperimentClusterSpec
{
}
/// <summary>
/// Concrete subclass of <see cref="ExperimentClusterSpec"/> that adds no members of its own.
/// </summary>
/// <remarks>
/// NOTE(review): presumably the entry point for the second test node - confirm against the
/// multi-node test runner conventions.
/// </remarks>
public class ExperimentClusterSpecNode2
    : ExperimentClusterSpec
{
}
/// <summary>
/// Concrete subclass of <see cref="ExperimentClusterSpec"/> that adds no members of its own.
/// </summary>
/// <remarks>
/// NOTE(review): presumably the entry point for the third test node - confirm against the
/// multi-node test runner conventions.
/// </remarks>
public class ExperimentClusterSpecNode3
    : ExperimentClusterSpec
{
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment