Last active
September 24, 2021 03:04
-
-
Save bentito/560eb95c64fa131efb34ad62c7bf60f8 to your computer and use it in GitHub Desktop.
Updated version of a complete example of hooking a Websocket Server to an Akka Stream to an Akka Actor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// 2 years ago Johan Andrén posted a conversion from Scala of an example of hooking a Websocket to a Stream to an Actor | |
// I had to tweak it a bit to get it to work with more modern version of akka-http | |
// Maven deps for this are: | |
// <!-- http://akka.io/docs/#akka-http --> | |
// <dependency> | |
// <groupId>com.typesafe.akka</groupId> | |
// <artifactId>akka-http-core_2.11</artifactId> | |
// <version>10.0.6</version> | |
// </dependency> | |
// <dependency> | |
// <groupId>com.typesafe.akka</groupId> | |
// <artifactId>akka-http_2.11</artifactId> | |
// <version>10.0.6</version> | |
// </dependency> | |
// Original post is here: | |
// https://groups.google.com/forum/#!topic/akka-user/zOI6qdg_wqc | |
// | |
// The utility here is that with this system you can `tell` the actor (AnActor below) and | |
// have it send whatever you want to `out` which is the (previously) connected websocket. | |
import akka.NotUsed; | |
import akka.actor.*; | |
import akka.http.javadsl.model.ws.Message; | |
import akka.http.javadsl.model.ws.TextMessage; | |
import akka.http.javadsl.server.HttpApp; | |
import akka.http.javadsl.server.Route; | |
import akka.japi.pf.ReceiveBuilder; | |
import akka.stream.OverflowStrategy; | |
import akka.stream.javadsl.Flow; | |
import akka.stream.javadsl.Sink; | |
import akka.stream.javadsl.Source; | |
import java.util.Optional; | |
public class WebSocketServer { | |
private static final class Router extends HttpApp { | |
private final ActorSystem system; | |
public Router(ActorSystem system) { | |
this.system = system; | |
} | |
public Route createRoute() { | |
return route( | |
path("test").route( | |
get(handleWebSocketMessages(createWebSocketFlow())) | |
) | |
); | |
} | |
private Flow<Message, Message, NotUsed> createWebSocketFlow() { | |
ActorRef actor = system.actorOf(Props.create(AnActor.class)); | |
Source<Message, NotUsed> source = Source.<Outgoing>actorRef(5, OverflowStrategy.fail()) | |
.map((outgoing) -> (Message) TextMessage.create(outgoing.message)) | |
.<NotUsed>mapMaterializedValue(destinationRef -> { | |
actor.tell(new OutgoingDestination(destinationRef), ActorRef.noSender()); | |
return NotUsed.getInstance(); | |
}); | |
Sink<Message, NotUsed> sink = Flow.<Message>create() | |
.map((msg) -> new Incoming(msg.asTextMessage().getStrictText())) | |
.to(Sink.actorRef(actor, PoisonPill.getInstance())); | |
return Flow.fromSinkAndSource(sink, source); | |
} | |
} | |
public static void main(String[] args) { | |
ActorSystem actorSystem = ActorSystem.create(); | |
Router router = new Router(actorSystem); | |
router.bindRoute("127.0.0.1", 8082, actorSystem); | |
} | |
static class Incoming { | |
public final String message; | |
public Incoming(String message) { | |
this.message = message; | |
} | |
} | |
static class Outgoing { | |
public final String message; | |
public Outgoing(String message) { | |
this.message = message; | |
} | |
} | |
static class OutgoingDestination { | |
public final ActorRef destination; | |
OutgoingDestination(ActorRef destination) { | |
this.destination = destination; | |
} | |
} | |
static class AnActor extends AbstractActor { | |
private Optional<ActorRef> outgoing = Optional.empty(); | |
public AnActor() { | |
receive(ReceiveBuilder.match( | |
OutgoingDestination.class, (msg) -> outgoing = Optional.of(msg.destination) | |
).match( | |
Incoming.class, (in) -> outgoing.ifPresent((out) -> out.tell(new Outgoing("got it"), self())) | |
).build()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment