Skip to content

Instantly share code, notes, and snippets.

@DarthMax
Last active November 4, 2016 08:56
Show Gist options
  • Save DarthMax/69750ae434be369d6ded3738bb0f2a8e to your computer and use it in GitHub Desktop.
Save DarthMax/69750ae434be369d6ded3738bb0f2a8e to your computer and use it in GitHub Desktop.
Delta Iteration Problem
package org.my.example;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.hadoop.shaded.com.google.common.collect.Lists;
public class TestRunner {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple1<Long>> a = env.fromCollection(Lists.newArrayList(new Tuple1<>(1L)));
DataSet<Tuple1<Long>> b = env.fromCollection(Lists.newArrayList(new Tuple1<>(1L),new Tuple1<>(2L),new Tuple1<>(3L)));
DeltaIteration<Tuple1<Long>,Tuple1<Long>> iteration = a.iterateDelta(b,1,0);
DataSet<Tuple1<Long>> nextWorkingSet = iteration.getWorkset();
DataSet<Tuple1<Long>> solutionSetDelta =
iteration.getSolutionSet().join(nextWorkingSet).where(0).equalTo(0).with(new MyCombiner());
DataSet<Tuple1<Long>> c = iteration.closeWith(solutionSetDelta,nextWorkingSet);
System.out.println("c = " + c.collect());
}
private static class MyCombiner extends RichJoinFunction<Tuple1<Long>, Tuple1<Long>, Tuple1<Long>> {
public Tuple1<Long> join(Tuple1<Long> a, Tuple1<Long> b) {
b.f0 = b.f0 + a.f0;
return b;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment