Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
MNTR test for verifying cluster rejoin bug
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Remote.TestKit;
using Akka.Remote.Transport;
using Akka.TestKit;
namespace Akka.Cluster.Tests.MultiNode
{
public class Bugfix_1700_ClusterRejoinSpecConfig : MultiNodeConfig
{
public RoleName First { get; }
public RoleName Second { get; }
public RoleName NewMember { get; }
public Bugfix_1700_ClusterRejoinSpecConfig()
{
First = Role("first");
Second = Role("second");
NewMember = Role("new");
// Need to use the FailureDetectorPuppet to simulate reachability events
CommonConfig = MultiNodeLoggingConfig.LoggingConfig
.WithFallback(DebugConfig(true))
.WithFallback(MultiNodeClusterSpec.ClusterConfigWithFailureDetectorPuppet());
}
}
public class Bugfix_1700_ClusterRejoinMultiNode1 : Bugfix_1700_ClusterRejoinSpec
{
}
public class Bugfix_1700_ClusterRejoinMultiNode2 : Bugfix_1700_ClusterRejoinSpec
{
}
public class Bugfix_1700_ClusterRejoinMultiNode3 : Bugfix_1700_ClusterRejoinSpec
{
}
public abstract class Bugfix_1700_ClusterRejoinSpec : MultiNodeClusterSpec
{
readonly Bugfix_1700_ClusterRejoinSpecConfig _config;
protected Bugfix_1700_ClusterRejoinSpec() : this(new Bugfix_1700_ClusterRejoinSpecConfig()) { }
protected Bugfix_1700_ClusterRejoinSpec(Bugfix_1700_ClusterRejoinSpecConfig config) : base(config)
{
_config = config;
}
[MultiNodeFact]
public void A_node_should_be_able_to_join_cluster_if_leader_becomes_unreachable_and_comes_back_again_during_join
()
{
RunOn(StartClusterNode, _config.First);
EnterBarrier("first-started");
RunOn(() =>
{
Within(TimeSpan.FromSeconds(5), () =>
{
AwaitAssert(() => ClusterView.IsLeader.ShouldBeTrue());
});
}, _config.First);
RunOn(() => Cluster.Join(GetAddress(_config.First)), _config.Second);
EnterBarrier("initial-cluster");
var leaderAddress = ClusterView.Leader;
RunOn(() =>
{
var unreachableTestLatch = new TestLatch();
var reachableAgainTestLatch = new TestLatch();
var unreachableListener =
Sys.ActorOf(Props.Create(() => new UnreachableListener(leaderAddress, unreachableTestLatch)),
"unreachable");
var reachableListener =
Sys.ActorOf(
Props.Create(() => new ReachableAgainListener(leaderAddress, reachableAgainTestLatch)),
"reachable-again");
Cluster.Subscribe(unreachableListener, new[] { typeof(ClusterEvent.IReachabilityEvent) });
Cluster.Subscribe(reachableListener, new[] { typeof(ClusterEvent.IReachabilityEvent) });
EnterBarrier("listeners-created");
// make the leader become unreachable
unreachableTestLatch.Ready(TestKitSettings.DefaultTimeout);
EnterBarrier("leader-unavailable", "newmember-begin-join");
//verify that the leader is reachable again
reachableAgainTestLatch.Ready(TestKitSettings.DefaultTimeout);
EnterBarrier("leader-available-again", "new-member-joined");
}, _config.Second);
RunOn(() =>
{
EnterBarrier("listeners-created");
Task.WaitAll(new Task[] { TestConductor.Blackhole(_config.First, _config.Second, ThrottleTransportAdapter.Direction.Both),
TestConductor.Blackhole(_config.First, _config.NewMember, ThrottleTransportAdapter.Direction.Both) }, TestKitSettings.DefaultTimeout);
EnterBarrier("leader-unavailable", "newmember-begin-join");
Task.WaitAll(new Task[] { TestConductor.PassThrough(_config.First, _config.Second, ThrottleTransportAdapter.Direction.Both),
TestConductor.PassThrough(_config.First, _config.NewMember, ThrottleTransportAdapter.Direction.Both) }, TestKitSettings.DefaultTimeout);
EnterBarrier("leader-available-again", "new-member-joined");
}, _config.First);
RunOn(() =>
{
EnterBarrier("listeners-created", "leader-unavailable");
//attempt to join leader now that it's unavailable
Cluster.Join(leaderAddress);
EnterBarrier("newmember-begin-join", "leader-available-again");
var myAddress = GetAddress(Myself);
AwaitAssert(() =>
{
ClusterView.Members.Any(
member => member.Address.Equals(myAddress) && member.Status == MemberStatus.Up).ShouldBeTrue($"{myAddress} should be a member of the cluster");
}, GetTimeoutOrDefault(null), TimeSpan.FromMilliseconds(100));
EnterBarrier("new-member-joined");
}, _config.NewMember);
}
}
class UnreachableListener : UntypedActor
{
readonly Address _leaderAddress;
readonly TestLatch _latch;
public UnreachableListener(Address leaderAddress, TestLatch latch)
{
_leaderAddress = leaderAddress;
_latch = latch;
}
protected override void OnReceive(object message)
{
var state = message as ClusterEvent.CurrentClusterState;
if (state != null)
{
var upLeaderMember =
state.Members.SingleOrDefault((m => m.Address == _leaderAddress && m.Status == MemberStatus.Up));
if (upLeaderMember != null && state.Unreachable.Contains(upLeaderMember))
_latch.CountDown();
}
var unreachableMember = message as ClusterEvent.UnreachableMember;
if (unreachableMember != null && unreachableMember.Member.Address == _leaderAddress)
_latch.CountDown();
}
}
class ReachableAgainListener : UntypedActor
{
readonly Address _leaderAddress;
readonly TestLatch _latch;
public ReachableAgainListener(Address leaderAddress, TestLatch latch)
{
_leaderAddress = leaderAddress;
_latch = latch;
}
protected override void OnReceive(object message)
{
var reachableMember = message as ClusterEvent.ReachableMember;
if (reachableMember != null && reachableMember.Member.Address == _leaderAddress)
_latch.CountDown();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.