Skip to content

Instantly share code, notes, and snippets.

@youthlin
Created May 17, 2020 09:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save youthlin/d7bff98960ab40bb74a63f0eafcd099c to your computer and use it in GitHub Desktop.
Save youthlin/d7bff98960ab40bb74a63f0eafcd099c to your computer and use it in GitHub Desktop.
package xxx;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.cluster.LoadBalance;
import com.alibaba.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance;
import com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance;
import com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* A 调用 B
* 现象:当前使用一致性哈希方式,B 刚启动时,代码没有预热,所有流量打过去,造成启动期间 CPU 高
* 前提:使用随机方式,可以在服务治理配置预热,刚启动的机器权重会小一些,打过去的请求少一些,因此可以让 B 预热(即时编译代码之类的)
* 方案:一个 requestId 的第一次请求使用随机,后续同一个 requestId 使用相同的
* 限制:x 和 y 是不同的接口,但是 requestId 相同也要打到同一台机器
* <p>
* 后续:当 B 两个接口合并为一个接口的不同方法后,可以缓存 requestId-invoker
*
* @author youthlin.chen
* @date 2019-10-24 15:32
*/
@Slf4j
@SuppressWarnings("UnstableApiUsage")
public class ReuseSessionLoadBalance extends AbstractLoadBalance {
/**
* 好像所有实现都是小写的名字呢
*/
@SuppressWarnings("unused")
public static final String NAME = "reusesession";
/**
* 一个 requestId 对应的提供者机器地址
* 不缓存 requestId-invoker 的原因:
* x 接口和 y 接口的 requestId 相同,但 invoker 不同
* 需要打到同一台机器(address 相同就是同一台机器)
*/
private static Cache<String, String> requestToAddress;
static {
resetCache(/*minutes 配置的值*/15L);//省略内部获取配置代码,配置变更实时 reset
}
/**
* 重设缓存时间
*/
private static void resetCache(long minutes) {
Cache<String, String> newCache = newCache(minutes);
if (requestToAddress != null) {
// 原有的不能丢了
newCache.putAll(requestToAddress.asMap());
}
requestToAddress = newCache;
log.info("reset_cache, minutes={} count={}", minutes, newCache.size());
}
private static <K, V> Cache<K, V> newCache(long minutes) {
return CacheBuilder.newBuilder()
.expireAfterAccess(minutes, TimeUnit.MINUTES)
.removalListener(n -> /*记个监控.recordOne("reuseSession_cache_remove_cause_" + n.getCause())*/;)
.build();
}
/**
* 第一次请求使用随机 刚启动预热中的机器权重更低
*/
private LoadBalance randomLoadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class)
.getExtension(RandomLoadBalance.NAME);
/**
* 开关关闭使用默认的一致性哈希
*/
private LoadBalance consistentHashLoadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class)
.getExtension(ConsistentHashLoadBalance.NAME);
/**
* 一个服务的方法对应的提供方列表: key 为 serviceKey.methodName value 为该方法对应的所有提供方
* 因为缓存的是机器 address(即 host:port) 而我们需要返回 invoker
* 为了不用每次遍历 invokers 找到相应 address 的 invoker
* 所以使用 Selector 包一下,提前做好 address - invoker 映射
* 在 invokers 变化时(identityHashCode 改变)重新缓存映射关系
*/
private ConcurrentMap<String, Selector<?>> selectors = Maps.newConcurrentMap();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
long start = System.currentTimeMillis();
try {
// 取第一个参数作为会话的标识, 即: requestId
final String requestId = invocation.getArguments()[0].toString();
// 开关关闭用原来的一致性哈希
if (!/*开关默认关闭*/false
&& requestToAddress.getIfPresent(requestId) == null) {
if (requestToAddress.size() > 0) {
// cleanUp 清除过期的 key, 全部清除是 invalidateAll
requestToAddress.cleanUp();
}
if (requestToAddress.size() == 0) {
selectors.clear();
}
//记个监控
return consistentHashLoadBalance.select(invokers, url, invocation);
}
// 不同接口不同 key: group/interface:version
String serviceKey = invokers.get(0).getUrl().getServiceKey();
@SuppressWarnings("unchecked")
Selector<T> selector = (Selector<T>) selectors.get(serviceKey);
int identityHashCode = System.identityHashCode(invokers);
// hash 值不一样说明提供者有变化 需要重新缓存提供者列表
if (selector == null || selector.identityHashCode != identityHashCode) {
//记个监控 recordOne("reuseSession_do_select_put_selector");
selector = new Selector<>(invokers, identityHashCode);
selectors.put(serviceKey, selector);
}
final String address = requestToAddress.get(requestId, () -> {
// 记个监控recordOne("reuseSession_do_select_first_requestId");
// 第一次随机选一台 (guava cache 机制 多个线程同时 load 只有一个线程会进来)
return randomLoadBalance.select(invokers, url, invocation).getUrl().getAddress();
});
//记个监控recordOne("reuseSession_new");
return selector.computeInvokerIfAbsent(address, (adr) -> {
// address 是缓存的,但 invoker 已下线 此时需要 reselect
// 这时 selector map 里 address 是下线的,但对应的 invoker 是在线的
// e.g.: 127.0.0.1 是缓存里的 address, 对应机器已下线;
// 然后这里重新选择使该 address 对应了正确的 invoker 192.168.0.1
Invoker<T> invoker = randomLoadBalance.select(invokers, url, invocation);
requestToAddress.put(requestId, invoker.getUrl().getAddress());
return invoker;
}
);
} catch (Exception e) {
//记个监控recordOne("reuseSession_do_select_error");
log.warn("reuseSession_do_select_error", e);
// 兜底
return consistentHashLoadBalance.select(invokers, url, invocation);
} finally {
//记个监控recordOne("reuseSession_do_select", System.currentTimeMillis() - start);
}
}
private static class Selector<T> {
private final ConcurrentHashMap<String, Invoker<T>> map;
private final long identityHashCode;
private Selector(List<Invoker<T>> invokers, long identityHashCode) {
this.map = invokers.stream()
.collect(Collectors.toMap(
invoker -> invoker.getUrl().getAddress(),
Function.identity(),
(older, newer) -> {
// 每个提供者的地址都是不同的,所以这里不应该发生
//记个监控.recordOne("reuseSession_selector_duplicated_address_invoker");
return newer;
},
ConcurrentHashMap::new
));
this.identityHashCode = identityHashCode;
}
private Invoker<T> computeInvokerIfAbsent(String address, Function<String, Invoker<T>> reselect) {
// ConcurrentHashMap 保证同一个 key 的 mappingFunction 只有一个会执行
// doc: the function is applied at most once per key
return map.computeIfAbsent(address, reselect);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment