Last active
August 18, 2018 07:09
-
-
Save drewhk/25bf7472db04b5699b80 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.stream | |
import java.net.InetSocketAddress | |
import java.nio.ByteOrder | |
import akka.actor.ActorSystem | |
import akka.stream.io.Framing | |
import akka.stream.scaladsl.Tcp.ServerBinding | |
import akka.stream.scaladsl._ | |
import akka.stream.stage.{ TerminationDirective, SyncDirective, Context, PushPullStage } | |
import akka.util.{ ByteStringBuilder, ByteString } | |
import scala.concurrent.{ Await, Future, Promise } | |
import scala.concurrent.duration._ | |
import scala.concurrent.forkjoin.ThreadLocalRandom | |
/**
 * Stage that emits the elements of an Iterator (potentially infinite) on downstream demand, and stops as
 * soon as the upstream completes or the iterator runs out of elements.
 * The iterator must not be shared with anything else.
 *
 * Any element received from upstream fails the stage.
 *
 * @param iterator source of the elements that are pushed downstream
 * @param client   address of the remote peer, only used for console logging in this example
 */
class InterruptibleIterator[T](iterator: Iterator[T], client: InetSocketAddress) extends PushPullStage[Any, T] {

  // Upstream is used purely as a termination signal; actual data from it is treated as an error
  override def onPush(elem: Any, ctx: Context[T]): SyncDirective = {
    val violation = new UnsupportedOperationException("Client is not allowed to send anything")
    ctx.fail(violation)
  }

  // One element per downstream request; finish once the iterator has been drained
  override def onPull(ctx: Context[T]): SyncDirective =
    if (!iterator.hasNext) ctx.finish()
    else ctx.push(iterator.next())

  // Overriding this is not strictly required, but it makes it explicit that upstream termination
  // is propagated immediately (and gives us a place to log it)
  override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
    println(s"### Streaming has finished for $client")
    ctx.finish()
  }
}
/**
 * Example: a TCP server that streams random integers to every connected client until the client
 * closes its side of the connection, plus two client usages (stream to the console until ENTER is
 * pressed, and collect exactly 100 positive numbers).
 */
object RandomNumService {

  /**
   * Starts the server on localhost:1001, runs both client scenarios against it, then unbinds the
   * server and shuts the actor system down.
   *
   * @param args command line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("testSystem")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // --- Dummy codecs

    // For simplicity we use a length field header (in this case a constant 4)
    val encoder: Flow[Int, ByteString, Unit] = Flow[Int].map { i ⇒
      (new ByteStringBuilder)
        .putByte(4) // message length are always of size 4, and the header is one byte
        .putInt(i)(ByteOrder.BIG_ENDIAN).result()
    }.named("encoder")

    val decoder: Flow[ByteString, Int, Unit] =
      Framing.lengthField(fieldLength = 1, maximumFrameLength = 5)
        .map { bytes ⇒
          val iterator = bytes.iterator
          iterator.next // ignore header, we are interested in only the payloads
          iterator.getInt(ByteOrder.BIG_ENDIAN)
        }.named("decoder")

    // --- Client side

    // Never sends anything to the server; the materialized Promise is completed by the caller to
    // signal that the connection should be closed.
    val remoteRandomNumbers: Source[Int, Promise[Unit]] =
      Source.lazyEmpty
        .via(Tcp().outgoingConnection("localhost", 1001))
        .via(decoder)
        .named("randomNumbers")

    // --- Server side

    // Dummy infinite Iterator; a `def` so that every connection gets its own, unshared instance
    def rng = Iterator.continually(ThreadLocalRandom.current().nextInt())

    // A Flow that will emit random numbers downstream until the upstream finishes
    def interruptibleIterable[T](itr: Iterator[T], client: InetSocketAddress): Flow[Unit, T, Unit] =
      Flow[Unit]
        .transform(() ⇒ new InterruptibleIterator[T](itr, client))

    val server: RunnableGraph[Future[ServerBinding]] =
      Tcp().bind("localhost", 1001).to(Sink.foreach { incomingConnection ⇒
        println(s"### ${incomingConnection.remoteAddress} connected")

        // Bytes received from the client are mapped to Unit: only their termination matters
        val client: Flow[Int, Unit, Unit] = encoder.via(incomingConnection.flow).map(_ ⇒ ())
        client.join(interruptibleIterable(rng, incomingConnection.remoteAddress)).run()
      })

    // --- Usage

    val binding = server.run()

    println("Press ENTER to start streaming from the server to the console,")
    println("then press ENTER again to stop the streaming")
    readLine()

    val disconnectPromise: Promise[Unit] = remoteRandomNumbers.to(Sink.foreach { number ⇒
      println(number)
      Thread.sleep(500) // Throttle a bit
    }).run()

    readLine()
    disconnectPromise.trySuccess(())

    // Alternative usage, client only wants 100 numbers, and all positive
    val numbers: Future[Iterable[Int]] = remoteRandomNumbers
      .filter(_ > 0)
      .take(100) // need 100 of them
      .grouped(100) // collect into a collection
      .toMat(Sink.head) { (completePromise, iterableFuture) ⇒
        // the server streams until we close the connection, so we need to fulfill the promise when
        // the iterableFuture is ready
        completePromise.completeWith(iterableFuture.map(_ ⇒ ()))
        iterableFuture
      }.run()

    println(s"Numbers from server: ${Await.result(numbers, 3.seconds)}")

    // --- Cleanup

    // unbind() itself returns a Future: flatMap (rather than map) makes Await wait for the unbind to
    // actually complete instead of only for the binding to become available
    Await.result(binding.flatMap(_.unbind()), 3.seconds)
    system.shutdown()
    system.awaitTermination()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.stream | |
import java.net.InetSocketAddress | |
import java.nio.ByteOrder | |
import akka.actor.ActorSystem | |
import akka.stream.io.Framing | |
import akka.stream.scaladsl.Tcp.{ ServerBinding, IncomingConnection } | |
import akka.stream.scaladsl._ | |
import akka.util.{ ByteString, ByteStringBuilder } | |
import scala.collection.immutable.Iterable | |
import scala.concurrent.{ Await, Future } | |
import scala.concurrent.duration._ | |
/**
 * Example: clients open a TCP connection, dump a short stream of integers to the server and
 * disconnect; the server collects what each client sent into a strict collection and hands
 * (clientAddress, collection) pairs to a service.
 */
object TestApp {

  /**
   * Starts the server on localhost:1001, connects three clients, waits for ENTER, then unbinds
   * the server and shuts the actor system down.
   *
   * @param args command line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("testSystem")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // --- Dummy codecs

    // For simplicity we use a length field header (in this case a constant 4)
    val encoder: Flow[Int, ByteString, Unit] = Flow[Int].map { i ⇒
      (new ByteStringBuilder)
        .putByte(4) // message length are always of size 4, and the header is one byte
        .putInt(i)(ByteOrder.BIG_ENDIAN).result()
    }.named("encoder")

    val decoder: Flow[ByteString, Int, Unit] =
      Framing.lengthField(fieldLength = 1, maximumFrameLength = 5)
        .map { bytes ⇒
          val iterator = bytes.iterator
          iterator.next // ignore header, we are interested in only the payloads
          iterator.getInt(ByteOrder.BIG_ENDIAN)
        }
        .named("decoder")

    // --- Client side

    // Reusable sink that for each materialization will open a connection, and accepts a stream of Ints to be passed
    // to the server
    val dumpToServer: Sink[Int, Unit] =
      Flow[Int]
        .via(encoder) // encode ints to packets
        .via(Tcp().outgoingConnection("localhost", 1001)) // Send to server
        .to(Sink.cancelled) // we are not reading from the server, safe to cancel this side
        .named("dumpToServer")

    // --- Server side

    // Since we need to drain to a strict collection we cap
    val MaxLength = 100

    val parseCollection: Flow[ByteString, Iterable[Int], Unit] = Flow[ByteString]
      .via(decoder)
      .grouped(MaxLength) // A custom stage would be better that errors if limit exceeded. This will just truncate
      .named("parseCollection")

    // Source that will bind to the socket on materialization, and will provide a stream of (clientAddress, iterable)
    // pairs that a service can consume
    val server: Source[(InetSocketAddress, Iterable[Int]), Future[ServerBinding]] =
      Tcp().bind("localhost", 1001) // bind the server
        .mapAsyncUnordered(4) { incomingConnection ⇒ // accept 4 connections in parallel
          val client: InetSocketAddress = incomingConnection.remoteAddress
          println(s"### $client connected")

          // NOTE(review): Sink.head presumably fails its future when a client disconnects without
          // sending anything, which would propagate into this stage - confirm the desired handling
          val (closePromise, iterableFuture) =
            Source.lazyEmpty // We want to control closing of the write side (and hence connection) via a promise
              .via(incomingConnection.flow) // plug the lazyEmpty to the output and grab the incoming bytes
              .via(parseCollection) // create an iterable from the bytes
              .toMat(Sink.head)(Keep.both).run() // grab a future for the iterable

          // Close the connection once the iterable has been consumed. The promise only carries a
          // completion signal, so complete it with Unit explicitly instead of a discarded value.
          closePromise.completeWith(iterableFuture.map(_ ⇒ ()))

          // Emit a pair of address and iterable (it is in a Future but mapAsync flattens it for us)
          iterableFuture.map((client, _))
        }
        .named("server")

    // --- Usage

    // Dummy service
    val service: Sink[(InetSocketAddress, Iterable[Int]), Future[Unit]] =
      Sink.foreach[(InetSocketAddress, Iterable[Int])] {
        case (client, iterable) ⇒
          println(s"### $client sent $iterable")
      }.named("dummyService")

    // Fire up the server
    val binding = server.to(service).run()

    // Connect some clients
    Source(List(1, 2, 3, 4)).runWith(dumpToServer)
    Source(List(10, 20, 30, 40)).runWith(dumpToServer)
    Source(List(42)).runWith(dumpToServer)

    // --- Cleanup

    readLine()
    // unbind() itself returns a Future: flatMap (rather than map) makes Await wait for the unbind to
    // actually complete instead of only for the binding to become available
    Await.result(binding.flatMap(_.unbind()), 3.seconds)
    system.shutdown()
    system.awaitTermination()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.stream | |
import java.net.InetSocketAddress | |
import java.nio.ByteOrder | |
import akka.actor.ActorSystem | |
import akka.stream.io.Framing | |
import akka.stream.scaladsl.Tcp.ServerBinding | |
import akka.stream.scaladsl._ | |
import akka.util.{ ByteStringBuilder, ByteString } | |
import scala.collection.immutable.Iterable | |
import scala.concurrent.{ Await, Future } | |
import scala.concurrent.duration._ | |
/**
 * Example: a request/response protocol over TCP. The client sends a newline-delimited String
 * command, the server answers with a stream of length-prefixed integers and the client collects
 * them into a strict collection.
 */
object TestApp2 {

  /**
   * Starts the server on localhost:1001, issues requests through three flavours of client API,
   * waits for ENTER, then unbinds the server and shuts the actor system down.
   *
   * @param args command line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("testSystem")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // --- Dummy codecs

    // For simplicity we use a length field header (in this case a constant 4)
    val intEncoder: Flow[Int, ByteString, Unit] = Flow[Int].map { i ⇒
      (new ByteStringBuilder)
        .putByte(4) // message length are always of size 4, and the header is one byte
        .putInt(i)(ByteOrder.BIG_ENDIAN).result()
    }.named("intEncoder")

    val intDecoder: Flow[ByteString, Int, Unit] =
      Framing.lengthField(fieldLength = 1, maximumFrameLength = 5)
        .map { bytes ⇒
          val iterator = bytes.iterator
          iterator.next // ignore header, we are interested in only the payloads
          iterator.getInt(ByteOrder.BIG_ENDIAN)
        }.named("intDecoder")

    // Commands are plain strings terminated by a newline
    val Delimiter = "\n"

    val commandEncoder: Flow[String, ByteString, Unit] =
      Flow[String].map { cmd ⇒ ByteString(cmd + Delimiter) }

    val commandDecoder: Flow[ByteString, String, Unit] =
      Framing.delimiter(ByteString(Delimiter), maximumFrameLength = 128)
        .map(_.utf8String)

    // --- Client side

    // Since we need to drain to a strict collection we cap
    val MaxLength = 100

    val parseCollection: Flow[ByteString, Iterable[Int], Unit] = Flow[ByteString]
      .via(intDecoder)
      .grouped(MaxLength) // A custom stage would be better that errors if limit exceeded. This will just truncate
      .named("parseCollection")

    val serverConnection: Flow[String, Iterable[Int], Unit] =
      commandEncoder // Encode commands to bytes
        .via(Tcp().outgoingConnection("localhost", 1001)) // Send to server
        .via(parseCollection) // decode received bytes to an Iterable of integers

    // --- Server side

    // Client connections receive integers, and send String commands
    type ClientConnection = Flow[Int, String, Unit]

    // Server side handler receives String commands and sends integers
    type Service = Flow[String, Int, Unit]

    // The same codec pair expressed as a BidiFlow. Not used below: the server wires
    // intEncoder and commandDecoder around the connection flow directly.
    val codec: BidiFlow[ByteString, String, Int, ByteString, Unit] =
      BidiFlow.wrap(commandDecoder, intEncoder)(Keep.none)

    val server: Source[ClientConnection, Future[ServerBinding]] =
      Tcp().bind("localhost", 1001) // bind the server
        .mapAsyncUnordered(4) { incomingConnection ⇒ // accept 4 connections in parallel
          println(s"### client ${incomingConnection.remoteAddress} connected")
          Future.successful(
            intEncoder
              .via(incomingConnection.flow)
              .via(commandDecoder))
        }

    // --- Usage

    val resources = Map(
      "smallNumbers" -> List(1, 2, 3, 4, 5),
      "largeNumbers" -> List(10, 20, 30, 40, 50))

    // Looks up the collection for a command. An unknown command is reported through a failed Future
    // (same exception type as a plain Map lookup) instead of being thrown synchronously.
    def dummyAsyncCall(cmd: String): Future[Iterable[Int]] =
      resources.get(cmd) match {
        case Some(numbers) ⇒ Future.successful(numbers)
        case None          ⇒ Future.failed(new NoSuchElementException(s"Unknown resource: $cmd"))
      }

    val dummyService: Service =
      Flow[String]
        // Limit to one request per connection (otherwise the protocol needs a delimiter for iterable boundaries)
        // which would complicate the example
        .take(1)
        .mapAsync(1)(dummyAsyncCall) // Call the async service to get the Iterable for the command
        .mapConcat(identity) // stream the iterable

    val binding = server.to(Sink.foreach { conn: ClientConnection ⇒
      conn.join(dummyService).run() // Wire the client connection to the server-side service
    }).run()

    Source.single("smallNumbers").via(serverConnection).runForeach { iterable ⇒
      println(s"Server sent $iterable for smallNumbers (Flow API)")
    }

    Source.single("largeNumbers").via(serverConnection).runForeach { iterable ⇒
      println(s"Server sent $iterable for largeNumbers (Flow API)")
    }

    // Another way to encode the client API
    val cmdToFuture: Sink[String, Future[Iterable[Int]]] = serverConnection.toMat(Sink.head)(Keep.right)
    val result1: Future[Iterable[Int]] = Source.single("smallNumbers").runWith(cmdToFuture)

    // ... or yet another way:
    def requestIterable(cmd: String): Future[Iterable[Int]] = Source.single(cmd).runWith(cmdToFuture)
    val result2: Future[Iterable[Int]] = requestIterable("largeNumbers")

    println(s"Server sent ${Await.result(result1, 3.seconds)} for smallNumbers (Sink API)")
    println(s"Server sent ${Await.result(result2, 3.seconds)} for largeNumbers (function API)")

    // --- Cleanup

    readLine()
    // unbind() itself returns a Future: flatMap (rather than map) makes Await wait for the unbind to
    // actually complete instead of only for the binding to become available
    Await.result(binding.flatMap(_.unbind()), 3.seconds)
    system.shutdown()
    system.awaitTermination()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment