Skip to content

Instantly share code, notes, and snippets.

@pgwhalen
Created December 11, 2018 02:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65 to your computer and use it in GitHub Desktop.
Save pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65 to your computer and use it in GitHub Desktop.
Multiple processors with shared state stores
package com.pgwhalen.demo;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
/**
 * Bundles two processors that share the same pair of state stores.
 *
 * <p>Constructing an instance registers two in-memory key-value stores on the given
 * {@link Topology}: "store-one-&lt;name&gt;" (String keys, Long values) and
 * "store-two-&lt;name&gt;" (String keys, Double values). Both {@link #processorA()} and
 * {@link #processorB()} look up both stores by those names when they are initialized.
 *
 * <p>NOTE(review): the stores are added without naming any processors, so the caller
 * presumably has to connect each processor node to the names returned by
 * {@link #stateStoreNames()} when wiring the topology — confirm against the call site.
 */
public class MyStatefulProcessors {

    private final String storeOneName;
    private final String storeTwoName;

    /**
     * Derives the two store names from {@code name} and registers both stores on
     * {@code topology}.
     *
     * @param name     suffix appended to "store-one-" and "store-two-" to form the store names
     * @param topology topology the two state stores are added to
     */
    public MyStatefulProcessors(String name, Topology topology) {
        this.storeOneName = "store-one-" + name;
        this.storeTwoName = "store-two-" + name;

        // Store one: String -> Long.
        topology.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(storeOneName),
                        Serdes.String(), Serdes.Long()));

        // Store two: String -> Double. It must be registered under its own name and with a
        // Double value serde, matching how both processors look it up and cast it in init().
        topology.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(storeTwoName),
                        Serdes.String(), Serdes.Double()));
    }

    /**
     * Returns the names of the two state stores registered by the constructor.
     *
     * @return a new two-element array: store one's name followed by store two's name
     */
    public String[] stateStoreNames() {
        return new String[]{storeOneName, storeTwoName};
    }

    /**
     * Returns a supplier that creates a new {@code ProcessorA} on every call.
     *
     * @return supplier of processors handling String keys and Long values
     */
    public ProcessorSupplier<String, Long> processorA() {
        return ProcessorA::new;
    }

    /**
     * Returns a supplier that creates a new {@code ProcessorB} on every call.
     *
     * @return supplier of processors handling String keys and Double values
     */
    public ProcessorSupplier<String, Double> processorB() {
        return ProcessorB::new;
    }

    /**
     * Processor for (String, Long) records. It is a non-static inner class because it reads
     * the enclosing instance's store names.
     */
    private class ProcessorA implements Processor<String, Long> {

        private KeyValueStore<String, Long> storeOne;
        private KeyValueStore<String, Double> storeTwo;

        /** Looks up both shared stores by name from the processor context. */
        @Override
        @SuppressWarnings("unchecked") // getStateStore returns an untyped store; types match the builders above
        public void init(ProcessorContext context) {
            storeOne = (KeyValueStore<String, Long>) context.getStateStore(storeOneName);
            storeTwo = (KeyValueStore<String, Double>) context.getStateStore(storeTwoName);
        }

        @Override
        public void process(String key, Long value) {
            // gets from storeTwo, puts into storeOne, does something else
        }

        @Override
        public void close() {}
    }

    /**
     * Processor for (String, Double) records. It is a non-static inner class because it reads
     * the enclosing instance's store names.
     */
    private class ProcessorB implements Processor<String, Double> {

        private KeyValueStore<String, Long> storeOne;
        private KeyValueStore<String, Double> storeTwo;

        /** Looks up both shared stores by name from the processor context. */
        @Override
        @SuppressWarnings("unchecked") // getStateStore returns an untyped store; types match the builders above
        public void init(ProcessorContext context) {
            storeOne = (KeyValueStore<String, Long>) context.getStateStore(storeOneName);
            storeTwo = (KeyValueStore<String, Double>) context.getStateStore(storeTwoName);
        }

        @Override
        public void process(String key, Double value) {
            // gets from storeOne, puts into storeTwo, does something else
        }

        @Override
        public void close() {}
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment