Skip to content

Instantly share code, notes, and snippets.

@drewhk
Last active August 18, 2018 07:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save drewhk/25bf7472db04b5699b80 to your computer and use it in GitHub Desktop.
Save drewhk/25bf7472db04b5699b80 to your computer and use it in GitHub Desktop.
package akka.stream
import java.net.InetSocketAddress
import java.nio.ByteOrder
import akka.actor.ActorSystem
import akka.stream.io.Framing
import akka.stream.scaladsl.Tcp.ServerBinding
import akka.stream.scaladsl._
import akka.stream.stage.{ TerminationDirective, SyncDirective, Context, PushPullStage }
import akka.util.{ ByteStringBuilder, ByteString }
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
/**
 * Stage that emits the elements of an Iterator (potentially infinite) on downstream demand and
 * finishes as soon as the upstream completes or the iterator runs out of elements.
 * Any element pushed by the upstream fails the stage.
 * The iterator must not be shared with anything else.
 *
 * @param iterator source of the elements that are pushed downstream
 * @param client   remote address, used only for the console log line printed on upstream completion
 * @tparam T type of the emitted elements
 */
class InterruptibleIterator[T](iterator: Iterator[T], client: InetSocketAddress) extends PushPullStage[Any, T] {

  /** Upstream elements are never accepted: every push fails the stage. */
  override def onPush(elem: Any, ctx: Context[T]): SyncDirective = {
    val violation = new UnsupportedOperationException("Client is not allowed to send anything")
    ctx.fail(violation)
  }

  /** Serves downstream demand straight from the iterator; finishes once the iterator is exhausted. */
  override def onPull(ctx: Context[T]): SyncDirective =
    if (!iterator.hasNext) ctx.finish()
    else ctx.push(iterator.next())

  /**
   * Logs the end of streaming for this client and finishes the stage.
   * Overridden to make it explicit that upstream termination is propagated immediately.
   */
  override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
    println(s"### Streaming has finished for $client")
    ctx.finish()
  }
}
/**
 * Example: a TCP server that streams random integers to each connected client until the client
 * closes its side of the connection, plus two sample clients that are run from `main`.
 */
object RandomNumService {

  /**
   * Binds the server on localhost:1001, runs an interactive client (started and stopped with ENTER),
   * then a client that collects 100 positive numbers, and finally unbinds and shuts everything down.
   *
   * @param args command line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("testSystem")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Everything runs inside try/finally so that a failed or timed-out Await below cannot leave the
    // actor system running.
    try {
      // --- Dummy codecs
      // For simplicity we use a length field header (in this case a constant 4)
      val encoder: Flow[Int, ByteString, Unit] = Flow[Int].map { i ⇒
        (new ByteStringBuilder)
          .putByte(4) // message length are always of size 4, and the header is one byte
          .putInt(i)(ByteOrder.BIG_ENDIAN).result()
      }.named("encoder")

      val decoder: Flow[ByteString, Int, Unit] =
        Framing.lengthField(fieldLength = 1, maximumFrameLength = 5)
          .map { bytes ⇒
            val iterator = bytes.iterator
            iterator.next // ignore header, we are interested in only the payloads
            iterator.getInt(ByteOrder.BIG_ENDIAN)
          }.named("decoder")

      // --- Client side
      // The materialized promise is used to close the (otherwise silent) write side of the connection
      val remoteRandomNumbers: Source[Int, Promise[Unit]] =
        Source.lazyEmpty
          .via(Tcp().outgoingConnection("localhost", 1001))
          .via(decoder)
          .named("randomNumbers")

      // --- Server side
      // Dummy infinite Iterator; a def, so every call (i.e. every connection) gets its own instance
      def rng: Iterator[Int] = Iterator.continually(ThreadLocalRandom.current().nextInt())

      // A Flow that will emit random numbers downstream until the upstream finishes
      def interruptibleIterable[T](itr: Iterator[T], client: InetSocketAddress): Flow[Unit, T, Unit] =
        Flow[Unit]
          .transform(() ⇒ new InterruptibleIterator[T](itr, client))

      val server: RunnableGraph[Future[ServerBinding]] =
        Tcp().bind("localhost", 1001).to(Sink.foreach { incomingConnection ⇒
          println(s"### ${incomingConnection.remoteAddress} connected")
          val client: Flow[Int, Unit, Unit] = encoder.via(incomingConnection.flow).map(_ ⇒ ())
          client.join(interruptibleIterable(rng, incomingConnection.remoteAddress)).run()
        })

      // --- Usage
      val binding = server.run()
      println("Press ENTER to start streaming from the server to the console,")
      println("then press ENTER again to stop the streaming")
      readLine()
      val disconnectPromise: Promise[Unit] = remoteRandomNumbers.to(Sink.foreach { number ⇒
        println(number)
        Thread.sleep(500) // Throttle a bit
      }).run()
      readLine()
      disconnectPromise.trySuccess(())

      // Alternative usage, client only wants 100 numbers, and all positive
      val numbers: Future[Iterable[Int]] = remoteRandomNumbers
        .filter(_ > 0)
        .take(100) // need 100 of them
        .grouped(100) // collect into a collection
        .toMat(Sink.head) { (completePromise, iterableFuture) ⇒
          // the server streams until we close the connection, so we need to fulfill the promise when
          // the iterableFuture is ready
          completePromise.completeWith(iterableFuture.map(_ ⇒ ()))
          iterableFuture
        }.run()
      println(s"Numbers from server: ${Await.result(numbers, 3.seconds)}")

      // --- Cleanup
      // flatMap (not map): map would produce a Future[Future[Unit]] and Await would only wait for the
      // binding itself, not for the unbind to complete.
      Await.result(binding.flatMap(_.unbind()), 3.seconds)
    } finally {
      system.shutdown()
      system.awaitTermination()
    }
  }
}
package akka.stream
import java.net.InetSocketAddress
import java.nio.ByteOrder
import akka.actor.ActorSystem
import akka.stream.io.Framing
import akka.stream.scaladsl.Tcp.{ ServerBinding, IncomingConnection }
import akka.stream.scaladsl._
import akka.util.{ ByteString, ByteStringBuilder }
import scala.collection.immutable.Iterable
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
/**
 * Example: clients push a short stream of integers to a TCP server; the server collects what each
 * client sent into a strict collection and hands (clientAddress, collection) pairs to a service sink.
 */
object TestApp {

  /**
   * Binds the server on localhost:1001, connects three clients that each dump a list of integers,
   * waits for ENTER, then unbinds the server and shuts the actor system down.
   *
   * @param args command line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("testSystem")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Everything runs inside try/finally so that a failed or timed-out Await below cannot leave the
    // actor system running.
    try {
      // --- Dummy codecs
      // For simplicity we use a length field header (in this case a constant 4)
      val encoder: Flow[Int, ByteString, Unit] = Flow[Int].map { i ⇒
        (new ByteStringBuilder)
          .putByte(4) // message length are always of size 4, and the header is one byte
          .putInt(i)(ByteOrder.BIG_ENDIAN).result()
      }.named("encoder")

      val decoder: Flow[ByteString, Int, Unit] =
        Framing.lengthField(fieldLength = 1, maximumFrameLength = 5)
          .map { bytes ⇒
            val iterator = bytes.iterator
            iterator.next // ignore header, we are interested in only the payloads
            iterator.getInt(ByteOrder.BIG_ENDIAN)
          }
          .named("decoder")

      // --- Client side
      // Reusable sink that for each materialization will open a connection, and accepts a stream of Ints
      // to be passed to the server
      val dumpToServer: Sink[Int, Unit] =
        Flow[Int]
          .via(encoder) // encode ints to packets
          .via(Tcp().outgoingConnection("localhost", 1001)) // Send to server
          .to(Sink.cancelled) // we are not reading from the server, safe to cancel this side
          .named("dumpToServer")

      // --- Server side
      // Since we need to drain to a strict collection we cap
      val MaxLength = 100
      val parseCollection: Flow[ByteString, Iterable[Int], Unit] = Flow[ByteString]
        .via(decoder)
        .grouped(MaxLength) // A custom stage would be better that errors if limit exceeded. This will just truncate
        .named("parseCollection")

      // Source that will bind to the socket on materialization, and will provide a stream of
      // (clientAddress, iterable) pairs that a service can consume
      val server: Source[(InetSocketAddress, Iterable[Int]), Future[ServerBinding]] =
        Tcp().bind("localhost", 1001) // bind the server
          .mapAsyncUnordered(4) { incomingConnection ⇒ // accept 4 connections in parallel
            val client: InetSocketAddress = incomingConnection.remoteAddress
            println(s"### $client connected")
            val (closePromise, iterableFuture) =
              Source.lazyEmpty // We want to control closing of the write side (and hence connection) via a promise
                .via(incomingConnection.flow) // plug the lazyEmpty to the output and grab the incoming bytes
                .via(parseCollection) // create an iterable from the bytes
                .toMat(Sink.head)(Keep.both).run() // grab a future for the iterable
            // Close the connection once the iterable has been consumed
            closePromise.completeWith(iterableFuture.map { _ ⇒ ByteString.empty })
            // Emit a pair of address and iterable (it is in a Future but mapAsync flattens it for us)
            iterableFuture.map((client, _))
          }
          .named("server")

      // --- Usage
      // Dummy service
      val service: Sink[(InetSocketAddress, Iterable[Int]), Future[Unit]] =
        Sink.foreach[(InetSocketAddress, Iterable[Int])] {
          case (client, iterable) ⇒
            println(s"### $client sent $iterable")
        }.named("dummyService")

      // Fire up the server
      val binding = server.to(service).run()

      // Connect some clients
      Source(List(1, 2, 3, 4)).runWith(dumpToServer)
      Source(List(10, 20, 30, 40)).runWith(dumpToServer)
      Source(List(42)).runWith(dumpToServer)

      // --- Cleanup
      readLine()
      // flatMap (not map): map would produce a Future[Future[Unit]] and Await would only wait for the
      // binding itself, not for the unbind to complete.
      Await.result(binding.flatMap(_.unbind()), 3.seconds)
    } finally {
      system.shutdown()
      system.awaitTermination()
    }
  }
}
package akka.stream
import java.net.InetSocketAddress
import java.nio.ByteOrder
import akka.actor.ActorSystem
import akka.stream.io.Framing
import akka.stream.scaladsl.Tcp.ServerBinding
import akka.stream.scaladsl._
import akka.util.{ ByteStringBuilder, ByteString }
import scala.collection.immutable.Iterable
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
/**
 * Example: a request/response protocol over TCP. The client sends a newline-delimited String command,
 * the server answers with a stream of length-prefixed integers, and the client collects the answer
 * into a strict collection.
 */
object TestApp2 {

  /**
   * Binds the server on localhost:1001, issues the sample requests through the different client APIs
   * shown below, waits for ENTER, then unbinds the server and shuts the actor system down.
   *
   * @param args command line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("testSystem")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Everything runs inside try/finally so that a failed or timed-out Await below cannot leave the
    // actor system running.
    try {
      // --- Dummy codecs
      // For simplicity we use a length field header (in this case a constant 4)
      val intEncoder: Flow[Int, ByteString, Unit] = Flow[Int].map { i ⇒
        (new ByteStringBuilder)
          .putByte(4) // message length are always of size 4, and the header is one byte
          .putInt(i)(ByteOrder.BIG_ENDIAN).result()
      }.named("intEncoder")

      val intDecoder: Flow[ByteString, Int, Unit] =
        Framing.lengthField(fieldLength = 1, maximumFrameLength = 5)
          .map { bytes ⇒
            val iterator = bytes.iterator
            iterator.next // ignore header, we are interested in only the payloads
            iterator.getInt(ByteOrder.BIG_ENDIAN)
          }.named("intDecoder")

      // Commands are plain strings terminated by a newline
      val Delimiter = "\n"

      val commandEncoder: Flow[String, ByteString, Unit] =
        Flow[String].map { cmd ⇒ ByteString(cmd + Delimiter) }

      val commandDecoder: Flow[ByteString, String, Unit] =
        Framing.delimiter(ByteString(Delimiter), maximumFrameLength = 128)
          .map(_.utf8String)

      // --- Client side
      // Since we need to drain to a strict collection we cap
      val MaxLength = 100
      val parseCollection: Flow[ByteString, Iterable[Int], Unit] = Flow[ByteString]
        .via(intDecoder)
        .grouped(MaxLength) // A custom stage would be better that errors if limit exceeded. This will just truncate
        .named("parseCollection")

      val serverConnection: Flow[String, Iterable[Int], Unit] =
        commandEncoder // Encode commands to bytes
          .via(Tcp().outgoingConnection("localhost", 1001)) // Send to server
          .via(parseCollection) // decode received bytes to an Iterable of integers

      // --- Server side
      // Client connections receive integers, and send String commands
      type ClientConnection = Flow[Int, String, Unit]
      // Server side handler receives String commands and sends integers
      type Service = Flow[String, Int, Unit]

      // The same pair of codecs expressed as a BidiFlow. It is not referenced below; the server wires
      // intEncoder and commandDecoder around the connection flow by hand instead.
      val codec: BidiFlow[ByteString, String, Int, ByteString, Unit] =
        BidiFlow.wrap(commandDecoder, intEncoder)(Keep.none)

      val server: Source[ClientConnection, Future[ServerBinding]] =
        Tcp().bind("localhost", 1001) // bind the server
          .mapAsyncUnordered(4) { incomingConnection ⇒ // accept 4 connections in parallel
            println(s"### client ${incomingConnection.remoteAddress} connected")
            Future.successful(
              intEncoder
                .via(incomingConnection.flow)
                .via(commandDecoder))
          }

      // --- Usage
      val resources = Map(
        "smallNumbers" -> List(1, 2, 3, 4, 5),
        "largeNumbers" -> List(10, 20, 30, 40, 50))

      // Stand-in for an asynchronous lookup; looks the command up in the resources map
      def dummyAsyncCall(cmd: String): Future[Iterable[Int]] = {
        Future.successful(resources(cmd))
      }

      val dummyService: Service =
        Flow[String]
          // Limit to one request per connection (otherwise the protocol needs a delimiter for iterable
          // boundaries) which would complicate the example
          .take(1)
          .mapAsync(1)(dummyAsyncCall) // Call the async service to get the Iterable for the command
          .mapConcat(identity) // stream the iterable

      val binding = server.to(Sink.foreach { conn: ClientConnection ⇒
        conn.join(dummyService).run() // Wire the client connection to the server-side service
      }).run()

      Source.single("smallNumbers").via(serverConnection).runForeach { iterable ⇒
        println(s"Server sent $iterable for smallNumbers (Flow API)")
      }
      Source.single("largeNumbers").via(serverConnection).runForeach { iterable ⇒
        println(s"Server sent $iterable for largeNumbers (Flow API)")
      }

      // Another way to encode the client API
      val cmdToFuture: Sink[String, Future[Iterable[Int]]] = serverConnection.toMat(Sink.head)(Keep.right)
      val result1: Future[Iterable[Int]] = Source.single("smallNumbers").runWith(cmdToFuture)

      // ... or yet another way:
      def requestIterable(cmd: String): Future[Iterable[Int]] = Source.single(cmd).runWith(cmdToFuture)
      val result2: Future[Iterable[Int]] = requestIterable("largeNumbers")

      println(s"Server sent ${Await.result(result1, 3.seconds)} for smallNumbers (Sink API)")
      println(s"Server sent ${Await.result(result2, 3.seconds)} for largeNumbers (function API)")

      // --- Cleanup
      readLine()
      // flatMap (not map): map would produce a Future[Future[Unit]] and Await would only wait for the
      // binding itself, not for the unbind to complete.
      Await.result(binding.flatMap(_.unbind()), 3.seconds)
    } finally {
      system.shutdown()
      system.awaitTermination()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment