Skip to content

Instantly share code, notes, and snippets.

@johanandren
Created August 18, 2016 12:50
Show Gist options
  • Save johanandren/a5b9e4f88fc9fc6ce56dac10bb81a50e to your computer and use it in GitHub Desktop.
Save johanandren/a5b9e4f88fc9fc6ce56dac10bb81a50e to your computer and use it in GitHub Desktop.
Sample of a source emitting random integers
package streams;
import akka.stream.Attributes;
import akka.stream.Outlet;
import akka.stream.SourceShape;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import java.util.Random;
public class RandomNumberSource extends GraphStage<SourceShape<Integer>> {
public final Outlet<Integer> out = Outlet.create("RandomNumberSource.out");
private final SourceShape<Integer> shape = SourceShape.of(out);
@Override
public SourceShape<Integer> shape() {
return shape;
}
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape) {
private final Random random = new Random();
{
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
push(out, random.nextInt());
}
});
}
};
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment