Skip to content

Instantly share code, notes, and snippets.

@gokhanoner
Created February 22, 2019 09:57
Show Gist options
  • Save gokhanoner/4d66722276ae9d7a49a5bb68bcf5582c to your computer and use it in GitHub Desktop.
Save gokhanoner/4d66722276ae9d7a49a5bb68bcf5582c to your computer and use it in GitHub Desktop.
import static com.hazelcast.jet.function.DistributedFunction.identity;
import static com.hazelcast.jet.pipeline.JournalInitialPosition.START_FROM_OLDEST;
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding;
import com.hazelcast.core.IMap;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.StreamStage;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import lombok.Data;
/**
 * Demo that runs two Hazelcast Jet jobs over the same IMap of {@link DataHolder} entries:
 * <ul>
 *   <li>a streaming job ({@link #createJob()}) that reads the map's event journal, merges the
 *       attribute maps of each {@link Key} over a sliding window, and keeps a rolling set of all
 *       attribute names seen so far;</li>
 *   <li>a batch job ({@link #jobDistinctAttributes()}) that collects the distinct attribute names
 *       currently stored in the map.</li>
 * </ul>
 */
public class ReduceWindowResultsWithJet {

    /** Name of the journal-enabled IMap that holds the generated {@link DataHolder} entries. */
    private static final String DATA_MAP = "test";

    /** Name of the IList the batch job drains the distinct attribute names to. */
    private static final String DISTINCT_ATTR_LIST = "distinct";

    /** Name of the IMap the streaming job drains its rolling set of attribute names to. */
    private static final String DISTINCT_MAP = "distinct";

    /** Key of the single entry in {@link #DISTINCT_MAP} under which the rolling set is stored. */
    private static final String DISTINCT_MAP_KEY = "distinct";

    /** Attribute (name, value) pairs assigned to generated entries, selected by {@code i % 4}. */
    private static final Map<Integer, Tuple2<String, String>> VALUE_MAP = new HashMap<>();

    /**
     * Starts a Jet member, submits the streaming job, fills the data map, runs the batch job and
     * prints the results of both.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted while waiting for the streaming
     *                              job to process the inserted data
     */
    public static void main(String[] args) throws InterruptedException {
        VALUE_MAP.put(0, Tuple2.tuple2("color", "green"));
        VALUE_MAP.put(1, Tuple2.tuple2("position", "left"));
        VALUE_MAP.put(2, Tuple2.tuple2("color", "yellow"));
        VALUE_MAP.put(3, Tuple2.tuple2("position", "right"));

        JetConfig jetConfig = new JetConfig();
        // The streaming job reads from the map's event journal, so it must be enabled up front.
        jetConfig.getHazelcastConfig().getMapEventJournalConfig(DATA_MAP).setEnabled(true).setCapacity(100_000);
        JetInstance jet = Jet.newJetInstance(jetConfig);
        try {
            // Streaming job: submitted without join(); it keeps running until the instance shuts down.
            jet.newJob(createJob());

            IMap<Integer, DataHolder> tickers = jet.getMap(DATA_MAP);
            // Insert data: every map key i is written once per Key constant, so the second write
            // replaces the first as the stored value of i.
            for (int i = 0; i < 100; i++) {
                for (Key key : Key.values()) {
                    tickers.set(i, DataHolder.of(key, i).addValue(VALUE_MAP.get(i % 4)));
                }
            }

            // Batch job: wait for completion, then print what it collected.
            jet.newJob(jobDistinctAttributes()).join();
            int size = jet.getList(DISTINCT_ATTR_LIST).size();
            System.out.println(jet.getList(DISTINCT_ATTR_LIST).subList(0, size));

            // Give the streaming job time to process the journal before reading its output.
            Thread.sleep(10_000);
            System.out.println(jet.getMap(DISTINCT_MAP).get(DISTINCT_MAP_KEY));
        } finally {
            // Always stop the member, even when a job or the sleep fails.
            jet.shutdown();
        }
    }

    /**
     * Builds the streaming pipeline over the event journal of {@link #DATA_MAP}.
     * <p>
     * The journal values are timestamped with {@link DataHolder#getTs()} and then used twice:
     * <ol>
     *   <li>grouped by {@link DataHolder#getKey()} and reduced over a sliding window (size 3,
     *       slide 1) into one {@link DataHolder} whose values are the merged attribute maps; the
     *       window results are logged;</li>
     *   <li>flat-mapped to attribute names and rolled up into a set, which is logged and also
     *       merged into the {@link #DISTINCT_MAP_KEY} entry of {@link #DISTINCT_MAP}.</li>
     * </ol>
     *
     * @return the assembled pipeline
     */
    private static Pipeline createJob() {
        Pipeline p = Pipeline.create();
        StreamStage<DataHolder> streamStage = p
                .drawFrom(Sources.<Integer, DataHolder>mapJournal(DATA_MAP, START_FROM_OLDEST))
                .addTimestamps(e -> e.getValue().getTs(), 0)
                .map(Entry::getValue);

        streamStage.groupingKey(DataHolder::getKey)
                .window(sliding(3, 1))
                .aggregate(AggregateOperations.reducing(
                        DataHolder.of(Key.KEY1, 0L),
                        identity(),
                        // Combine into a fresh holder so neither input is mutated; on a clash of
                        // attribute names the entries of d2 win because they are merged last.
                        (d1, d2) -> DataHolder.of(d2.getKey(), d2.getTs()).mergeValues(d1).mergeValues(d2),
                        // No deduct function: overwriting map entries cannot be undone, so there is
                        // no inverse of the combine step. With null, Jet recombines the frames of
                        // each window instead of "subtracting" the frame that slid out.
                        null))
                .drainTo(Sinks.logger());

        StreamStage<Set<String>> rollingAggregate = streamStage
                .flatMap(e -> Traversers.traverseIterable(e.getValues().keySet()))
                .rollingAggregate(AggregateOperations.toSet());
        rollingAggregate.drainTo(Sinks.logger());
        rollingAggregate
                .drainTo(Sinks.mapWithMerging(DISTINCT_MAP, strings -> DISTINCT_MAP_KEY, identity(), (t1, t2) -> {
                    // Union of both sets, accumulated in (and returned as) the second argument.
                    t2.addAll(t1);
                    return t2;
                }));
        return p;
    }

    /**
     * Builds the batch pipeline that reads the current contents of {@link #DATA_MAP}, extracts the
     * attribute names of every stored {@link DataHolder}, de-duplicates them, and both logs them
     * and drains them to the {@link #DISTINCT_ATTR_LIST} list.
     *
     * @return the assembled pipeline
     */
    private static Pipeline jobDistinctAttributes() {
        Pipeline p = Pipeline.create();
        BatchStage<String> distinct = p.drawFrom(Sources.<Integer, DataHolder>map(DATA_MAP))
                .flatMap(e -> Traversers.traverseIterable(e.getValue().getValues().keySet()))
                .distinct();
        distinct.drainTo(Sinks.logger());
        distinct.drainTo(Sinks.list(DISTINCT_ATTR_LIST));
        return p;
    }

    /**
     * A keyed, timestamped bag of string attributes. Instances are created through the
     * Lombok-generated static factory {@code of(key, ts)}; the attribute map starts empty and is
     * filled through {@link #addValue(Tuple2)} and {@link #mergeValues(DataHolder)}.
     */
    @Data(staticConstructor = "of")
    static class DataHolder implements Serializable {
        /** Grouping key of this holder. */
        private final Key key;
        /** Attribute name to attribute value; mutable even though the reference is final. */
        private final Map<String, String> values = new HashMap<>();
        /** Timestamp used as the event time in the streaming job. */
        private final long ts;

        /**
         * Copies all attributes of {@code dh} into this holder, overwriting attributes that share
         * a name.
         *
         * @param dh holder whose attributes are copied; it is not modified
         * @return this holder, for chaining
         */
        DataHolder mergeValues(DataHolder dh) {
            values.putAll(dh.getValues());
            return this;
        }

        /**
         * Stores a single attribute, overwriting any previous value under the same name.
         *
         * @param kv attribute as a (name, value) tuple
         * @return this holder, for chaining
         */
        DataHolder addValue(Tuple2<String, String> kv) {
            values.put(kv.f0(), kv.f1());
            return this;
        }
    }

    /** Grouping keys assigned to the generated {@link DataHolder} entries. */
    enum Key {
        KEY1, KEY2
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment