- Status: Proposal
- Author: Chris Day
- Pull Request: #7629
- Mailing List discussion:
- Release:
Broker-side filtering of messages can significantly reduce network load when running certain kinds of highly filtered jobs. Example use cases include displaying data over WebSocket, filtering high-traffic "firehose" streams for large numbers of clients, and other tasks where total CPU usage can be reduced by centralizing the filtering.
The ultimate goal for this feature is Flink filter pushdown, which would allow many more Flink jobs to run on high-traffic topics without each one spending large amounts of Flink CPU on the same filtering task.
The initial PR identifies three basic types of filtering: byte prefix, regex, and schema-aware match. The implementation is designed to be easy for users to extend: adding a filter only requires putting a new JAR on the pulsar-broker classpath and specifying the filter's class name in the consumer.
Which filters, and how many, the Pulsar project should bundle is an open question, but I see no harm in providing a wide range of generic filters.
The initial PR lets users filter Pulsar messages in a few different ways. A consumer is sent only the messages that match its requested filter.
Byte prefix: a schema-agnostic filter in which the user specifies a sequence of bytes that every message they receive must start with.
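A minimal sketch of the prefix check, assuming the filter is handed the payload as a `java.nio.ByteBuffer` positioned at the start of the message body (the broker code in this proposal passes a Netty `ByteBuf` instead). `BytePrefixFilter` is a hypothetical name for illustration, not the PR's class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone byte prefix filter; the PR's real class and
// method signatures may differ (it works on a Netty ByteBuf).
public final class BytePrefixFilter {
    private final byte[] prefix;

    public BytePrefixFilter(byte[] prefix) {
        this.prefix = prefix.clone();
    }

    // True if the buffer's remaining bytes start with the configured prefix.
    // Uses absolute reads, so the buffer's position is left unchanged.
    public boolean matches(ByteBuffer payload) {
        if (payload.remaining() < prefix.length) {
            return false;
        }
        int start = payload.position();
        for (int i = 0; i < prefix.length; i++) {
            if (payload.get(start + i) != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BytePrefixFilter filter = new BytePrefixFilter("sensor-".getBytes(StandardCharsets.UTF_8));
        System.out.println(filter.matches(ByteBuffer.wrap("sensor-42:17.5".getBytes(StandardCharsets.UTF_8)))); // true
        System.out.println(filter.matches(ByteBuffer.wrap("log:ok".getBytes(StandardCharsets.UTF_8))));         // false
    }
}
```

Reading with absolute indices keeps the check side-effect free, which matters when the same buffer is later rewound and sent to the consumer.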
Regex: messages are matched against a regular expression. While this works best on string payloads, it is possible to write regexes that operate on raw bytes.
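One way to let a regex operate on bytes, sketched under assumptions: decode the payload as ISO-8859-1, which maps each byte 0x00-0xFF to exactly one character U+0000-U+00FF, so a pattern can use `\xhh` escapes for raw byte values while ASCII text matches as usual. `ByteRegexFilter` is a hypothetical name, and using `find()` (match anywhere) rather than a full match is an assumption about the filter's semantics.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

// Hypothetical byte-oriented regex filter. ISO-8859-1 decoding is a 1:1
// byte-to-char mapping, so "\xCA" in a pattern matches the byte 0xCA.
public final class ByteRegexFilter {
    private final Pattern pattern;

    public ByteRegexFilter(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    // Assumption: a message matches if the pattern is found anywhere in it.
    public boolean matches(byte[] payload) {
        String latin1 = new String(payload, StandardCharsets.ISO_8859_1);
        return pattern.matcher(latin1).find();
    }

    public static void main(String[] args) {
        ByteRegexFilter text = new ByteRegexFilter("[3-5][0-9]");
        System.out.println(text.matches("42".getBytes(StandardCharsets.UTF_8))); // true
        System.out.println(text.matches("17".getBytes(StandardCharsets.UTF_8))); // false

        // Match a binary header: byte 0xCA followed by byte 0xFE.
        ByteRegexFilter binary = new ByteRegexFilter("^\\xCA\\xFE");
        System.out.println(binary.matches(new byte[] {(byte) 0xCA, (byte) 0xFE, 0x01})); // true
    }
}
```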
Schema-aware match: this filter checks for exact string matches in the fields of a message that has a schema, which requires decoding the message on the broker. I stopped after writing this filter because schemas vary wildly, as do their filtering requirements. It is provided mostly as an example for building more advanced filters, such as matching against a query language, regexes, or whatever else the user needs.
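A sketch of the exact-match logic, assuming the broker has already decoded the message into field-name/value pairs (how that decoding works depends on the schema type and is not shown). `FieldEqualsFilter` and its map-based configuration are hypothetical, for illustration only.

```java
import java.util.Map;

// Hypothetical schema-aware filter operating on an already-decoded record,
// represented here as field name -> value.
public final class FieldEqualsFilter {
    private final Map<String, String> expected;

    public FieldEqualsFilter(Map<String, String> expected) {
        this.expected = Map.copyOf(expected);
    }

    // True only if every configured field is present and its string form
    // equals the expected value exactly. Extra fields are ignored.
    public boolean matches(Map<String, Object> decodedFields) {
        for (Map.Entry<String, String> e : expected.entrySet()) {
            Object actual = decodedFields.get(e.getKey());
            if (actual == null || !e.getValue().equals(actual.toString())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        FieldEqualsFilter filter = new FieldEqualsFilter(Map.of("region", "us-west", "status", "ACTIVE"));
        System.out.println(filter.matches(Map.of("region", "us-west", "status", "ACTIVE", "id", 7))); // true
        System.out.println(filter.matches(Map.of("region", "us-east", "status", "ACTIVE")));          // false
    }
}
```

Comparing string forms keeps the configuration to plain key/value strings, which fits a wire format that only carries string properties.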
The core of this implementation lies in the `org.apache.pulsar.broker.service` package, specifically in `org.apache.pulsar.broker.service.Consumer`.
When a consumer subscribes, the filter configuration it sends is validated and the filter object is built; the consumer then proceeds as usual.
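The following sketches how a broker could validate and build a filter from the class name and string properties a consumer sends. The `MessageFilter` interface, the convention of a public constructor taking `Map<String, String>`, and the `StartsWithFilter` example are all assumptions for illustration, not the PR's actual API.

```java
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical loader: builds a filter by class name via reflection.
public final class FilterLoader {

    // Assumed filter contract (the PR's real interface may differ).
    public interface MessageFilter {
        boolean matches(byte[] payload);
    }

    // Example user filter; in practice it would ship in a JAR on the broker classpath.
    public static final class StartsWithFilter implements MessageFilter {
        private final byte[] prefix;

        public StartsWithFilter(Map<String, String> props) {
            String p = props.get("prefix");
            if (p == null) {
                throw new IllegalArgumentException("missing 'prefix' property");
            }
            this.prefix = p.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public boolean matches(byte[] payload) {
            if (payload.length < prefix.length) return false;
            for (int i = 0; i < prefix.length; i++) {
                if (payload[i] != prefix[i]) return false;
            }
            return true;
        }
    }

    // Loads and instantiates the requested class; every failure surfaces as
    // IllegalArgumentException so the subscribe request can be rejected.
    public static MessageFilter load(String className, Map<String, String> props) {
        try {
            Class<?> cls = Class.forName(className);
            if (!MessageFilter.class.isAssignableFrom(cls)) {
                throw new IllegalArgumentException(className + " is not a MessageFilter");
            }
            return (MessageFilter) cls.getConstructor(Map.class).newInstance(props);
        } catch (InvocationTargetException e) {
            // The filter's own constructor rejected the properties.
            throw new IllegalArgumentException("filter " + className + " rejected its properties", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot load filter " + className, e);
        }
    }
}
```

Validating at subscribe time means a consumer with a misconfigured filter is rejected up front, rather than failing later during dispatch.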
The next integration point is `Consumer.sendMessages`. Messages are filtered as late as possible so as not to disturb the rest of the system.
If the message matches the filter, it is sent normally. Otherwise, its position is added to a list of entries that the broker acknowledges on the consumer's behalf.
This way, the consumer never sees those messages, and the impact of the change on the rest of the broker is kept to a minimum.
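```java
// Excerpt from Consumer.sendMessages, run for each entry before it is dispatched.
metadataAndPayload.markReaderIndex();
// Advance past the message metadata and the length-prefixed block after it,
// so the reader index is where the filter expects the payload to start.
Commands.skipMessageMetadata(metadataAndPayload);
metadataAndPayload.skipBytes(metadataAndPayload.readInt());
if (this.filter != null && !this.filter.matches(metadataAndPayload)) {
    // No match: record the position so the broker can ack it on the consumer's
    // behalf, recycle pooled objects, release the entry, and skip sending it.
    filteredEntries.add(entry.getPosition());
    messageId.recycle();
    messageIdBuilder.recycle();
    entry.release();
    continue;
}
// Match: rewind to the marked index so the entry is sent unchanged.
metadataAndPayload.resetReaderIndex();
```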
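A simplified, self-contained model of that control flow, for illustration only: `Entry`, `DispatchResult`, and the use of a plain `long` as the position are hypothetical stand-ins for the broker's own types, and buffer handling and object recycling are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of filter-or-auto-ack dispatch; not the broker's real types.
public final class FilteredDispatch {

    public record Entry(long position, byte[] payload) {}

    // toSend goes to the consumer; toAckOnBehalf is acknowledged by the broker.
    public record DispatchResult(List<Entry> toSend, List<Long> toAckOnBehalf) {}

    public static DispatchResult dispatch(List<Entry> entries, Predicate<byte[]> filter) {
        List<Entry> toSend = new ArrayList<>();
        List<Long> filteredPositions = new ArrayList<>();
        for (Entry entry : entries) {
            if (filter != null && !filter.test(entry.payload())) {
                // Filtered out: never sent, but acked so it is not redelivered.
                filteredPositions.add(entry.position());
                continue;
            }
            toSend.add(entry);
        }
        return new DispatchResult(toSend, filteredPositions);
    }
}
```

With no filter configured (`null`), every entry is sent and nothing is acknowledged on the consumer's behalf, matching the `this.filter != null` guard.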
A new field, `filterMeta`, is added to `CommandSubscribe` to carry the filter configuration to the broker.
For now, filter properties are limited to string key/value pairs; I would like to find a way to send richer types, ideally a serialized class.
```protobuf
message FilterMeta {
    required string filterClassName = 1;
    repeated KeyValue filterProperties = 2;
}

message CommandSubscribe {
    ...
    optional FilterMeta filterMeta = 18;
}
```
`ConsumerBuilder` gets a new method, `messageFilterPolicy`, that adds filtering configuration to the subscribe message.
Static factory methods are provided to construct policies for some of the bundled filters; any other filter must be configured with its fully qualified class name.
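```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class MessageFilterPolicy {
    protected final String className;               // filter class the broker instantiates
    protected final Map<String, String> properties; // string properties passed to the filter

    protected MessageFilterPolicy(String className) {
        this.className = className;
        this.properties = new HashMap<>();
    }

    public static class RegexFilterPolicy extends MessageFilterPolicy {
        public RegexFilterPolicy(Pattern pattern) {
            super("org.apache.pulsar.broker.service.filtering.RegexFilter");
            properties.put("regex_filter_pattern_key", pattern.pattern());
        }
    }
}
```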
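```kotlin
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern
import kotlin.test.assertEquals
import kotlin.test.assertNull
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
// MessageFilterPolicy is the client-side class added by this proposal.

val client = PulsarClient.builder().serviceUrl("pulsar://opspulsar.eogresources.com:6650").build()
val producer = client.newProducer(Schema.STRING)
    .topic("public/default/test")
    .create()
// The broker dispatches only messages whose payload matches the regex.
val consumer = client.newConsumer(Schema.STRING)
    .messageFilterPolicy(MessageFilterPolicy.regexFilterPolicy(Pattern.compile("[3-5][0-9]")))
    .topic("public/default/test")
    .subscriptionName("test-subs")
    .subscribe()

// Publish "0" through "100".
for (i in 0..100) {
    producer.send(i.toString())
}

// Only "30" through "59" pass the filter.
val received = (30..59).map { consumer.receive().value }
assertEquals((30..59).map { it.toString() }, received)
// The other messages were acknowledged by the broker, so nothing else arrives.
assertNull(consumer.receive(1, TimeUnit.SECONDS))

consumer.unsubscribe()
client.close()
```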