@bentito
Last active September 24, 2021 03:04
Updated version of a complete example of hooking a Websocket Server to an Akka Stream to an Akka Actor
// Two years ago Johan Andrén posted a conversion from Scala of an example that hooks a WebSocket to a Stream to an Actor.
// I had to tweak it a bit to get it to work with a more recent version of akka-http.
// Maven deps for this are:
// <!-- http://akka.io/docs/#akka-http -->
// <dependency>
//   <groupId>com.typesafe.akka</groupId>
//   <artifactId>akka-http-core_2.11</artifactId>
//   <version>10.0.6</version>
// </dependency>
// <dependency>
//   <groupId>com.typesafe.akka</groupId>
//   <artifactId>akka-http_2.11</artifactId>
//   <version>10.0.6</version>
// </dependency>
// Original post is here:
// https://groups.google.com/forum/#!topic/akka-user/zOI6qdg_wqc
//
// The point of this setup is that you can `tell` the actor (AnActor below) a message and
// have it send whatever you want to `out`, which is the (previously) connected WebSocket.
import akka.NotUsed;
import akka.actor.*;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.server.HttpApp;
import akka.http.javadsl.server.Route;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Optional;
public class WebSocketServer {

    private static final class Router extends HttpApp {

        private final ActorSystem system;

        public Router(ActorSystem system) {
            this.system = system;
        }

        public Route createRoute() {
            return route(
                path("test").route(
                    get(handleWebSocketMessages(createWebSocketFlow()))
                )
            );
        }

        private Flow<Message, Message, NotUsed> createWebSocketFlow() {
            ActorRef actor = system.actorOf(Props.create(AnActor.class));

            // Outgoing side: an actor-backed source with a buffer of 5 that fails the stream on overflow.
            // When the stream is materialized, its ActorRef is handed to the actor as the destination.
            Source<Message, NotUsed> source = Source.<Outgoing>actorRef(5, OverflowStrategy.fail())
                .map((outgoing) -> (Message) TextMessage.create(outgoing.message))
                .<NotUsed>mapMaterializedValue(destinationRef -> {
                    actor.tell(new OutgoingDestination(destinationRef), ActorRef.noSender());
                    return NotUsed.getInstance();
                });

            // Incoming side: wrap each text message as Incoming and forward it to the actor.
            // The actor receives a PoisonPill when the incoming stream completes.
            Sink<Message, NotUsed> sink = Flow.<Message>create()
                .map((msg) -> new Incoming(msg.asTextMessage().getStrictText()))
                .to(Sink.actorRef(actor, PoisonPill.getInstance()));

            return Flow.fromSinkAndSource(sink, source);
        }
    }

    public static void main(String[] args) {
        ActorSystem actorSystem = ActorSystem.create();
        Router router = new Router(actorSystem);
        router.bindRoute("127.0.0.1", 8082, actorSystem);
    }

    // A text message received from the WebSocket client.
    static class Incoming {
        public final String message;

        public Incoming(String message) {
            this.message = message;
        }
    }

    // A text message to be sent to the WebSocket client.
    static class Outgoing {
        public final String message;

        public Outgoing(String message) {
            this.message = message;
        }
    }

    // Tells the actor which ActorRef feeds the outgoing side of the WebSocket.
    static class OutgoingDestination {
        public final ActorRef destination;

        OutgoingDestination(ActorRef destination) {
            this.destination = destination;
        }
    }

    static class AnActor extends AbstractActor {
        // Empty until an OutgoingDestination arrives; Incoming messages before that are ignored.
        private Optional<ActorRef> outgoing = Optional.empty();

        public AnActor() {
            receive(ReceiveBuilder.match(
                OutgoingDestination.class, (msg) -> outgoing = Optional.of(msg.destination)
            ).match(
                Incoming.class, (in) -> outgoing.ifPresent((out) -> out.tell(new Outgoing("got it"), self()))
            ).build());
        }
    }
}
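
The message protocol that AnActor follows can be looked at in isolation. The sketch below is a hypothetical, Akka-free model of it, not part of the gist: `RelaySketch`, `registerDestination`, and `onIncoming` are invented names, and a plain `Consumer<String>` stands in for the `ActorRef` that `Source.actorRef` materializes. It only illustrates the ordering: an incoming message is answered with "got it" once a destination has been registered, and is silently dropped before that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical stand-in for AnActor; uses only the JDK, no Akka.
class RelaySketch {
    // Stands in for the outgoing ActorRef; empty until a destination is registered.
    private Optional<Consumer<String>> outgoing = Optional.empty();

    // Mirrors handling OutgoingDestination: remember where replies should go.
    void registerDestination(Consumer<String> destination) {
        outgoing = Optional.of(destination);
    }

    // Mirrors handling Incoming: reply only if a destination is known.
    // Returns true when a reply was sent, false when the message was dropped.
    boolean onIncoming(String text) {
        outgoing.ifPresent(out -> out.accept("got it"));
        return outgoing.isPresent();
    }

    public static void main(String[] args) {
        List<String> socket = new ArrayList<>(); // collects what would go to the client
        RelaySketch relay = new RelaySketch();

        System.out.println(relay.onIncoming("too early")); // false
        relay.registerDestination(socket::add);
        System.out.println(relay.onIncoming("hello"));     // true
        System.out.println(socket);                        // [got it]
    }
}
```

In the real server the registration happens through `mapMaterializedValue`, so in practice the destination is presumably known before the first client message arrives; the model just makes the fallback behaviour visible.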