@maarek

maarek/some.java Secret

Created June 18, 2017 16:23
Streamz Akka Example
// HttpContext, LoggerSink, jettyEndpointUri and receiveRequestBody are not part of this gist;
// they are assumed to be provided by the surrounding project (HttpContext presumably exposes the streamz Camel DSL).
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.ClosedShape;
import akka.stream.FlowShape;
import akka.stream.SinkShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Broadcast;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletionStage;

public class HttpSample extends HttpContext {

    private final Logger logger = LoggerFactory.getLogger(HttpSample.class);
    private ActorMaterializer actorMaterializer;

    private HttpSample() throws Exception {
        super();
        ActorSystem actorSystem = ActorSystem.create("example");
        actorMaterializer = ActorMaterializer.create(actorSystem);
    }

    private Runnable setup() {
        /*
         Akka Stream Definitions:
         * Source: A processing stage with exactly one output, emitting data elements whenever downstream
           processing stages are ready to receive them.
         * Sink: A processing stage with exactly one input, requesting and accepting data elements, possibly
           slowing down the upstream producer of elements.
         * Flow: A processing stage which has exactly one input and output, which connects its up- and
           downstreams by transforming the data elements flowing through it.
         * RunnableGraph: A Flow that has both ends "attached" to a Source and Sink respectively, and is
           ready to be run().
        */
        /*
         Bind the Camel Jetty endpoint as the input of the stream.
         Options:
           a Flow is used for in-out (request/reply) endpoints
           a Source is used for in-only endpoints
        */
        Flow<String, String, NotUsed> http =
            receiveRequestBody(jettyEndpointUri, String.class);
        /*
         Create a string transformer that wraps the request body in a greeting
        */
        Flow<String, String, NotUsed> greeting = Flow.of(String.class)
            .map(in -> "Hello " + in + "!");
        /*
         Create a sink that stands in for the ElasticSearch output;
         in this example it only logs each element
        */
        Sink<String, CompletionStage<Done>> es = Sink.foreach(o -> logger.info("ElasticSearch: {}", o));
        /*
         Create the Akka Stream topology:
         map each incoming request body to a response string message, then fan it out
         to the reply side of the http Flow, the ElasticSearch sink, and the logger.

           Http -> Map -> Reply
                       -> ES
                       -> Logger
        */
        RunnableGraph<NotUsed> graph = RunnableGraph.fromGraph(GraphDSL.create(b -> {
            /*
             Build a shape for the source
            */
            final FlowShape<String, String> httpShape = b.add(http);
            /*
             Build a shape for greeting flow
            */
            final FlowShape<String, String> greetingShape = b.add(greeting);
            /*
             Build a shape for elastic search sink
            */
            final SinkShape<String> esShape = b.add(es);
            /*
             Build a shape for logging
            */
            final SinkShape loggerSinkShape = b.add(new LoggerSink("Logger"));
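            /*
             Broadcaster with three outputs: ES sink, logger, and the http reply
            */
            final UniformFanOutShape<String, String> broadcaster = b.add(Broadcast.create(3));
            /*
             Topology
            */
            b.from(httpShape)
                .via(greetingShape)
                .viaFanOut(broadcaster)
                .to(esShape);
            b.from(broadcaster)
                .to(loggerSinkShape);
            /*
             Feed the greeting back into the inlet of the http flow as the reply.
             Assumption: the inlet of the request/reply flow takes the response message.
             A ClosedShape graph must not leave any port unconnected.
            */
            b.from(broadcaster)
                .toInlet(httpShape.in());
            return ClosedShape.getInstance();
        }));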
        /*
         The Materializer is a factory for stream execution engines; it is the thing that makes streams run.
        */
        return () -> graph.run(actorMaterializer);
    }

    public static void main(String... args) throws Exception {
        new HttpSample().setup().run();
    }
}
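
The Map -> Broadcast -> (ES, Logger) part of the topology above can be tried without Akka, Camel or a Jetty endpoint. The following is only a rough sketch under stated assumptions: it swaps Akka Streams for the JDK's java.util.concurrent.Flow API, using SubmissionPublisher as the broadcaster, and the names BroadcastSketch and CollectingSink are hypothetical and do not appear in the gist. It leaves out the HTTP reply branch and requests unbounded demand, so it shows the fan-out only, not Akka's backpressure behaviour.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class BroadcastSketch {

    /** Hypothetical sink that records what it receives; stands in for the ES and logger sinks. */
    static final class CollectingSink implements Flow.Subscriber<String> {
        private final String name;
        private final List<String> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch completed = new CountDownLatch(1);

        CollectingSink(String name) { this.name = name; }

        @Override public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // unbounded demand: no backpressure in this sketch
        }
        @Override public void onNext(String item) { received.add(name + ": " + item); }
        @Override public void onError(Throwable error) { completed.countDown(); }
        @Override public void onComplete() { completed.countDown(); }

        /** Blocks until the upstream has completed, then returns a snapshot of the received items. */
        List<String> awaitAll() {
            try {
                if (!completed.await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException(name + " did not complete in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return List.copyOf(received);
        }
    }

    /** Maps every request to a greeting and broadcasts it to both sinks. */
    static List<List<String>> run(List<String> requests) {
        Function<String, String> greeting = in -> "Hello " + in + "!";
        CollectingSink es = new CollectingSink("ElasticSearch");
        CollectingSink logger = new CollectingSink("Logger");

        // SubmissionPublisher delivers each submitted item to every current subscriber (fan-out).
        try (SubmissionPublisher<String> broadcaster = new SubmissionPublisher<>()) {
            broadcaster.subscribe(es);
            broadcaster.subscribe(logger);
            for (String request : requests) {
                broadcaster.submit(greeting.apply(request));
            }
        } // close() signals onComplete to each subscriber after its buffered items

        return List.of(es.awaitAll(), logger.awaitAll());
    }

    public static void main(String[] args) {
        run(List.of("Akka", "Camel")).forEach(sink -> sink.forEach(System.out::println));
    }
}
```

Each sink sees every greeting in submission order, which mirrors what Broadcast does for the ES and logger branches in the graph above.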