在现有的Share的订阅模式之上,添加一种新的订阅模式Key_Failover。这种订阅模式,可以保证同一个订阅中的多个consumer可以共享订阅同一个partiton;同时又可以保证partiton中的每一个Key对应的消息只被发送到一个consumer。
现有的订阅模式中,share模式可以对同一个partition中的消息进行共享的消费,可以高效的清除partition的backlog。 但是在很多场景中,顺序性也是很常见的需求。在share模式中,我们不能保证同一个Key对应的消息会被发送到固定的某一个consumer中。每个consumer对消息的顺序性得不到保证。
我们想要在新的订阅模式Key_Failover中,保证同一个订阅中的多个consumer可以共享订阅同一个partiton;同时又可以保证partiton中的每一个Key对应的消息只被发送到一个consumer。
在Broker端的同一个订阅的所有consumer中,每一个consumer服务于一个特定范围内的hash值。 在Broker端在dispatch同一个订阅中的消息的时候,首先解析出每个消息的Key;根据每个消息的Key的hash值,交给服务这个hash值的Consumer。
主要的工作集中在hash的设置,和broker端新的dispatcher的添加。
对每个消息具体被dispatch发送给哪一个Consumer,由消息的key计算出的hash值决定。要保证同一个订阅中的一个consumer,固定服务于一个key。需要在订阅中有一层hash层,保证:
- 每个consumer服务一个hash范围内的Key;
- 所有的consumer,共同服务所有的hash范围;
- 保证各个consumer服务的范围尽可能均衡;
- 当consumer被删除后,或者有新的consumer加入后,所有consumer,仍然可以服务所有的Key;
对于每个订阅对应的dispatcher来说,可以统计每个consumer最近一段时间的dispatch rate。 如果当有新的consumer被加入的时候,新consumer分担原来最繁忙的那个consumer的hash范围; 当有consumer被close的时候,它原来服务的hash范围,交给hash范围和它临近的最不忙的那个consumer。
下面用一个简单的示意图,来说明hash的工作原理:
这是一个订阅中的多个Consumer的简单示意图,三种颜色的色块代表三个consumer,纵轴是消息Key对应的hash routing值,横轴是时间。
- 在0时刻,这个订阅加入了第一个consumer — C1,所有的消息都被这个C1来服务;
- 在T1时刻,有一个新的consumer — C2加入,hash在(0—0.5)的消息继续被C1服务,hash在(0.5,1)的消息被C2服务;
- 在T2时刻,又有一个新的consumer--C3加入,检测到C2流量比C1大,将C2的hash范围拆分一半给C3。C2服务(0.5–0.75),C3服务(0.75-1)
- 在T3时刻,C1被关闭,C1的hash范围只和C2临近,将C1的服务范围交给C2。C2服务(0-0.75)。
- 在T4时刻,C2被关闭,C3的hash服务范围变成(0-1)。
message CommandSubscribe {
enum SubType {
Exclusive = 0;
Shared = 1;
Failover = 2;
Key_Failover = 3; < ===
}
主要实现的函数:
void addConsumer(Consumer consumer) throws BrokerServiceException {
// 添加consumer
// 并且更新相关consumer服务的hash值的范围值。
}
void removeConsumer(Consumer consumer) throws BrokerServiceException {
// 移除consumer
// 并且更新相关consumer服务的hash值的范围值。
}
Consumer getConsumer(String key) {
// 根据对应的key,返回服务的consumer
}
当从bk读取消息之后,根据每个entry包含的key不同,把每个entry dispatch到对应的consumer中
void readEntriesComplete(List<Entry> entries, Object ctx) {
// 根据每个entry包含的key不同,把每个entry dispatch到对应的consumer中。
}
在这个函数中要处理的特殊情形:如果消息来了,对应的某一个Key的consumer没有permit,需要加入到messagestoReplay中,等有permits再处理。
为了支持消息的batch,需要在producer端,添加batch by key的支持。在初始版本中可以先不考虑。
Consumer的dispatch优先级没有了,因为消息按照Key发送了。是Key决定了每个消息被发送给哪个consumer,而不是consumer的优先级决定的。这样流控会受到一定的影响。
"同时又可以保证partiton中的每一个Key对应的消息只被发送到一个consumer"-----应该指的是 同一时刻 每一个key对应的消息只被发送到同一个consumer吧?不同时间段consumer的数据可能会有扩缩容对应的hash段有变化,如上边的0-T1这段时间k1发到C1,然后T1-T2时间段内有可能之前的k1消息被发到了C2。