Skip to content

Instantly share code, notes, and snippets.

@kiritsuku
Last active August 15, 2018 06:11
Show Gist options
  • Save kiritsuku/df6315c87140d1d5951e to your computer and use it in GitHub Desktop.
Save kiritsuku/df6315c87140d1d5951e to your computer and use it in GitHub Desktop.
Complex TCP server and client
import scala.io.StdIn
import scala.util._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util._
/** Interactive, line-based TCP client.
  *
  * Connects to `address:port`, prints every newline-delimited message it
  * receives and, for each received message, prompts the user on stdin and
  * sends the typed line back. The conversation ends when the peer sends the
  * frame `BYE`, when stdin reaches end-of-input, or when the connection
  * terminates; in all cases the actor system is shut down afterwards.
  */
object ComplexTcpClient extends App {
  implicit val client = ActorSystem("SimpleTcpClient")
  implicit val materializer = ActorMaterializer()

  val address = "127.0.0.1"
  val port = 6666

  val connection = Tcp().outgoingConnection(address, port)

  /** Prints and forwards each incoming message; completes the stage (and
    * thereby the whole flow) as soon as the message `BYE` is seen.
    */
  val closeClient = new GraphStage[FlowShape[String, String]] {
    val in = Inlet[String]("closeClient.in")
    val out = Outlet[String]("closeClient.out")
    override val shape = FlowShape.of(in, out)

    override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush() = grab(in) match {
          case "BYE" =>
            println("Connection closed")
            completeStage()
          case msg =>
            println(msg)
            push(out, msg)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull() = pull(in)
      })
    }
  }

  val flow = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .via(closeClient)
    // NOTE: readLine blocks the stream's thread until the user presses enter.
    // It returns null on end-of-input, so wrap it in Option instead of
    // sending the literal text "null" to the server.
    .map(_ => Option(StdIn.readLine("> ")))
    // End-of-input on stdin ends the conversation.
    .takeWhile(_.isDefined)
    .collect { case Some(line) => ByteString(line + "\n") }
    // Shut the actor system down once the stream has terminated, otherwise
    // the JVM keeps running after the connection is gone.
    .watchTermination() { (mat, done) =>
      done.onComplete { outcome =>
        outcome.failed.foreach(e => Console.err.println(s"Connection failed: ${e.getMessage}"))
        client.terminate()
      }(client.dispatcher)
      mat
    }

  connection.join(flow).run()
}
import scala.util._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._
import akka.stream.stage._
/** Line-based TCP server.
  *
  * Every accepted connection is greeted with a welcome line, after which each
  * newline-delimited message is answered with a response line. The message
  * `q` makes the server answer `BYE` and close the connection.
  */
object ComplexTcpServer extends App {

  /** Turns each incoming message into a response line. On the message `q` it
    * emits `BYE` and then completes the stage, which ends the connection flow.
    */
  val closeConnection = new GraphStage[FlowShape[String, String]] {
    val in = Inlet[String]("closeConnection.in")
    val out = Outlet[String]("closeConnection.out")
    override val shape = FlowShape(in, out)

    override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush() = grab(in) match {
          case "q" =>
            // Terminate the farewell with the frame delimiter like every other
            // outgoing message, so clients need not rely on frame truncation
            // at end-of-stream to receive it.
            push(out, "BYE\n")
            completeStage()
          case msg =>
            push(out, s"Server hereby responds to message: $msg\n")
        }
      })
      setHandler(out, new OutHandler {
        override def onPull() = pull(in)
      })
    }
  }

  /** Builds the per-connection flow: a single welcome message followed by the
    * responses produced by [[closeConnection]] for each received line.
    *
    * @param conn   the accepted connection; its remote address is used in the welcome text
    * @param system actor system whose logger records received messages
    * @return a flow from received bytes to bytes to send back
    */
  def serverLogic(conn: Tcp.IncomingConnection)(implicit system: ActorSystem): Flow[ByteString, ByteString, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
      val logic = b.add(Flow[ByteString]
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
        .map(_.utf8String)
        .map { msg => system.log.debug(s"Server received: $msg"); msg }
        .via(closeConnection)
        .map(ByteString(_)))

      // Concat drains input 0 (the welcome message) before input 1 (the responses).
      val concat = b.add(Concat[ByteString]())
      welcome ~> concat.in(0)
      logic.outlet ~> concat.in(1)

      FlowShape(logic.in, concat.out)
    })

  /** Binds to `address:port` and handles every incoming connection with
    * [[serverLogic]]. The outcome of the bind attempt is logged.
    *
    * @param address      interface to bind to
    * @param port         port to listen on
    * @param system       actor system used for TCP, logging and callbacks
    * @param materializer materializer used to run the streams
    */
  def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
    import system.dispatcher

    val connectionHandler = Sink.foreach[Tcp.IncomingConnection] { conn =>
      system.log.debug(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic(conn))
    }

    val incomingConnections = Tcp().bind(address, port)
    val binding = incomingConnections.to(connectionHandler).run()

    binding.onComplete {
      case Success(b) =>
        system.log.debug(s"Server started, listening on: ${b.localAddress}")
      case Failure(e) =>
        // A failed bind is an error, not debug noise: log it at error level
        // together with the cause so it is visible with default log settings.
        system.log.error(e, s"Server could not be bound to $address:$port: ${e.getMessage}")
    }
  }

  /** Creates the actor system and materializer and starts the server on 127.0.0.1:6666. */
  def mkAkkaServer(): Unit = {
    val address = "127.0.0.1"
    val port = 6666
    implicit val server = ActorSystem("Server")
    implicit val materializer = ActorMaterializer()
    mkServer(address, port)
  }

  mkAkkaServer()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment