Skip to content

Instantly share code, notes, and snippets.

@kris-zhang
Last active May 14, 2018 10:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save kris-zhang/f2d647f0051cdc007fbc195e07ee0c44 to your computer and use it in GitHub Desktop.
Save kris-zhang/f2d647f0051cdc007fbc195e07ee0c44 to your computer and use it in GitHub Desktop.
atomix api学习
/*
* Alipay.com Inc.
* Copyright (c) 2004-2017 All Rights Reserved.
*/
package com.alipay.kris.other.atomix;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import com.alipay.kris.other.atomix.Election.Client.MyResource;
import com.google.common.collect.Lists;
import io.atomix.AtomixClient;
import io.atomix.AtomixReplica;
import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.netty.NettyTransport;
import io.atomix.collections.DistributedMap;
import io.atomix.collections.DistributedMultiMap;
import io.atomix.collections.DistributedQueue;
import io.atomix.collections.DistributedSet;
import io.atomix.concurrent.DistributedLock;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.group.DistributedGroup;
import io.atomix.group.GroupMember;
import io.atomix.group.LocalMember;
import io.atomix.group.messaging.MessageConsumer;
import io.atomix.group.messaging.MessageProducer;
import io.atomix.group.messaging.MessageProducer.Delivery;
import io.atomix.group.messaging.MessageProducer.Execution;
import io.atomix.group.messaging.MessageProducer.Options;
import io.atomix.resource.ResourceFactory;
import io.atomix.resource.ResourceStateMachine;
import io.atomix.resource.ResourceTypeInfo;
import io.atomix.variables.AbstractDistributedValue;
import io.atomix.variables.DistributedLong;
import io.atomix.variables.DistributedValue;
import io.atomix.variables.internal.LongCommands;
import io.atomix.variables.internal.LongCommands.DecrementAndGet;
import io.atomix.variables.internal.LongCommands.IncrementAndGet;
import io.atomix.variables.internal.LongCommands.LongCommand;
/**
* Cluster leader election example built on the Atomix API.
*
* @author gongzuo.zy
* @version $Id: Study.java, v0.1 2017-04-21 16:42 gongzuo.zy Exp $
*/
public class Election {
// Addresses of all five cluster members. Every ServerN.main bootstraps against this
// list and Client.connect() connects to it. The initializer reads ServerN.address,
// so each ServerN class is initialized when this field is.
static List<Address> clusters = Lists.newArrayList(Server1.address, Server2.address,Server3.address, Server4.address, Server5.address);
// Replica node 1: built for address localhost:5001 with storage path .../t/d1.
private static class Server1 {
// Declared before atomix because the atomix initializer reads it; also read by Election.clusters.
static Address address = new Address("localhost", 5001);
static AtomixReplica atomix = replica(address, "/Users/gongzuo.zy/Desktop/t/d1");
// Bootstraps this replica against the addresses in clusters, waits for that to finish,
// then runs joinGroup on it.
public static void main(String[] args) throws ExecutionException, InterruptedException {
atomix.bootstrap(clusters).join();
joinGroup(atomix);
}
}
// Replica node 2: built for address localhost:5002 with storage path .../t/d2.
private static class Server2 {
// Declared before atomix because the atomix initializer reads it; also read by Election.clusters.
static Address address = new Address("localhost", 5002);
static AtomixReplica atomix = replica(address, "/Users/gongzuo.zy/Desktop/t/d2");
// Bootstraps this replica against the addresses in clusters, waits for that to finish,
// then runs joinGroup on it.
public static void main(String[] args) throws ExecutionException, InterruptedException {
atomix.bootstrap(clusters).join();
joinGroup(atomix);
}
}
// Replica node 3: built for address localhost:5003 with storage path .../t/d3.
private static class Server3 {
// Declared before atomix because the atomix initializer reads it; also read by Election.clusters.
static Address address = new Address("localhost", 5003);
static AtomixReplica atomix = replica(address, "/Users/gongzuo.zy/Desktop/t/d3");
// Bootstraps this replica against the addresses in clusters, waits for that to finish,
// then runs joinGroup on it.
public static void main(String[] args) throws ExecutionException, InterruptedException {
atomix.bootstrap(clusters).join();
joinGroup(atomix);
}
}
// Replica node 4: built for address localhost:5004 with storage path .../t/d4.
private static class Server4 {
// Declared before atomix because the atomix initializer reads it; also read by Election.clusters.
static Address address = new Address("localhost", 5004);
static AtomixReplica atomix = replica(address, "/Users/gongzuo.zy/Desktop/t/d4");
// Bootstraps this replica against the addresses in clusters, waits for that to finish,
// then runs joinGroup on it.
public static void main(String[] args) throws ExecutionException, InterruptedException {
atomix.bootstrap(clusters).join();
joinGroup(atomix);
}
}
// Replica node 5: built for address localhost:5005 with storage path .../t/d5.
private static class Server5 {
// Declared before atomix because the atomix initializer reads it; also read by Election.clusters.
static Address address = new Address("localhost", 5005);
static AtomixReplica atomix = replica(address, "/Users/gongzuo.zy/Desktop/t/d5");
// Bootstraps this replica against the addresses in clusters, waits for that to finish,
// then runs joinGroup on it.
public static void main(String[] args) throws ExecutionException, InterruptedException {
atomix.bootstrap(clusters).join();
joinGroup(atomix);
}
}
/**
 * Joins the given replica to the group named "group" and wires up the demo callbacks:
 * membership listeners, a serializer registration, one outgoing message, a message
 * consumer on the local member, a dump of the current member ids, and an election listener.
 *
 * @param atomix the replica that joins the group
 * @throws ExecutionException   if fetching the group or joining it fails
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private static void joinGroup(AtomixReplica atomix) throws ExecutionException, InterruptedException {
    DistributedGroup cluster = atomix.getGroup("group").get();
    LocalMember self = cluster.join().get();

    // Print the id of every member that joins or leaves the group from here on.
    cluster.onJoin(joined -> System.out.println("加入成功id:" + joined.id()));
    cluster.onLeave(departed -> System.out.println("离群成功id:" + departed.id()));

    // Registers a serializer factory for Election on the group's serializer.
    // The factory shown here returns null, i.e. it is only a placeholder.
    cluster.serializer().register(Election.class, type -> null);

    // Send one message through a group-level producer.
    // NOTE(review): the original note calls this a broadcast, yet the producer is
    // configured with Delivery.RANDOM, and its name ("producerName") differs from the
    // consumer name ("topic") used below — confirm the intended delivery mode and name.
    Options sendOptions = new Options()
        .withDelivery(Delivery.RANDOM)
        .withExecution(Execution.SYNC);
    MessageProducer<String> sender = cluster.messaging().producer("producerName", sendOptions);
    sender.send("broadcast message");

    // Consume messages addressed to this member under the name "topic";
    // each message is printed and then acknowledged.
    MessageConsumer<String> receiver = self.messaging().consumer("topic");
    receiver.onMessage(message -> {
        System.out.println("收到消息:" + message);
        message.ack();
    });

    // Print the id of every member currently in the group.
    for (GroupMember member : cluster.members()) {
        System.out.println(member.id());
    }

    // Election callback: term describes the election term that was just decided.
    cluster.election().onElection(term -> {
        boolean electedSelf = term.leader().equals(self);
        if (!electedSelf) {
            System.out.println("没有选上,Leader:" + term.leader());
        } else {
            System.out.println("选上领导了");
        }
    });
}
/**
 * Builds (without starting) a replica for the given address.
 *
 * @param address the address handed to the replica builder
 * @param storage the path handed to the Storage constructor
 * @return the built replica, with a Netty transport and MyResource added as a resource type
 */
private static AtomixReplica replica(Address address, String storage) {
    AtomixReplica.Builder builder = AtomixReplica.builder(address);
    builder = builder.withTransport(new NettyTransport());
    builder = builder.withStorage(new Storage(storage));
    builder = builder.addResourceType(MyResource.class);
    return builder.build();
}
private static class Client {
/**
 * Connects a client, looks up the group named "group" and sends one message through the
 * group-level producer named "topic"; whatever the send completes with is printed.
 * NOTE(review): the original note describes this as a broadcast; no delivery option is
 * set here, so the mode is the producer's default — confirm.
 */
static void messaging() throws ExecutionException, InterruptedException {
    DistributedGroup group = connect().getGroup("group").get();
    MessageProducer<String> producer = group.messaging().producer("topic");
    producer.send("this is my message").thenAccept(System.out::println);
}
// 创建我们自己的资源
@ResourceTypeInfo(id=12, factory = MyResourceFactory.class)
static class MyResource extends AbstractDistributedValue<DistributedLong, Long> {
protected MyResource(CopycatClient client, Properties options) {
super(client, options);
}
// 对外提供的方法
public CompletableFuture<Long> incrementAndGet() {
return client.submit(new MyResourceResolver.IncrementAndGet());
}
}
// 定义状态机
static class MyStateMachine extends ResourceStateMachine {
long value;
protected MyStateMachine(Properties config) {
super(config);
}
public long incrementAndGet(Commit<IncrementAndGet> commit) {
try {
Long oldValue = value;
value = oldValue + 1;
return value;
} finally {
commit.close();
}
}
}
/**
 * Registers the operation types of the custom resource with the serializer.
 */
static class MyResourceResolver implements SerializableTypeResolver {

    /**
     * Registers LongCommands.IncrementAndGet under serializer id -115.
     * Note that this is the LongCommands class, not the nested IncrementAndGet below.
     */
    @Override
    public void resolve(SerializerRegistry registry) {
        final int typeId = -115;
        registry.register(LongCommands.IncrementAndGet.class, typeId);
    }

    /** A Long-returning command type declared locally; it is not registered by resolve(). */
    public static class IncrementAndGet extends LongCommand<Long> {
    }
}
/**
 * Factory referenced by MyResource's ResourceTypeInfo annotation; it supplies the
 * resource instance, its state machine and its serializable type resolver.
 */
static class MyResourceFactory implements ResourceFactory<MyResource> {

    /** Creates the client-side resource instance on top of the given Copycat client. */
    @Override
    public MyResource createInstance(CopycatClient client, Properties options) {
        return new MyResource(client, options);
    }

    /** Creates the state machine that holds the resource's state. */
    @Override
    public ResourceStateMachine createStateMachine(Properties config) {
        return new MyStateMachine(config);
    }

    /** Creates the resolver that registers the resource's operation types. */
    @Override
    public SerializableTypeResolver createSerializableTypeResolver() {
        return new MyResourceResolver();
    }
}
/**
 * Multimap demo: connects, fetches the multimap "my_multimap" and stores two values
 * under the same key "k", waiting for each put to complete.
 */
static void multimap() throws ExecutionException, InterruptedException {
    AtomixClient session = connect();
    CompletableFuture<DistributedMultiMap<String, String>> pending = session.getMultiMap("my_multimap");
    DistributedMultiMap<String, String> multiMap = pending.get();
    multiMap.put("k", "v1").join();
    multiMap.put("k", "v2").join();
}
/**
 * Queue demo: connects, fetches the queue "my_queue", offers one element without
 * waiting, then waits on peek, remove and poll in that order.
 */
static void queue() throws ExecutionException, InterruptedException {
    AtomixClient session = connect();
    CompletableFuture<DistributedQueue<String>> pending = session.getQueue("my_queue");
    DistributedQueue<String> items = pending.get();
    // The offer's future is not awaited.
    items.offer("a");
    items.peek().join();
    items.remove().join();
    items.poll().join();
}
/**
 * Set demo: shows the same "add key if missing" check once with blocking calls
 * and once with callbacks.
 */
static void set() throws ExecutionException, InterruptedException {
    DistributedSet<String> keys = connect().<String>getSet("my_set").get();

    // Blocking variant: wait for each answer before moving on.
    boolean alreadyPresent = keys.contains("key").join();
    if (alreadyPresent) {
        System.out.println("含有key");
    } else {
        keys.add("key").join();
    }

    // Callback variant: react once the membership check completes.
    keys.contains("key").thenAccept(present -> {
        if (!present) {
            keys.add("key").thenRun(() -> System.out.println("添加成功"));
            return;
        }
        System.out.println("含有key");
    });
}
/**
 * Map demo: reads and conditionally writes the key "key" in the map "my_map",
 * first with blocking calls and then with callbacks.
 */
static void map() throws ExecutionException, InterruptedException {
    DistributedMap<String, String> store = connect().<String, String>getMap("my_map").join();

    // Blocking read: print the value only when the key exists.
    boolean hasKey = store.containsKey("key").join();
    if (hasKey) {
        System.out.println(store.get("key").join());
    }
    // (nothing is done when the key is absent)

    // Blocking write.
    store.putIfAbsent("key", "value").join();

    // Callback read: same check, without blocking the caller.
    store.containsKey("key").thenAccept(present -> {
        if (present) {
            store.get("key").thenAccept(found -> System.out.println(found));
        }
        // (nothing is done when the key is absent)
    });

    // Callback write.
    store.putIfAbsent("key", "value").thenRun(() -> System.out.println("success"));
}
/**
 * Lock demo: acquires the lock "my_lock" once through callbacks and once with
 * blocking calls, releasing it after each acquisition.
 *
 * <p>The original never called unlock(), so both requests targeted the same lock
 * while the first acquisition was still held. Releasing after each acquisition lets
 * whichever request is granted second proceed, and leaves the lock free afterwards.
 *
 * @throws ExecutionException   if connecting or fetching the lock fails
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
static void lock() throws ExecutionException, InterruptedException {
    AtomixClient client = connect();
    CompletableFuture<DistributedLock> completableFuture = client.getLock("my_lock");

    // Asynchronous API: once acquired, report it and chain the release.
    completableFuture.thenAccept(asyncLock -> {
        asyncLock.lock().thenCompose(acquired -> {
            System.out.println("得到了锁");
            return asyncLock.unlock();
        });
    });

    // Synchronous API: block until acquired, and always release in finally.
    DistributedLock lock = completableFuture.get();
    lock.lock().join();
    try {
        // critical section would go here
    } finally {
        lock.unlock().join();
    }
}
/**
 * Variable demo: exercises a distributed String value ("test-value") and a
 * distributed long ("test-long").
 */
static void variable() throws ExecutionException, InterruptedException {
    AtomixClient session = connect();

    CompletableFuture<DistributedValue<String>> pendingValue = session.getValue("test-value");
    DistributedValue<String> text = pendingValue.get();

    // Blocking read.
    String current = text.get().join();
    System.out.println(current);

    // Plain set, then a set that also passes a Duration of 10 days
    // (the original note calls the latter a set with a time limit).
    text.set("lala");
    text.set("lala", Duration.ofDays(10));

    // Callback read.
    text.get().thenAccept(System.out::println);

    // Distributed long and its basic operations; each result is printed as returned.
    DistributedLong counter = session.getLong("test-long").get();
    Long afterFirstDecrement = counter.getAndDecrement().join();
    System.out.println(afterFirstDecrement);
    Long afterAdd = counter.getAndAdd(10).join();
    System.out.println(afterAdd);
    Long afterSecondDecrement = counter.getAndDecrement().join();
    System.out.println(afterSecondDecrement);
}
/**
 * Builds a client with a Netty transport, connects it to the addresses in clusters
 * and returns it once the connection attempt has completed.
 *
 * @return the connected client
 * @throws ExecutionException   if the connection attempt fails
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
static AtomixClient connect() throws ExecutionException, InterruptedException {
    AtomixClient client = AtomixClient.builder()
        .withTransport(new NettyTransport())
        .build();

    // Start connecting, print a notice from the completion callback, then block on
    // that callback stage. (The original notes client.connect(clusters).join() as the
    // plain blocking alternative.)
    CompletableFuture<Void> connected = client.connect(clusters)
        .thenRun(() -> System.out.println("Client connected!"));
    connected.get();

    return client;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment