Created
December 11, 2018 02:31
-
-
Save pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65 to your computer and use it in GitHub Desktop.
Multiple processors with shared state stores
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.pgwhalen.demo; | |
import org.apache.kafka.common.serialization.Serdes; | |
import org.apache.kafka.streams.Topology; | |
import org.apache.kafka.streams.processor.Processor; | |
import org.apache.kafka.streams.processor.ProcessorContext; | |
import org.apache.kafka.streams.processor.ProcessorSupplier; | |
import org.apache.kafka.streams.state.KeyValueStore; | |
import org.apache.kafka.streams.state.Stores; | |
public class MyStatefulProcessors { | |
private final String storeOneName; | |
private final String storeTwoName; | |
public MyStatefulProcessors(String name, Topology topology) { | |
this.storeOneName = "store-one-" + name; | |
this.storeTwoName = "store-two-" + name; | |
topology.addStateStore( | |
Stores.keyValueStoreBuilder( | |
Stores.inMemoryKeyValueStore(storeOneName), | |
Serdes.String(), Serdes.Long())); | |
topology.addStateStore( | |
Stores.keyValueStoreBuilder( | |
Stores.inMemoryKeyValueStore(storeOneName), | |
Serdes.String(), Serdes.Long())); | |
} | |
public String[] stateStoreNames() { | |
return new String[]{storeOneName, storeTwoName}; | |
} | |
public ProcessorSupplier<String, Long> processorA() { | |
return ProcessorA::new; | |
} | |
public ProcessorSupplier<String, Double> processorB() { | |
return ProcessorB::new; | |
} | |
private class ProcessorA implements Processor<String, Long> { | |
private KeyValueStore<String, Long> storeOne; | |
private KeyValueStore<String, Double> storeTwo; | |
@Override | |
public void init(ProcessorContext context) { | |
storeOne = (KeyValueStore<String, Long>) context.getStateStore(storeOneName); | |
storeTwo = (KeyValueStore<String, Double>) context.getStateStore(storeTwoName); | |
} | |
@Override | |
public void process(String key, Long value) { | |
// gets from storeTwo, puts into storeOne, does something else | |
} | |
@Override | |
public void close() {} | |
} | |
private class ProcessorB implements Processor<String, Double> { | |
private KeyValueStore<String, Long> storeOne; | |
private KeyValueStore<String, Double> storeTwo; | |
@Override | |
public void init(ProcessorContext context) { | |
storeOne = (KeyValueStore<String, Long>) context.getStateStore(storeOneName); | |
storeTwo = (KeyValueStore<String, Double>) context.getStateStore(storeTwoName); | |
} | |
@Override | |
public void process(String key, Double value) { | |
// gets from storeOne, puts into storeTwo, does something else | |
} | |
@Override | |
public void close() {} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment