Last active
August 15, 2018 06:11
Revisions
-
kiritsuku revised this gist
Feb 22, 2016 . 2 changed files with 0 additions and 2 deletions.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -3,7 +3,6 @@ import scala.util._ import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.stream.stage._ import akka.util._ This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -3,7 +3,6 @@ import scala.util._ import akka._ import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.util._ import akka.stream.stage._ -
kiritsuku revised this gist
Jan 31, 2016 . 1 changed file with 0 additions and 1 deletion.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -43,7 +43,6 @@ object ComplexTcpServer extends App { .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)) .map(_.utf8String) .map { msg ⇒ system.log.debug(s"Server received: $msg"); msg } .via(closeConnection) .map(ByteString(_))) -
kiritsuku revised this gist
Jan 31, 2016 . No changes.There are no files selected for viewing
-
kiritsuku revised this gist
Jan 31, 2016 . 2 changed files with 54 additions and 0 deletions.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,53 @@ import scala.io.StdIn import scala.util._ import akka.actor._ import akka.stream._ import akka.stream.io.Framing import akka.stream.scaladsl._ import akka.stream.stage._ import akka.util._ object ComplexTcpClient extends App { implicit val client = ActorSystem("SimpleTcpClient") implicit val materializer = ActorMaterializer() val address = "127.0.0.1" val port = 6666 val connection = Tcp().outgoingConnection(address, port) val closeClient = new GraphStage[FlowShape[String, String]] { val in = Inlet[String]("closeClient.in") val out = Outlet[String]("closeClient.out") override val shape = FlowShape.of(in, out) override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { setHandler(in, new InHandler { override def onPush() = grab(in) match { case "BYE" ⇒ println("Connection closed") completeStage() case msg ⇒ println(msg) push(out, msg) } }) setHandler(out, new OutHandler { override def onPull() = pull(in) }) } } val flow = Flow[ByteString] .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)) .map(_.utf8String) .via(closeClient) .map(_ ⇒ StdIn.readLine("> ")) .map(_+"\n") .map(ByteString(_)) connection.join(flow).run() } This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,4 +1,5 @@ import scala.util._ import akka._ import akka.actor._ import akka.stream._ -
kiritsuku created this gist
Jan 31, 2016 .There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,85 @@ import scala.util._ import akka._ import akka.actor._ import akka.stream._ import akka.stream.io._ import akka.stream.scaladsl._ import akka.util._ import akka.stream.stage._ object ComplexTcpServer extends App { val closeConnection = new GraphStage[FlowShape[String, String]] { val in = Inlet[String]("closeConnection.in") val out = Outlet[String]("closeConnection.out") override val shape = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { setHandler(in, new InHandler { override def onPush() = grab(in) match { case "q" ⇒ push(out, "BYE") completeStage() case msg ⇒ push(out, s"Server hereby responds to message: $msg\n") } }) setHandler(out, new OutHandler { override def onPull() = pull(in) }) } } def serverLogic (conn: Tcp.IncomingConnection) (implicit system: ActorSystem) : Flow[ByteString, ByteString, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b ⇒ import GraphDSL.Implicits._ val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n")) val logic = b.add(Flow[ByteString] .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)) .map(_.utf8String) .map { msg ⇒ system.log.debug(s"Server received: $msg"); msg } .map(_+"\n") .via(closeConnection) .map(ByteString(_))) val concat = b.add(Concat[ByteString]()) welcome ~> concat.in(0) logic.outlet ~> concat.in(1) FlowShape(logic.in, concat.out) }) def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = { import system.dispatcher val connectionHandler = Sink.foreach[Tcp.IncomingConnection] { conn ⇒ system.log.debug(s"Incoming connection from: ${conn.remoteAddress}") conn.handleWith(serverLogic(conn)) } val incomingCnnections = Tcp().bind(address, port) val binding = incomingCnnections.to(connectionHandler).run() binding onComplete { case Success(b) ⇒ system.log.debug(s"Server started, listening on: ${b.localAddress}") case Failure(e) ⇒ system.log.debug(s"Server could not be bound to $address:$port: ${e.getMessage}") } } def mkAkkaServer() = { val address = "127.0.0.1" val port = 6666 implicit val server = ActorSystem("Server") implicit val materializer = ActorMaterializer() mkServer(address, port) } mkAkkaServer() }