
@seeday
Created July 23, 2020 17:11
  • Status: Proposal
  • Author: Chris Day
  • Pull Request: #7629
  • Mailing List discussion:
  • Release:

Motivation

Broker-side filtering of messages can significantly reduce network load when running certain kinds of highly selective jobs. Example use cases include displaying data over WebSocket, filtering high-traffic "firehose" streams for large numbers of clients, and other tasks where total CPU usage can be reduced by centralizing the filtering.

The ultimate goal for this feature is Flink filter pushdown. This would let many more Flink jobs run on high-traffic topics without each one spending large amounts of Flink CPU on the same filtering task.

Scope

The initial PR provides three basic types of filter: byte prefix, regex, and schema-aware match. The implementation is designed to be easy for users to extend. Adding a filter only requires a new jar on the pulsar-broker classpath and the filter's fully qualified class name in the consumer configuration.
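The class-name-based extension point could work roughly as follows. The broker resolves the consumer-supplied class name via reflection, checks that it implements a filter contract, and hands it the consumer's properties. This is a minimal sketch, not the PR's code. The `MessageFilter` interface, its `configure`/`matches` methods, `FilterLoader`, and `NonEmptyFilter` are all hypothetical. `byte[]` stands in for the Netty `ByteBuf` the broker actually passes.

```java
import java.util.Map;

// Hypothetical broker-side filter contract; the PR's real interface may differ.
interface MessageFilter {
    // Receives the consumer's filter properties; throwing rejects the subscription.
    void configure(Map<String, String> properties);

    // Returns true if the payload should be delivered to the consumer.
    boolean matches(byte[] payload);
}

final class FilterLoader {
    private FilterLoader() {}

    // Instantiates a filter by fully qualified class name, so any class on the
    // broker classpath (for example, one from a user-supplied jar) can be used.
    static MessageFilter load(String className, Map<String, String> properties) {
        try {
            Class<?> cls = Class.forName(className);
            if (!MessageFilter.class.isAssignableFrom(cls)) {
                throw new IllegalArgumentException(className + " does not implement MessageFilter");
            }
            MessageFilter filter = (MessageFilter) cls.getDeclaredConstructor().newInstance();
            filter.configure(properties);
            return filter;
        } catch (ReflectiveOperationException e) {
            // Covers unknown classes and classes without a usable no-arg constructor.
            throw new IllegalArgumentException("Cannot instantiate filter " + className, e);
        }
    }
}

// Hypothetical example filter: delivers only non-empty payloads.
final class NonEmptyFilter implements MessageFilter {
    @Override
    public void configure(Map<String, String> properties) {
        // No configuration needed.
    }

    @Override
    public boolean matches(byte[] payload) {
        return payload.length > 0;
    }
}
```

Validating at load time means a misconfigured consumer fails on subscribe rather than during dispatch.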

The number and type of filters the Pulsar project itself should provide is an open question, but I don't see any harm in shipping a broad set of generic filters.

Features

The initial PR lets users filter Pulsar messages in a few different ways. A consumer is only sent the messages that match its requested filter.

Byte-prefix filter

A schema-agnostic filter: the user specifies a sequence of bytes, and every message they receive must start with that sequence.
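The prefix check itself is a bounded byte comparison. The sketch below assumes the prefix has already been extracted from the consumer's filter configuration. The class name `BytePrefixFilter` is illustrative and not taken from the PR.

```java
import java.util.Arrays;

// Sketch of a schema-agnostic byte-prefix check (illustrative, not the PR's class).
final class BytePrefixFilter {
    private final byte[] prefix;

    BytePrefixFilter(byte[] prefix) {
        this.prefix = prefix.clone(); // defensive copy of the configured prefix
    }

    // True if the payload begins with every byte of the prefix.
    // An empty prefix matches every payload.
    boolean matches(byte[] payload) {
        if (payload.length < prefix.length) {
            return false;
        }
        return Arrays.equals(payload, 0, prefix.length, prefix, 0, prefix.length);
    }
}
```

Because the check never decodes the payload, it costs the same whatever the topic's schema.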

Regex filter

Matches messages against a regular expression. While this works best on string payloads, it is possible to write regexes that operate on raw bytes.
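One way to make a Java regex operate on raw bytes is to decode the payload as ISO-8859-1. That charset maps every byte 0x00 to 0xFF to the character with the same code point, so `\xNN` escapes in the pattern match byte values one-to-one. This is a sketch under two assumptions: that technique is used, and the pattern may match anywhere in the payload (`find()` semantics). The PR's `RegexFilter` may decode or match differently. `ByteRegexFilter` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

// Sketch: regex filtering over raw bytes via a lossless ISO-8859-1 decode.
final class ByteRegexFilter {
    private final Pattern pattern;

    ByteRegexFilter(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    boolean matches(byte[] payload) {
        // Each byte becomes exactly one char with the same numeric value,
        // so the decode never fails and never merges or splits bytes.
        String latin1 = new String(payload, StandardCharsets.ISO_8859_1);
        return pattern.matcher(latin1).find(); // assumed: match anywhere in the payload
    }
}
```

For UTF-8 text that is pure ASCII this behaves like an ordinary string regex. Multi-byte characters would need a UTF-8 decode instead.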

Basic schema-aware filter

This filter checks for exact string matches in the fields of a message that has a schema, which requires decoding the message on the broker. I stopped after writing this filter because schemas vary wildly, as do their filtering requirements. It is provided mostly as an example for building more advanced filters, such as matching against a query language, applying regexes to individual fields, or whatever else the user needs.
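The exact-match logic can be sketched independently of schema decoding. The sketch makes two assumptions. First, the decoded message is modelled as a field-name-to-value map, standing in for whatever the broker produces after decoding with the topic schema. Second, the filter is configured as field name to required value. `FieldEqualsFilter` and this configuration shape are hypothetical.

```java
import java.util.Map;

// Sketch of a schema-aware exact-match filter. The decoded message is modelled
// as a Map (an assumption); real decoding depends on the topic schema.
final class FieldEqualsFilter {
    private final Map<String, String> expected; // field name -> required string value

    FieldEqualsFilter(Map<String, String> expected) {
        this.expected = Map.copyOf(expected);
    }

    // True only if every configured field is present and its string form matches exactly.
    // Fields that are not configured are ignored.
    boolean matches(Map<String, Object> decoded) {
        for (Map.Entry<String, String> e : expected.entrySet()) {
            Object actual = decoded.get(e.getKey());
            if (actual == null || !e.getValue().equals(actual.toString())) {
                return false;
            }
        }
        return true;
    }
}
```

Comparing string forms keeps the configuration string-only, which fits the string key/value properties the protocol carries.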

Implementation

Internal Changes

The core of this implementation lies in org.apache.pulsar.broker.service, specifically in org.apache.pulsar.broker.service.Consumer. When a consumer subscribes, the filter configuration it sends is validated and the filter is built; the consumer then proceeds as usual.

The next integration point is Consumer.sendMessages. Messages are filtered as late as possible so as not to disturb the rest of the system. If a message matches the filter, it is sent normally. Otherwise, its position is added to a list of messages that the broker acknowledges on the consumer's behalf. This way, the consumer never sees those messages, and the impact of this change is kept to a minimum.

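// Remember the current read position so the buffer can be rewound for normal dispatch
metadataAndPayload.markReaderIndex();
// Advance past the entry's MessageMetadata
Commands.skipMessageMetadata(metadataAndPayload);
// Skip the int-length-prefixed block that follows (for a batched entry, the first
// message's SingleMessageMetadata), leaving the reader index at the payload bytes
metadataAndPayload.skipBytes(metadataAndPayload.readInt());
if (this.filter != null && !this.filter.matches(metadataAndPayload)) {
    // Not a match: record the position so the broker can ack it for the consumer,
    // then release this entry's pooled resources and move on
    filteredEntries.add(entry.getPosition());
    messageId.recycle();
    messageIdBuilder.recycle();
    entry.release();
    continue;
}
// Match (or no filter): rewind so the entry is sent unchanged
metadataAndPayload.resetReaderIndex();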
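The dispatch-time decision can be modelled without broker internals as a partition of each batch into entries to deliver and positions to acknowledge on the consumer's behalf. This is a simplified sketch. `Position`, `Entry`, and `Result` are stand-ins for the broker's managed-ledger types, and the "ack" is just a returned list rather than a real acknowledgment call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simplified model of filtering in sendMessages (types are illustrative stand-ins).
final class FilteringDispatch {
    record Position(long ledgerId, long entryId) {}
    record Entry(Position position, byte[] payload) {}
    record Result(List<Entry> delivered, List<Position> autoAcked) {}

    // A null filter means "no filtering": everything is delivered.
    static Result dispatch(List<Entry> entries, Predicate<byte[]> filter) {
        List<Entry> delivered = new ArrayList<>();
        List<Position> autoAcked = new ArrayList<>();
        for (Entry entry : entries) {
            if (filter == null || filter.test(entry.payload())) {
                delivered.add(entry); // sent to the consumer as usual
            } else {
                autoAcked.add(entry.position()); // broker acks on the consumer's behalf
            }
        }
        return new Result(delivered, autoAcked);
    }
}
```

Acknowledging filtered entries, rather than just skipping them, keeps the subscription's backlog from growing with messages the consumer will never receive.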

Protocol Changes

A new field is added to CommandSubscribe to support sending filter configuration to the broker.

I would like to find a way to send more than just string values, ideally a serialized class.

message FilterMeta {
    required string filterClassName = 1;
    repeated KeyValue filterProperties = 2;
}

message CommandSubscribe {
    // ... existing fields ...
    optional FilterMeta filterMeta = 18;
}
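Until the protocol can carry richer types, binary configuration can be squeezed through the string-only `filterProperties`. For example, the byte-prefix filter's prefix can be Base64-encoded by the client and decoded by the broker. This is a sketch of that workaround, not something the PR is known to do. The property key `byte_prefix_b64` and the `BinaryFilterProperty` helper are hypothetical.

```java
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

// Sketch: carrying binary filter configuration in string key/value properties.
final class BinaryFilterProperty {
    static final String KEY = "byte_prefix_b64"; // hypothetical property key

    // Client side: build the string map that would travel as FilterMeta.filterProperties.
    static Map<String, String> encode(byte[] prefix) {
        Map<String, String> props = new HashMap<>();
        props.put(KEY, Base64.getEncoder().encodeToString(prefix));
        return props;
    }

    // Broker side: recover the bytes, rejecting a missing or malformed value.
    static byte[] decode(Map<String, String> props) {
        String value = props.get(KEY);
        if (value == null) {
            throw new IllegalArgumentException("missing property " + KEY);
        }
        // Base64.Decoder.decode throws IllegalArgumentException on invalid input.
        return Base64.getDecoder().decode(value);
    }
}
```

The same pattern extends to any serializable configuration, at the cost of each filter doing its own encoding and validation.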

API Changes

The ConsumerBuilder gets a new method for adding filter configuration to the subscribe command. Static factory methods are provided to construct some of the bundled filters; any other filter must be configured with its fully qualified class name.

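import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class MessageFilterPolicy {
    protected final String className;               // broker-side filter class
    protected final Map<String, String> properties; // sent as FilterMeta.filterProperties

    protected MessageFilterPolicy(String className) {
        this.className = className;
        this.properties = new HashMap<>();
    }
    // Static factory for the bundled regex filter, as used in the example below
    public static MessageFilterPolicy regexFilterPolicy(Pattern pattern) {
        return new RegexFilterPolicy(pattern);
    }
    public static class RegexFilterPolicy extends MessageFilterPolicy {
        public RegexFilterPolicy(Pattern pattern) {
            super("org.apache.pulsar.broker.service.filtering.RegexFilter");
            properties.put("regex_filter_pattern_key", pattern.pattern());
        }
    }
}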

Example

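import java.util.concurrent.TimeUnit
import java.util.regex.Pattern
import kotlin.test.assertEquals
import kotlin.test.assertNull
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
// MessageFilterPolicy is the new class added by this proposal (see the PR for its package)

val client = PulsarClient.builder().serviceUrl("pulsar://opspulsar.eogresources.com:6650").build()
val producer = client.newProducer(Schema.STRING)
    .topic("public/default/test")
    .create()
// Subscribe with a broker-side regex filter: only payloads matching [3-5][0-9] are delivered
val consumer = client.newConsumer(Schema.STRING)
    .messageFilterPolicy(MessageFilterPolicy.regexFilterPolicy(Pattern.compile("[3-5][0-9]")))
    .topic("public/default/test")
    .subscriptionName("test-subs")
    .subscribe()

// Publish "0" through "100"; of these, only "30" through "59" match the filter
for (i in 0..100) {
    producer.send(i.toString())
}

// Expect exactly the 30 matching messages, in publish order
val received = mutableListOf<String>()
repeat(30) {
    val msg = consumer.receive()
    received += msg.value
    consumer.acknowledge(msg)
}
assertEquals((30..59).map { it.toString() }, received)

// Nothing else should arrive: the broker acked the non-matching messages on our behalf
assertNull(consumer.receive(1, TimeUnit.SECONDS))
consumer.unsubscribe()
client.close()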