Skip to content

Instantly share code, notes, and snippets.

@jiazhai
Last active May 9, 2019 03:18
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jiazhai/372e0327234ace4d1611fcebeeadb964 to your computer and use it in GitHub Desktop.
Save jiazhai/372e0327234ace4d1611fcebeeadb964 to your computer and use it in GitHub Desktop.

目标:

在现有的Share的订阅模式之上,添加一种新的订阅模式Key_Failover。这种订阅模式,可以保证同一个订阅中的多个consumer可以共享订阅同一个partiton;同时又可以保证partiton中的每一个Key对应的消息只被发送到一个consumer。

现状和要解决的问题

现有的订阅模式中,share模式可以对同一个partition中的消息进行共享的消费,可以高效的清除partition的backlog。 但是在很多场景中,顺序性也是很常见的需求。在share模式中,我们不能保证同一个Key对应的消息会被发送到固定的某一个consumer中。每个consumer对消息的顺序性得不到保证。

我们想要在新的订阅模式Key_Failover中,保证同一个订阅中的多个consumer可以共享订阅同一个partiton;同时又可以保证partiton中的每一个Key对应的消息只被发送到一个consumer。

解决方法

在Broker端的同一个订阅的所有consumer中,每一个consumer服务于一个特定范围内的hash值。 在Broker端在dispatch同一个订阅中的消息的时候,首先解析出每个消息的Key;根据每个消息的Key的hash值,交给服务这个hash值的Consumer。

具体实现

主要的工作集中在hash的设置,和broker端新的dispatcher的添加。

hash的设置

对每个消息具体被dispatch发送给哪一个Consumer,由消息的key计算出的hash值决定。要保证同一个订阅中的一个consumer,固定服务于一个key。需要在订阅中有一层hash层,保证:

  1. 每个consumer服务一个hash范围内的Key;
  2. 所有的consumer,共同服务所有的hash范围;
  3. 保证各个consumer服务的范围尽可能均衡;
  4. 当consumer被删除后,或者有新的consumer加入后,所有consumer,仍然可以服务所有的Key;

对于每个订阅对应的dispatcher来说,可以统计每个consumer最近一段时间的dispatch rate。 如果当有新的consumer被加入的时候,新consumer分担原来最繁忙的那个consumer的hash范围; 当有consumer被close的时候,它原来服务的hash范围,交给hash范围和它临近的最不忙的那个consumer。

下面用一个简单的示意图,来说明hash的工作原理:

picture-1

这是一个订阅中的多个Consumer的简单示意图,三种颜色的色块代表三个consumer,纵轴是消息Key对应的hash routing值,横轴是时间。

  1. 在0时刻,这个订阅加入了第一个consumer — C1,所有的消息都被这个C1来服务;
  2. 在T1时刻,有一个新的consumer — C2加入,hash在(0—0.5)的消息继续被C1服务,hash在(0.5,1)的消息被C2服务;
  3. 在T2时刻,又有一个新的consumer--C3加入,检测到C2流量比C1大,将C2的hash范围拆分一半给C3。C2服务(0.5–0.75),C3服务(0.75-1)
  4. 在T3时刻,C1被关闭,C1的hash范围只和C2临近,将C1的服务范围交给C2。C2服务(0-0.75)。
  5. 在T4时刻,C2被关闭,C3的hash服务范围变成(0-1)。

在PulsarApi.proto中添加新的订阅模式

message CommandSubscribe {
	enum SubType {
		Exclusive = 0;
		Shared    = 1;
		Failover  = 2;
		Key_Failover = 3; < ===
	}

在Broker端实现一个新的KeyStickyDispatcherMultipleConsumers,

主要实现的函数:

void addConsumer(Consumer consumer) throws BrokerServiceException {
// 添加consumer
// 并且更新相关consumer服务的hash值的范围值。
}

void removeConsumer(Consumer consumer) throws BrokerServiceException {
// 移除consumer
// 并且更新相关consumer服务的hash值的范围值。
}

Consumer getConsumer(String key) {
// 根据对应的key,返回服务的consumer
}

当从bk读取消息之后,根据每个entry包含的key不同,把每个entry dispatch到对应的consumer中

void readEntriesComplete(List<Entry> entries, Object ctx) {
// 根据每个entry包含的key不同,把每个entry dispatch到对应的consumer中。
} 

在这个函数中要处理的特殊情形:如果消息来了,对应的某一个Key的consumer没有permit,需要加入到messagestoReplay中,等有permits再处理。

对原有feature的影响

为了支持消息的batch,需要在producer端,添加batch by key的支持。在初始版本中可以先不考虑。

Consumer的dispatch优先级没有了,因为消息按照Key发送了。是Key决定了每个消息被发送给哪个consumer,而不是consumer的优先级决定的。这样流控会受到一定的影响。

@yangl
Copy link

yangl commented May 9, 2019

"同时又可以保证partiton中的每一个Key对应的消息只被发送到一个consumer"-----应该指的是 同一时刻 每一个key对应的消息只被发送到同一个consumer吧?不同时间段consumer的数据可能会有扩缩容对应的hash段有变化,如上边的0-T1这段时间k1发到C1,然后T1-T2时间段内有可能之前的k1消息被发到了C2。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment