Skip to content

Instantly share code, notes, and snippets.

@rcgoodfellow
Created May 21, 2015 18:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rcgoodfellow/67b060dd8f4f39b33f18 to your computer and use it in GitHub Desktop.
Save rcgoodfellow/67b060dd8f4f39b33f18 to your computer and use it in GitHub Desktop.
MultiStream
import akka.actor.ActorSystem
import java.io.File
import akka.stream.io.{SynchronousFileSink, SynchronousFileSource}
import akka.util.ByteString
import akka.stream.scaladsl.{Source, Sink, Flow, Broadcast, FlowGraph}
import akka.stream.scaladsl.FlattenStrategy
import akka.stream.stage.{PushStage, SyncDirective, TerminationDirective, Context}
import java.util.UUID
import FlowGraph.Implicits._
import akka.stream.scaladsl.Tcp
import akka.stream.scaladsl.Tcp.{ IncomingConnection, OutgoingConnection, ServerBinding}
import akka.stream.ActorFlowMaterializer
import scala.concurrent.Future
/** Demo: forward a TCP byte stream to a second local port chosen by the stream's leading characters.
  *
  * Three pieces run inside one process:
  *  - a "primary" listener on port 6666 that pipes every incoming connection through `flw`;
  *  - an "apple" listener on `applePort` that prints `*` per received chunk and answers "ok";
  *  - a client that sends `apples` to the primary listener and folds the reply into one ByteString.
  *
  * The actor system is shut down three seconds after the client stream terminates.
  */
object MultiStream extends App {
  import scala.concurrent.duration._
  import scala.util.{Failure, Success}

  implicit val sys = ActorSystem("ms")
  import sys.dispatcher
  implicit val materializer = ActorFlowMaterializer()

  val streamSize = 10240
  val applePort = "7777"
  // Number of leading characters of a stream that name the destination port.
  private val routingPrefixLength = 4
  lazy val apples = ByteString(applePort + ("apple" * streamSize))

  // Create a flow that redirects a stream to a port based on a content prefix.
  //
  // The prefix is read from the FIRST chunk only. Keying every chunk on its own
  // first characters (as a per-element groupBy does) breaks as soon as the
  // stream arrives in more than one chunk: later chunks start with payload
  // text, not with the port, so they open new groups that a concat-flatten
  // never gets to while the first group is still open.
  val flw =
    Flow[ByteString]
      .prefixAndTail(1)
      .map { case (leading, rest) =>
        val routed: Source[ByteString, Any] = leading.headOption match {
          case None =>
            // Upstream finished without sending any bytes: nothing to route.
            rest
          case Some(first) =>
            val port = first.take(routingPrefixLength).utf8String
            // Fail this stream with a readable message instead of a bare
            // NumberFormatException from toInt.
            // NOTE(review): a first chunk shorter than the prefix is rejected
            // rather than buffered until the prefix is complete.
            require(
              port.length == routingPrefixLength && port.forall(c => c >= '0' && c <= '9'),
              s"stream must start with a $routingPrefixLength-digit port, got '$port'"
            )
            println(s"OUT++> $port") // debug
            // Re-attach the first chunk in front of the remaining chunks and
            // send the whole stream (prefix included) through one connection.
            Source(List(Source.single(first), rest))
              .flatten(FlattenStrategy.concat[ByteString])
              .via(Tcp().outgoingConnection("localhost", port.toInt))
        }
        routed
      }
      // Exactly one inner source is produced per connection, so concat simply
      // relays the destination's replies.
      .flatten(FlattenStrategy.concat[ByteString])

  // Create a TCP entry point for the stream to come in.
  val primaryIn = Tcp().bind("0.0.0.0", 6666)
  val primaryHandler =
    primaryIn runForeach { connection =>
      println(s"primary connection from ${connection.remoteAddress}")
      connection handleWith flw
    }
  // Report listener failures instead of dropping them silently.
  primaryHandler.failed.foreach(e => println(s"primary listener failed: $e"))

  // Create an entry point for apple streams.
  val appleIn = Tcp().bind("0.0.0.0", applePort.toInt)
  val appleHandler =
    appleIn runForeach { connection =>
      println(s"apple connection from ${connection.remoteAddress}")
      connection handleWith Flow[ByteString].map { _ => print("*"); ByteString("ok") }
    }
  appleHandler.failed.foreach(e => println(s"apple listener failed: $e"))

  // Stream the apples through the primary input, which should redirect them
  // to the apple input.
  // NOTE(review): the client is started without waiting for the two binds
  // above to complete — confirm whether that ordering needs to be enforced.
  val primaryOut = Tcp().outgoingConnection("localhost", 6666)
  Source.single(apples)
    .via(primaryOut)
    .runFold(ByteString.empty)(_ ++ _)
    .onComplete { result =>
      result match {
        case Success(reply) => println(s"\nreceived ${reply.length} reply bytes")
        case Failure(e)     => println(s"\nclient stream failed: $e")
      }
      // Delay shutdown via the scheduler rather than sleeping on a dispatcher thread.
      sys.scheduler.scheduleOnce(3.seconds) {
        sys.shutdown()
      }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment