Created
September 18, 2016 13:51
-
-
Save rbraeunlich/e2582075e2ccc0fd63580bdd95585aa5 to your computer and use it in GitHub Desktop.
The server implementation for Akka and WebSockets
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package de.blogspot.wrongtracks.websocket.server; | |
import static akka.http.javadsl.server.Directives.handleWebSocketMessages; | |
import static akka.http.javadsl.server.Directives.path; | |
import java.util.concurrent.CompletionStage; | |
import akka.NotUsed; | |
import akka.actor.ActorRef; | |
import akka.actor.ActorSystem; | |
import akka.actor.PoisonPill; | |
import akka.actor.Props; | |
import akka.http.javadsl.ConnectHttp; | |
import akka.http.javadsl.Http; | |
import akka.http.javadsl.ServerBinding; | |
import akka.http.javadsl.model.HttpRequest; | |
import akka.http.javadsl.model.HttpResponse; | |
import akka.http.javadsl.model.ws.Message; | |
import akka.http.javadsl.server.Route; | |
import akka.stream.ActorMaterializer; | |
import akka.stream.Materializer; | |
import akka.stream.OverflowStrategy; | |
import akka.stream.javadsl.Flow; | |
import akka.stream.javadsl.Sink; | |
import akka.stream.javadsl.Source; | |
public class Server { | |
private static ActorSystem system; | |
private static ActorRef distributor; | |
public static void main(String[] args) throws Exception { | |
system = ActorSystem.create(); | |
final Materializer materializer = ActorMaterializer.create(system); | |
final Flow<HttpRequest, HttpResponse, NotUsed> handler = createRoute().flow(system, materializer); | |
CompletionStage<ServerBinding> binding = Http.get(system).bindAndHandle(handler, | |
ConnectHttp.toHost("localhost", 8080), materializer); | |
distributor = system.actorOf(Props.create(DistributionActor.class), "distributor"); | |
System.out.println("Type return to exit"); | |
System.in.read(); | |
binding.thenCompose(ServerBinding::unbind).thenAccept(unbound -> system.terminate()); | |
} | |
public static Route createRoute() { | |
return path("greeter", () -> handleWebSocketMessages(newConnection())); | |
} | |
private static Flow<Message, Message, NotUsed> newConnection() { | |
ActorRef newUser = system.actorOf(Props.create(User.class, distributor)); | |
Sink<Message, NotUsed> sink = Sink.actorRef(newUser, PoisonPill.getInstance()); | |
Source<Message, NotUsed> source = Source.<Message>actorRef(10, OverflowStrategy.fail()) | |
.<NotUsed>mapMaterializedValue(outActor -> { | |
newUser.tell(Messages.ClientSendOut, outActor); | |
return NotUsed.getInstance(); | |
}); | |
return Flow.fromSinkAndSource(sink, source); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment