Created
September 2, 2016 14:44
-
-
Save vvreutskiy/2d91e4003d256e6f1708dd7065b33b12 to your computer and use it in GitHub Desktop.
MNTK Cluster Client/Shard Experiment
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Immutable; | |
using System.Diagnostics; | |
using Akka.Actor; | |
using Akka.Cluster.Sharding; | |
using Akka.Cluster.TestKit; | |
using Akka.Cluster.Tools.Client; | |
using Akka.Cluster.Tools.PublishSubscribe; | |
using Akka.Configuration; | |
using Akka.Event; | |
using Akka.Remote.TestKit; | |
using FluentAssertions; | |
namespace Akka.Experiment | |
{ | |
public class ExperimentClusterSpecConfig : MultiNodeConfig | |
{ | |
public RoleName Client { get; } | |
public RoleName Seed { get; } | |
public RoleName Worker { get; } | |
public ExperimentClusterSpecConfig() | |
{ | |
Client = Role("client"); | |
Seed = Role("seed"); | |
Worker = Role("worker"); | |
CommonConfig = ConfigurationFactory.ParseString(@" | |
akka.loglevel = DEBUG | |
akka.stdout-loglevel = DEBUG | |
akka.suppress-json-serializer-warning = on | |
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"" | |
akka.remote.log-remote-lifecycle-events = off | |
akka.cluster.auto-down-unreachable-after = 0s | |
akka.cluster.client.heartbeat-interval = 1s | |
akka.cluster.client.acceptable-heartbeat-pause = 3s | |
akka.cluster.client.refresh-contacts-interval = 1s | |
# number-of-contacts must be >= 4 because we shutdown all but one in the end | |
akka.cluster.client.receptionist.number-of-contacts = 4 | |
akka.cluster.client.receptionist.heartbeat-interval = 10s | |
akka.cluster.client.receptionist.acceptable-heartbeat-pause = 10s | |
akka.cluster.client.receptionist.failure-detection-interval = 1s | |
akka.test.filter-leeway = 10s | |
akka.actor.debug.receive = on | |
akka.actor.debug.autoreceive = on | |
akka.actor.debug.lifecycle = on | |
akka.actor.debug.event-stream = off | |
akka.actor.debug.unhandled = on | |
") | |
.WithFallback(ClusterClientReceptionist.DefaultConfig()) | |
.WithFallback(DistributedPubSub.DefaultConfig()); | |
TestTransport = true; | |
} | |
} | |
public abstract class ExperimentClusterSpec : MultiNodeClusterSpec | |
{ | |
private readonly ExperimentClusterSpecConfig _config; | |
public ExperimentClusterSpec() : this(new ExperimentClusterSpecConfig()) | |
{ | |
} | |
public ExperimentClusterSpec(ExperimentClusterSpecConfig config) : base(config) | |
{ | |
_config = config; | |
} | |
[MultiNodeFact] | |
public void ExperimentClusterSpecs() | |
{ | |
Log.Info("Test started"); | |
Debugger.Launch(); | |
RunOn(() => | |
{ | |
Console.WriteLine("seed starting"); | |
ClusterClientReceptionist.Get(Sys); | |
Console.WriteLine("seed started"); | |
}, _config.Seed); | |
EnterBarrier("seed_started"); | |
RunOn(() => | |
{ | |
Console.WriteLine("worker starting"); | |
Cluster.JoinSeedNodes(new[] {Node(_config.Seed).Address}); | |
var receptionist = ClusterClientReceptionist.Get(Sys); | |
var service = Sys.ActorOf<EchoService>("echoService"); | |
receptionist.RegisterService(service); | |
Console.WriteLine("worker started"); | |
}, _config.Worker); | |
EnterBarrier("worker_connected_to_seed"); | |
RunOn(() => | |
{ | |
Console.WriteLine("client starting"); | |
Cluster.JoinSeedNodes(new[] { Node(_config.Seed).Address }); | |
var actorPath = Node(_config.Seed)/"system"/"receptionist"; | |
Console.WriteLine("ClusterClient Initial contact: {0}", actorPath); | |
var initialContacts = new[] {actorPath}.ToImmutableHashSet(); | |
var props = ClusterClient.Props(ClusterClientSettings.Create(Sys).WithInitialContacts(initialContacts)); | |
var clusterClient = Sys.ActorOf(props, "client1"); | |
clusterClient.Tell(new ClusterClient.Send("/user/echoService", "hello from client")); | |
ExpectMsg<string>(s => { Console.WriteLine("Received message: {0}", s); }); | |
Console.WriteLine("client started"); | |
}, _config.Client); | |
EnterBarrier("client_connected"); | |
Log.Info("Test finished"); | |
} | |
public class EchoService : ReceiveActor | |
{ | |
public EchoService() | |
{ | |
ReceiveAny(m => | |
{ | |
Context.GetLogger().Info("{0} got message {1} from {2} ", Self.Path, m, Sender.Path); | |
Sender.Tell($"Reply {m} from {Self.Path}"); | |
}); | |
} | |
} | |
} | |
public class ExperimentClusterSpecNode1 : ExperimentClusterSpec | |
{ | |
} | |
public class ExperimentClusterSpecNode2 : ExperimentClusterSpec | |
{ | |
} | |
public class ExperimentClusterSpecNode3 : ExperimentClusterSpec | |
{ | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment