Skip to content

Instantly share code, notes, and snippets.

@kiritsuku
Last active August 15, 2018 06:11

Revisions

  1. kiritsuku revised this gist Feb 22, 2016. 2 changed files with 0 additions and 2 deletions.
    1 change: 0 additions & 1 deletion ComplexTcpClient.scala
    Original file line number Diff line number Diff line change
    @@ -3,7 +3,6 @@ import scala.util._

    import akka.actor._
    import akka.stream._
    import akka.stream.io.Framing
    import akka.stream.scaladsl._
    import akka.stream.stage._
    import akka.util._
    1 change: 0 additions & 1 deletion ComplexTcpServer.scala
    Original file line number Diff line number Diff line change
    @@ -3,7 +3,6 @@ import scala.util._
    import akka._
    import akka.actor._
    import akka.stream._
    import akka.stream.io._
    import akka.stream.scaladsl._
    import akka.util._
    import akka.stream.stage._
  2. kiritsuku revised this gist Jan 31, 2016. 1 changed file with 0 additions and 1 deletion.
    1 change: 0 additions & 1 deletion ComplexTcpServer.scala
    Original file line number Diff line number Diff line change
    @@ -43,7 +43,6 @@ object ComplexTcpServer extends App {
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .map { msg system.log.debug(s"Server received: $msg"); msg }
    .map(_+"\n")
    .via(closeConnection)
    .map(ByteString(_)))

  3. kiritsuku revised this gist Jan 31, 2016. No changes.
  4. kiritsuku revised this gist Jan 31, 2016. 2 changed files with 54 additions and 0 deletions.
    53 changes: 53 additions & 0 deletions ComplexTcpClient.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,53 @@
    import scala.io.StdIn
    import scala.util._

    import akka.actor._
    import akka.stream._
    import akka.stream.io.Framing
    import akka.stream.scaladsl._
    import akka.stream.stage._
    import akka.util._

    object ComplexTcpClient extends App {

    implicit val client = ActorSystem("SimpleTcpClient")
    implicit val materializer = ActorMaterializer()

    val address = "127.0.0.1"
    val port = 6666

    val connection = Tcp().outgoingConnection(address, port)

    val closeClient = new GraphStage[FlowShape[String, String]] {
    val in = Inlet[String]("closeClient.in")
    val out = Outlet[String]("closeClient.out")

    override val shape = FlowShape.of(in, out)

    override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
    override def onPush() = grab(in) match {
    case "BYE"
    println("Connection closed")
    completeStage()
    case msg
    println(msg)
    push(out, msg)
    }
    })
    setHandler(out, new OutHandler {
    override def onPull() = pull(in)
    })
    }
    }

    val flow = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .via(closeClient)
    .map(_ StdIn.readLine("> "))
    .map(_+"\n")
    .map(ByteString(_))

    connection.join(flow).run()
    }
    1 change: 1 addition & 0 deletions ComplexTcpServer.scala
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,5 @@
    import scala.util._

    import akka._
    import akka.actor._
    import akka.stream._
  5. kiritsuku created this gist Jan 31, 2016.
    85 changes: 85 additions & 0 deletions ComplexTcpServer.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,85 @@
    import scala.util._
    import akka._
    import akka.actor._
    import akka.stream._
    import akka.stream.io._
    import akka.stream.scaladsl._
    import akka.util._
    import akka.stream.stage._

    object ComplexTcpServer extends App {

    val closeConnection = new GraphStage[FlowShape[String, String]] {
    val in = Inlet[String]("closeConnection.in")
    val out = Outlet[String]("closeConnection.out")

    override val shape = FlowShape(in, out)

    override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
    override def onPush() = grab(in) match {
    case "q"
    push(out, "BYE")
    completeStage()
    case msg
    push(out, s"Server hereby responds to message: $msg\n")
    }
    })
    setHandler(out, new OutHandler {
    override def onPull() = pull(in)
    })
    }
    }

    def serverLogic
    (conn: Tcp.IncomingConnection)
    (implicit system: ActorSystem)
    : Flow[ByteString, ByteString, NotUsed]
    = Flow.fromGraph(GraphDSL.create() { implicit b
    import GraphDSL.Implicits._
    val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
    val logic = b.add(Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .map { msg system.log.debug(s"Server received: $msg"); msg }
    .map(_+"\n")
    .via(closeConnection)
    .map(ByteString(_)))

    val concat = b.add(Concat[ByteString]())
    welcome ~> concat.in(0)
    logic.outlet ~> concat.in(1)

    FlowShape(logic.in, concat.out)
    })

    def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
    import system.dispatcher

    val connectionHandler = Sink.foreach[Tcp.IncomingConnection] { conn
    system.log.debug(s"Incoming connection from: ${conn.remoteAddress}")
    conn.handleWith(serverLogic(conn))
    }
    val incomingCnnections = Tcp().bind(address, port)
    val binding = incomingCnnections.to(connectionHandler).run()

    binding onComplete {
    case Success(b)
    system.log.debug(s"Server started, listening on: ${b.localAddress}")
    case Failure(e)
    system.log.debug(s"Server could not be bound to $address:$port: ${e.getMessage}")
    }
    }

    def mkAkkaServer() = {
    val address = "127.0.0.1"
    val port = 6666

    implicit val server = ActorSystem("Server")
    implicit val materializer = ActorMaterializer()

    mkServer(address, port)
    }

    mkAkkaServer()
    }