-
-
Save maarek/bb683cc29463e123137bfc1d1da6d036 to your computer and use it in GitHub Desktop.
Streamz Akka Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class HttpSample extends HttpContext { | |
private final Logger logger = LoggerFactory.getLogger(HttpSample.class); | |
private ActorMaterializer actorMaterializer; | |
private HttpSample() throws Exception { | |
super(); | |
ActorSystem actorSystem = ActorSystem.create("example"); | |
actorMaterializer = ActorMaterializer.create(actorSystem); | |
} | |
private Runnable setup() { | |
/* | |
Akka Stream Definitions: | |
* Source: A processing stage with exactly one output, emitting data elements whenever downstream processing | |
stages are ready to receive them. | |
* Sink: A processing stage with exactly one input, requesting and accepting data elements possibly slowing down | |
the upstream producer of elements | |
* Flow: A processing stage which has exactly one input and output, which connects its up- and downstreams by | |
transforming the data elements flowing through it. | |
* RunnableFlow: A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be run(). | |
*/ | |
/* | |
Bind Camel Jetty Enpoint as our Source input | |
Options: | |
Flows are in-out sources | |
Stream are in only | |
*/ | |
Flow<String, String, NotUsed> http = | |
receiveRequestBody(jettyEndpointUri, String.class); | |
/* | |
Create a string transformer | |
*/ | |
Flow<String, String, NotUsed> greeting = Flow.of(String.class) | |
.map(in -> "Hello " + in + "!"); | |
/* | |
Create a stage that will output http input to ElasticSearch | |
*/ | |
Sink<String, CompletionStage<Done>> es = Sink.foreach(o -> logger.info("ElasticSearch: {}", o)); | |
/* | |
Create Akka Stream Topology | |
Map the input stream request to a response string message | |
Join to the reply Flow | |
Http -> Map -> Reply | |
-> ES | |
-> Logger | |
*/ | |
RunnableGraph<NotUsed> graph = RunnableGraph.fromGraph(GraphDSL.create(b -> { | |
/* | |
Build a shape for the source | |
*/ | |
final FlowShape<String, String> httpShape = b.add(http); | |
/* | |
Build a shape for greeting flow | |
*/ | |
final FlowShape<String, String> greetingShape = b.add(greeting); | |
/* | |
Build a shape for elastic search sink | |
*/ | |
final SinkShape<String> esShape = b.add(es); | |
/* | |
Build a shape for logging | |
*/ | |
final SinkShape loggerSinkShape = b.add(new LoggerSink("Logger")); | |
/* | |
Broadcaster | |
*/ | |
final UniformFanOutShape<String, String> broadcaster = b.add(Broadcast.create(2)); | |
/* | |
Topology | |
*/ | |
b.from(httpShape) | |
.via(greetingShape) | |
.viaFanOut(broadcaster) | |
.to(esShape); | |
b.from(broadcaster) | |
.to(loggerSinkShape); | |
return ClosedShape.getInstance(); | |
})); | |
/* | |
Materializer is a factory for stream execution engines, it is the thing that makes streams run. | |
*/ | |
return () -> { graph.run(actorMaterializer); }; | |
} | |
public static void main(String... args) throws Exception { | |
new HttpSample().setup().run(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment