Created: February 22, 2019 09:57
Save gokhanoner/4d66722276ae9d7a49a5bb68bcf5582c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import static com.hazelcast.jet.function.DistributedFunction.identity;
import static com.hazelcast.jet.pipeline.JournalInitialPosition.START_FROM_OLDEST;
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding;

import com.hazelcast.core.IMap;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.StreamStage;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import lombok.Data;
public class ReduceWindowResultsWithJet { | |
private static final String DATA_MAP = "test"; | |
private static final String DISTINCT_ATTR_LIST = "distinct"; | |
private static final String DISTINCT_MAP = "distinct"; | |
private static final Map<Integer, Tuple2<String, String>> VALUE_MAP = new HashMap<>(); | |
public static void main(String[] args) throws InterruptedException { | |
VALUE_MAP.put(0, Tuple2.tuple2("color", "green")); | |
VALUE_MAP.put(1, Tuple2.tuple2("position", "left")); | |
VALUE_MAP.put(2, Tuple2.tuple2("color", "yellow")); | |
VALUE_MAP.put(3, Tuple2.tuple2("position", "right")); | |
JetConfig jetConfig = new JetConfig(); | |
jetConfig.getHazelcastConfig().getMapEventJournalConfig(DATA_MAP).setEnabled(true).setCapacity(100_000); | |
JetInstance jet = Jet.newJetInstance(jetConfig); | |
jet.newJob(createJob()); | |
IMap<Integer, DataHolder> tickers = jet.getMap(DATA_MAP); | |
//Insert Data | |
for (int i = 0; i < 100; i++) { | |
int k = i; | |
Arrays.stream(Key.values()) | |
.forEach(key -> tickers.set(k, DataHolder.of(key, k).addValue(VALUE_MAP.get(k % 4)))); | |
} | |
jet.newJob(jobDistinctAttributes()).join(); | |
int size = jet.getList(DISTINCT_ATTR_LIST).size(); | |
System.out.println(jet.getList(DISTINCT_ATTR_LIST).subList(0, size)); | |
Thread.sleep(10_000); | |
System.out.println(jet.getMap(DISTINCT_MAP).get(DISTINCT_ATTR_LIST)); | |
jet.shutdown(); | |
} | |
private static Pipeline createJob() { | |
Pipeline p = Pipeline.create(); | |
StreamStage<DataHolder> streamStage = p | |
.drawFrom(Sources.<Integer, DataHolder>mapJournal(DATA_MAP, START_FROM_OLDEST)) | |
.addTimestamps(e -> e.getValue().getTs(), 0) | |
.map(Entry::getValue); | |
streamStage.groupingKey(DataHolder::getKey) | |
.window(sliding(3, 1)) | |
.aggregate(AggregateOperations.reducing(DataHolder.of(Key.KEY1, 0L), | |
identity(), (d1, d2) -> DataHolder.of(d2.getKey(), d2.getTs()).mergeValues(d1).mergeValues(d2), | |
(d1, d2) -> d1)) | |
.drainTo(Sinks.logger()); | |
StreamStage<Set<String>> rollingAggregate = streamStage | |
.flatMap(e -> Traversers.traverseIterable(e.getValues().keySet())) | |
.rollingAggregate(AggregateOperations.toSet()); | |
rollingAggregate.drainTo(Sinks.logger()); | |
rollingAggregate | |
.drainTo(Sinks.mapWithMerging(DISTINCT_MAP, strings -> DISTINCT_ATTR_LIST, identity(), (t1, t2) -> { | |
t2.addAll(t1); | |
return t2; | |
})); | |
return p; | |
} | |
private static Pipeline jobDistinctAttributes() { | |
Pipeline p = Pipeline.create(); | |
BatchStage<String> distinct = p.drawFrom(Sources.<Integer, DataHolder>map(DATA_MAP)) | |
.flatMap(e -> Traversers.traverseIterable(e.getValue().getValues().keySet())) | |
.distinct(); | |
distinct.drainTo(Sinks.logger()); | |
distinct.drainTo(Sinks.list(DISTINCT_ATTR_LIST)); | |
return p; | |
} | |
@Data(staticConstructor = "of") | |
static class DataHolder implements Serializable { | |
private final Key key; | |
private final Map<String, String> values = new HashMap<>(); | |
private final long ts; | |
DataHolder mergeValues(DataHolder dh) { | |
values.putAll(dh.getValues()); | |
return this; | |
} | |
DataHolder addValue(Tuple2<String, String> kv) { | |
values.put(kv.f0(), kv.f1()); | |
return this; | |
} | |
} | |
enum Key { | |
KEY1, KEY2; | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.