Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Sample of a source emitting random integers
package streams;
import akka.stream.Attributes;
import akka.stream.Outlet;
import akka.stream.SourceShape;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import java.util.Random;
/**
 * Source stage with a single outlet that pushes one pseudo-random {@code int}
 * (taken from {@link Random#nextInt()}) every time the outlet is pulled.
 *
 * <p>The code in this class never signals completion or failure itself; it only
 * reacts to pulls.
 */
public class RandomNumberSource extends GraphStage<SourceShape<Integer>> {

    /** The only port of this stage; random integers are pushed through it. */
    public final Outlet<Integer> out = Outlet.create("RandomNumberSource.out");

    /** Source shape built around {@link #out}. */
    private final SourceShape<Integer> shape = SourceShape.of(out);

    /**
     * Returns the shape of this stage.
     *
     * @return the source shape wrapping {@link #out}
     */
    @Override
    public SourceShape<Integer> shape() {
        return shape;
    }

    /**
     * Builds the logic object for this stage. Every call creates a new logic
     * instance with its own {@link Random} generator.
     *
     * @param inheritedAttributes attributes handed in by the caller; not used here
     * @return logic that answers each pull on {@link #out} with a random integer
     */
    @Override
    public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        return new GraphStageLogic(shape) {

            // Generator owned by this logic instance.
            private final Random generator = new Random();

            // Instance initializer: the anonymous class cannot declare a
            // constructor, so the handler is registered here.
            {
                final OutHandler emitOnPull = new AbstractOutHandler() {
                    @Override
                    public void onPull() throws Exception {
                        // One element per pull.
                        final int next = generator.nextInt();
                        push(out, next);
                    }
                };
                setHandler(out, emitOnPull);
            }
        };
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment