Skip to content

Instantly share code, notes, and snippets.

Created June 18, 2018 16:02
Show Gist options
  • Save flyaruu/84b65d52a01987946b07d21bafe0d5f1 to your computer and use it in GitHub Desktop.
Save flyaruu/84b65d52a01987946b07d21bafe0d5f1 to your computer and use it in GitHub Desktop.
One-to-many grouped
package com.dexels.kafka.streams.remotejoin.ranged;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dexels.kafka.streams.base.StreamOperators;
import com.dexels.kafka.streams.remotejoin.PreJoinProcessor;
import com.dexels.replication.api.ReplicationMessage;
import com.dexels.replication.api.ReplicationMessage.Operation;
public class OneToManyGroupedProcessor extends AbstractProcessor<String, ReplicationMessage> {
private final static Logger logger = LoggerFactory.getLogger(OneToManyGroupedProcessor.class);
private String storeName;
private String groupedStoreName;
private boolean optional;
private KeyValueStore<String, ReplicationMessage> groupedLookupStore;
private KeyValueStore<String, ReplicationMessage> lookupStore;
private BiFunction<ReplicationMessage, List<ReplicationMessage>, ReplicationMessage> joinFunction;
private Predicate<String, ReplicationMessage> filterPredicate;
public OneToManyGroupedProcessor(String storeName, String groupedStoreName, boolean optional,
Optional<Predicate<String, ReplicationMessage>> filterPredicate,
BiFunction<ReplicationMessage, List<ReplicationMessage>, ReplicationMessage> joinFunction) {
this.storeName = storeName;
this.groupedStoreName = groupedStoreName;
this.optional = optional;
this.joinFunction = joinFunction;
this.filterPredicate = filterPredicate.orElse((k, v) -> true);
public void init(ProcessorContext context) {
this.lookupStore = (KeyValueStore<String, ReplicationMessage>) context.getStateStore(storeName);
this.groupedLookupStore = (KeyValueStore<String, ReplicationMessage>) context.getStateStore(groupedStoreName);
public void process(String key, ReplicationMessage msg) {
boolean reverse = false;
if (key.endsWith(PreJoinProcessor.REVERSE_IDENTIFIER)) {
reverse = true;
key = key.substring(0, key.length() - PreJoinProcessor.REVERSE_IDENTIFIER.length());
if (reverse) {
reverseJoin(key, msg);
} else {
if (msg == null) {
logger.debug("O2M Emitting null message with key: {}", key);
context().forward(key, null);
forwardJoin(key, msg);
private void forwardJoin(String key, ReplicationMessage msg) {
try {
if (!filterPredicate.test(key, msg)) {
// filter says no
forwardMessage(key, msg.withOperation(Operation.DELETE));
} catch (Throwable t) {
logger.error("Error on checking filter predicate: {}", t);
List<ReplicationMessage> msgs = new ArrayList<>();
try (KeyValueIterator<String, ReplicationMessage> it = groupedLookupStore.range(key + "|", key + "}")) {
while (it.hasNext()) {
KeyValue<String, ReplicationMessage> keyValue =;
ReplicationMessage joined = msg;
if (msgs.size() > 0) {
joined = joinFunction.apply(msg, msgs);
if (optional || msgs.size() > 0) {
forwardMessage(key, joined);
} else {
// We are not optional, and have not joined with any messages. Forward a delete
forwardMessage(key, joined.withOperation(Operation.DELETE));
private void reverseJoin(String key, ReplicationMessage msg) {
String actualKey = StreamOperators.ungroupKeyReverse(key);
ReplicationMessage one = lookupStore.get(actualKey);
if (one == null) {
// We are doing a reverse join, but the original message isn't there.
// Nothing to do for us here
// OneToMany, thus we need to find all the other messages that
// we also have to join with us. Effectively the same as a
// forward join.
forwardJoin(actualKey, one);
private void forwardMessage(String key, ReplicationMessage innerMessage) {
context().forward(key, innerMessage);
// flush downstream stores with null:
if (innerMessage.operation() == Operation.DELETE) {
logger.debug("Delete forwarded, appending null forward with key: {}", key);
context().forward(key, null);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment