Skip to content

Instantly share code, notes, and snippets.

@knaufk
Last active April 14, 2016 10:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save knaufk/a0026eff9792aa905a2f586f3b0cb112 to your computer and use it in GitHub Desktop.
Save knaufk/a0026eff9792aa905a2f586f3b0cb112 to your computer and use it in GitHub Desktop.
/**
 * Minimal Flink batch program that runs two chained {@link AccMap} operators
 * with different parallelism and prints the accumulator results of the job.
 *
 * <p>Both operators register a counter under the same name
 * ({@link #ACC_NAME1}), so the value reported at the end is whatever Flink
 * produces for that shared name across all operator instances.
 */
public class AccTest {

    /** Name under which every {@link AccMap} instance registers its counter. */
    public static final String ACC_NAME1 = "accName";

    /**
     * Builds the pipeline, runs it, and prints the job's accumulator results.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> source = env.fromCollection(Lists.newArrayList(1, 2, 3, 4, 5));

        // Two identity maps with different parallelism, both counting under ACC_NAME1.
        // NOTE(review): in the DataSet API print() is expected to trigger job
        // execution itself, which is why no explicit env.execute() follows.
        source.map(new AccMap()).setParallelism(1)
                .map(new AccMap()).setParallelism(2)
                .print();

        // Accumulator values as reported for the job that print() just ran.
        System.out.println(env.getLastJobExecutionResult()
                .getAllAccumulatorResults());
    }

    /**
     * Identity map function that counts the records it processes in a
     * {@link LongCounter} registered under {@link AccTest#ACC_NAME1}.
     */
    public static class AccMap extends RichMapFunction<Integer, Integer> {

        /** Per-instance record counter; registered with the runtime in {@link #open}. */
        private final LongCounter counter = new LongCounter();

        /**
         * Registers this instance's counter with the runtime context.
         *
         * @param parameters operator configuration (unused)
         * @throws Exception if the accumulator cannot be registered
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            getRuntimeContext().addAccumulator(ACC_NAME1, counter);
        }

        /**
         * Counts the record and passes it through unchanged.
         *
         * @param integer the incoming record
         * @return the same value that was passed in
         * @throws Exception never thrown by this implementation
         */
        @Override
        public Integer map(Integer integer) throws Exception {
            // Without this increment the registered accumulator would always report 0.
            counter.add(1L);
            return integer;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment