Skip to content

Instantly share code, notes, and snippets.

@IgorBerman
Last active February 27, 2018 07:57
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save IgorBerman/188614485e192d7d62df700f23dddf91 to your computer and use it in GitHub Desktop.
Save IgorBerman/188614485e192d7d62df700f23dddf91 to your computer and use it in GitHub Desktop.
akka-http akka-streams java8
package com.example;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import scala.runtime.BoxedUnit;
import scala.util.Try;
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.HostConnectionPool;
import akka.http.javadsl.Http;
import akka.http.javadsl.marshallers.jackson.Jackson;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
/**
 * Example of combining Akka Streams with the Akka HTTP cached host connection pool.
 *
 * <p>The pipeline built in {@link #main(String[])} is:
 * integers 1..10 -> HTTP requests -> connection pool -> JSON unmarshalling into
 * {@link Post} -> logging sink. When the stream completes (successfully or not) the
 * connection pools are shut down and the actor system is terminated.
 */
public class StreamWithHttpPoolExample {

    /** Base URI of the external resource; see https://jsonplaceholder.typicode.com/posts/1 */
    private static final String EXTERNAL_RESOURCE = "http://jsonplaceholder.typicode.com/";

    /** Parallelism argument handed to {@code mapAsync} for the unmarshalling step. */
    private static final int UNMARSHAL_PARALLELISM = 4;

    /** Upper bound, in milliseconds, on the wait for the connection pools to shut down. */
    private static final long POOL_SHUTDOWN_TIMEOUT_MILLIS = 1000;

    /**
     * Bean that the JSON body of a response is unmarshalled into.
     *
     * <p>The accessors have no direct callers in this file; presumably they are invoked
     * reflectively by Jackson during unmarshalling, hence the suppressed warning.
     */
    @SuppressWarnings("unused")
    private static class Post {
        private int userId;
        private int id;
        private String title;
        private String body;

        public int getUserId() {
            return userId;
        }

        public void setUserId(int userId) {
            this.userId = userId;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getTitle() {
            return title;
        }

        public void setTitle(String title) {
            this.title = title;
        }

        public String getBody() {
            return body;
        }

        public void setBody(String body) {
            this.body = body;
        }

        /** Renders userId, id and title; the body field is left out of the output. */
        @Override
        public String toString() {
            return "Post [userId=" + userId + ", id=" + id + ", title=" + title + "]";
        }
    }

    /**
     * Builds and runs the stream, then releases the HTTP pools and the actor system once
     * the stream has completed.
     *
     * @param args command-line arguments (not used)
     * @throws Exception declared by the original signature and kept for compatibility
     */
    public static void main(String[] args) throws Exception {
        final ActorSystem system = ActorSystem.create();
        final Materializer mat = ActorMaterializer.create(system);

        // Take some source.
        final Source<Integer, NotUsed> source = Source.range(1, 10).named("source");

        // Map the source to requests against the external resource. Each request is paired
        // with its originating element because the connection pool flow consumes and emits
        // pairs (see the flow's type below).
        final Source<Pair<HttpRequest, Integer>, NotUsed> requestsStream = source.map(y -> {
            system.log().info("Creating reqest for {}", y);
            final HttpRequest req = HttpRequest.create(EXTERNAL_RESOURCE + "posts/" + y);
            return Pair.create(req, y);
        }).named("requests");

        // Create the HTTP pool as a flow.
        final Http http = Http.get(system);
        final ConnectHttp connectHttp = ConnectHttp.toHost(EXTERNAL_RESOURCE);
        final Flow<Pair<HttpRequest, Integer>, Pair<Try<HttpResponse>, Integer>, HostConnectionPool> cachedHostConnectionPool
                = http.cachedHostConnectionPool(connectHttp, mat);

        // Map requests to "maybe" responses: each element carries a Try<HttpResponse>.
        final Source<Pair<Try<HttpResponse>, Integer>, NotUsed> maybeResponsesStream =
                requestsStream.via(cachedHostConnectionPool).named("mayberesps");

        // Unmarshal the JSON body of every successful response into a Post.
        final Source<Post, NotUsed> postsStream = maybeResponsesStream
                .mapAsync(UNMARSHAL_PARALLELISM, x -> toPost(x, system, mat))
                .named("posts");

        final Sink<Post, CompletionStage<Done>> consoleSink =
                Sink.foreach(x -> system.log().info("Out:{}", x));

        // Keep only the sink's materialized value; the source side materializes NotUsed.
        final RunnableGraph<CompletionStage<Done>> graph = postsStream.toMat(consoleSink, Keep.right());
        final CompletionStage<Done> completion = graph.run(mat);

        completion.whenComplete((done, failure) -> {
            try {
                if (failure == null) {
                    system.log().info("Stream completed");
                } else {
                    // Pass the throwable as the cause so the logger can report it in full.
                    system.log().error(failure,
                            "Stream completed with exception " + failure + ", " + failure.getCause());
                }
                shutdownConnectionPools(system, http);
            } finally {
                // Always terminate, otherwise the actor system would outlive the stream.
                system.terminate();
            }
        });
    }

    /**
     * Turns one element emitted by the connection pool into a pending {@link Post}.
     *
     * <p>NOTE(review): the HTTP status code is not inspected here; the entity of any
     * response that arrived is handed to the unmarshaller as-is.
     *
     * @param result pair of the response attempt and the element the request was built from
     * @param system actor system providing the logger and the dispatcher for unmarshalling
     * @param mat    materializer used while unmarshalling the entity
     * @return completion stage of the unmarshalled post
     * @throws RuntimeException wrapping the original cause when the request itself failed
     */
    private static CompletionStage<Post> toPost(
            Pair<Try<HttpResponse>, Integer> result, ActorSystem system, Materializer mat) {
        system.log().info("MapAsync{}", result);
        final Try<HttpResponse> maybeResponse = result.first();
        if (maybeResponse.isFailure()) {
            throw new RuntimeException(
                    "Request for post " + result.second() + " failed", maybeResponse.failed().get());
        }
        final HttpResponse rsp = maybeResponse.get();
        return Jackson.unmarshaller(Post.class).unmarshal(rsp.entity(), system.dispatcher(), mat);
    }

    /**
     * Shuts down all HTTP connection pools, waiting at most
     * {@link #POOL_SHUTDOWN_TIMEOUT_MILLIS} milliseconds. Problems are logged rather than
     * propagated so that the caller can still terminate the actor system.
     *
     * @param system actor system whose logger receives any shutdown problem
     * @param http   HTTP extension owning the connection pools
     */
    private static void shutdownConnectionPools(ActorSystem system, Http http) {
        try {
            final CompletionStage<BoxedUnit> shutdownAllConnectionPools = http.shutdownAllConnectionPools();
            shutdownAllConnectionPools.toCompletableFuture()
                    .get(POOL_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            system.log().warning("Interrupted while waiting for connection pools to shut down");
        } catch (Exception e) {
            // Broad catch is deliberate: this runs inside a completion callback whose result
            // is discarded, so anything escaping here would be lost silently.
            system.log().error(e, "Connection pools did not shut down cleanly within "
                    + POOL_SHUTDOWN_TIMEOUT_MILLIS + " ms");
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment