Skip to content

Instantly share code, notes, and snippets.

@fhueske
Created November 10, 2015 21:03
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fhueske/4ea5422edb5820915fa4 to your computer and use it in GitHub Desktop.
Save fhueske/4ea5422edb5820915fa4 to your computer and use it in GitHub Desktop.
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Records to be filtered (source definition elided in this sketch).
    DataStream<Tuple1<String>> records = ...
    // Filter updates; StreamFilter toggles each incoming value in its filter set (source elided).
    DataStream<Tuple1<String>> filterUpdates = ...

    // Key both inputs on field 0 so that a record and the filter entry carrying the same value
    // are partitioned identically, then let StreamFilter apply/update the filters.
    DataStream<Tuple1<String>> filtered = records
        .keyBy(0)
        .connect(filterUpdates.keyBy(0))
        .flatMap(new StreamFilter());
    filtered.print();

    // Submit the job; nothing runs until this call.
    env.execute("Stream Filter");
}
/**
 * Co-flatMap function that filters one stream against a dynamically updated set of values.
 *
 * <p>Input 1 carries the records to filter: a record is forwarded unchanged unless its
 * {@code f0} is currently in the filter set. Input 2 carries filter updates: each update toggles
 * its {@code f0} in the filter set (added if absent, removed if present) and emits nothing.
 *
 * <p>The filter set is held in a plain instance field, so every key processed by this function
 * instance shares one set. NOTE(review): correct filtering relies on both inputs being keyed on
 * the same field, so that a record and its filter value reach the same instance. {@code main}
 * does this with {@code keyBy(0)} on both sides; confirm at any other call site.
 *
 * <p>The filter set is exposed to checkpointing through {@link Checkpointed}.
 */
public static class StreamFilter
        extends RichCoFlatMapFunction<Tuple1<String>, Tuple1<String>, Tuple1<String>>
        implements Checkpointed<HashSet<String>> {

    // Values currently filtered out of input 1; toggled by elements of input 2.
    // Kept as HashSet (not Set) because it is the Checkpointed state type.
    private HashSet<String> filters = new HashSet<>();

    /**
     * Forwards {@code value} to {@code out} unless {@code value.f0} is in the filter set.
     *
     * @param value record from the first (data) input
     * @param out collector for records that pass the filter
     */
    @Override
    public void flatMap1(Tuple1<String> value, Collector<Tuple1<String>> out) throws Exception {
        if (!filters.contains(value.f0)) {
            out.collect(value);
        }
    }

    /**
     * Toggles {@code filter.f0} in the filter set: adds it if absent, removes it if present.
     * Emits nothing.
     *
     * @param filter filter update from the second input
     * @param out unused; filter updates produce no output
     */
    @Override
    public void flatMap2(Tuple1<String> filter, Collector<Tuple1<String>> out) throws Exception {
        // Set.add returns false when the value was already present, so one hash lookup
        // decides between "add" (already done by add) and "remove".
        if (!filters.add(filter.f0)) {
            filters.remove(filter.f0);
        }
    }

    /**
     * Returns the filter set as this function's checkpointed state.
     *
     * <p>NOTE(review): this returns the live set, not a copy. That is safe only if the framework
     * finishes serializing the returned object before the set is mutated again; confirm for the
     * state backend in use.
     *
     * @param checkpointId id of the checkpoint being taken (unused)
     * @param checkpointTimestamp timestamp of the checkpoint (unused)
     * @return the current filter set
     */
    @Override
    public HashSet<String> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        return filters;
    }

    /**
     * Replaces the filter set with the restored checkpoint state.
     *
     * @param state filter set previously returned by {@link #snapshotState}
     */
    @Override
    public void restoreState(HashSet<String> state) throws Exception {
        filters = state;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment