Skip to content

Instantly share code, notes, and snippets.

@bkrahmer
Created November 14, 2017 08:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bkrahmer/765b0c33ecd64f5db1ac72bdeecf217a to your computer and use it in GitHub Desktop.
Save bkrahmer/765b0c33ecd64f5db1ac72bdeecf217a to your computer and use it in GitHub Desktop.
package com.sixt.service.framework.distributed;
import com.google.common.io.Files;
import com.sixt.service.framework.util.Sleeper;
import io.atomix.Atomix;
import io.atomix.cluster.Node;
import io.atomix.cluster.NodeId;
import io.atomix.messaging.Endpoint;
import io.atomix.primitives.lock.DistributedLock;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class AtomixTest {
private static final Logger logger = LoggerFactory.getLogger(AtomixTest.class);
Node[] nodes = new Node[4];
List<AtomixService> instances = new ArrayList<>();
Sleeper sleeper = new Sleeper();
@Before
public void setup() {
nodes[0] = Node.builder().withId(NodeId.from("0")).withEndpoint(Endpoint.from("localhost", 7000)).build();
nodes[1] = Node.builder().withId(NodeId.from("1")).withEndpoint(Endpoint.from("localhost", 7001)).build();
nodes[2] = Node.builder().withId(NodeId.from("2")).withEndpoint(Endpoint.from("localhost", 7002)).build();
nodes[3] = Node.builder().withId(NodeId.from("3")).withEndpoint(Endpoint.from("localhost", 7003)).build();
}
@Test
public void atomixTest() throws IOException {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 3; i++) {
AtomixService service = new AtomixService(i);
instances.add(service);
executor.submit(service);
}
sleeper.sleepNoException(5000);
executor.submit(() -> instances.get(0).acquireLock());
executor.submit(() -> instances.get(1).acquireLock());
executor.submit(() -> instances.get(2).acquireLock());
sleeper.sleepNoException(8000);
//kill the first instance
instances.get(0).myThread.stop();
//start a 4th
AtomixService service = new AtomixService(3);
instances.add(service);
executor.submit(service);
sleeper.sleepNoException(1000);
service.acquireLock();
}
private class AtomixService implements Runnable {
private final int index;
private Atomix atomix;
private Thread myThread;
private AtomixService(int index) {
this.index = index;
}
@Override
public void run() {
myThread = Thread.currentThread();
atomix = Atomix.builder().withLocalNode(nodes[index])
.withBootstrapNodes(getNodesAsCollection())
.withDataDir(Files.createTempDir())
.build()
.open()
.join();
}
private Collection<Node> getNodesAsCollection() {
List<Node> retval = new ArrayList<>();
if (index < 3) {
//use nodes 0-2 for initial bootstrap
for (int i = 0; i < 3; i++) {
retval.add(nodes[i]);
}
} else {
//now use nodes 1-3
for (int i = 1; i < 4; i++) {
retval.add(nodes[i]);
}
}
return retval;
}
public void acquireLock() {
while (atomix == null) {
sleeper.sleepNoException(50);
}
logger.info("Building lock");
DistributedLock lock = atomix.lockBuilder().withName("lock").build();
try {
logger.info("Built lock, acquiring");
lock.lock();
logger.info("Acquired lock");
sleeper.sleepNoException(2000);
} finally {
lock.unlock();
logger.info("Released lock");
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment