Skip to content

Instantly share code, notes, and snippets.

@rbraeunlich
Created September 18, 2016 13:51
Show Gist options
  • Save rbraeunlich/e2582075e2ccc0fd63580bdd95585aa5 to your computer and use it in GitHub Desktop.
Save rbraeunlich/e2582075e2ccc0fd63580bdd95585aa5 to your computer and use it in GitHub Desktop.
The server implementation for Akka and WebSockets
package de.blogspot.wrongtracks.websocket.server;
import static akka.http.javadsl.server.Directives.handleWebSocketMessages;
import static akka.http.javadsl.server.Directives.path;
import java.util.concurrent.CompletionStage;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.server.Route;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
/**
 * Entry point of a small Akka HTTP server that serves WebSocket connections on the
 * {@code greeter} path of {@code localhost:8080}.
 *
 * <p>Every call to {@link #newConnection()} creates a {@code User} actor which is handed the
 * shared {@code DistributionActor} reference. The server runs until a line is entered on standard
 * input, then unbinds the port and terminates the actor system.
 */
public class Server {

  /** Actor system shared by the HTTP layer and all actors; assigned once in {@link #main}. */
  private static ActorSystem system;

  /** The single {@code DistributionActor}, passed to every {@code User} actor on creation. */
  private static ActorRef distributor;

  /**
   * Starts the actor system, binds the HTTP server and blocks until input arrives on stdin.
   *
   * @param args ignored
   * @throws Exception if reading from standard input fails
   */
  public static void main(String[] args) throws Exception {
    system = ActorSystem.create();
    // The distributor must exist BEFORE the port is bound: as soon as the server accepts
    // connections, newConnection() may run and would otherwise see a null distributor.
    distributor = system.actorOf(Props.create(DistributionActor.class), "distributor");

    final Materializer materializer = ActorMaterializer.create(system);
    final Flow<HttpRequest, HttpResponse, NotUsed> handler =
        createRoute().flow(system, materializer);
    final CompletionStage<ServerBinding> binding = Http.get(system).bindAndHandle(handler,
        ConnectHttp.toHost("localhost", 8080), materializer);

    System.out.println("Type return to exit");
    System.in.read();

    // whenComplete (rather than thenAccept) so the actor system is terminated even when binding
    // or unbinding failed; otherwise its threads would keep the JVM alive after a failed start.
    binding
        .thenCompose(ServerBinding::unbind)
        .whenComplete((unbound, failure) -> {
          if (failure != null) {
            System.err.println("Server binding failed: " + failure);
          }
          system.terminate();
        });
  }

  /**
   * Builds the route: requests on the {@code greeter} path are handled as WebSocket messages by
   * a flow obtained from {@link #newConnection()}.
   *
   * @return the route served by this server
   */
  public static Route createRoute() {
    return path("greeter", () -> handleWebSocketMessages(newConnection()));
  }

  /**
   * Creates the message flow for one WebSocket connection, backed by a fresh {@code User} actor.
   *
   * <p>Incoming messages are forwarded to the {@code User} actor; when the incoming stream
   * completes the actor receives a {@link PoisonPill}. Outgoing messages are whatever is sent to
   * the actor materialized by the source side.
   *
   * @return a flow whose sink and source are both wired to the new {@code User} actor
   */
  private static Flow<Message, Message, NotUsed> newConnection() {
    final ActorRef newUser = system.actorOf(Props.create(User.class, distributor));

    // Inbound side: client -> User actor.
    final Sink<Message, NotUsed> sink = Sink.actorRef(newUser, PoisonPill.getInstance());

    // Outbound side: an actor-backed source with a buffer of 10 that fails the stream on
    // overflow. On materialization the User actor is told Messages.ClientSendOut with the
    // source's actor as sender -- presumably so User can push messages to the client through
    // it; confirm against the User implementation.
    final Source<Message, NotUsed> source =
        Source.<Message>actorRef(10, OverflowStrategy.fail())
            .<NotUsed>mapMaterializedValue(outActor -> {
              newUser.tell(Messages.ClientSendOut, outActor);
              return NotUsed.getInstance();
            });

    return Flow.fromSinkAndSource(sink, source);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment