Last active
August 15, 2018 06:11
-
-
Save kiritsuku/df6315c87140d1d5951e to your computer and use it in GitHub Desktop.
Complex TCP server and client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.io.StdIn | |
import scala.util._ | |
import akka.actor._ | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.stream.stage._ | |
import akka.util._ | |
/**
 * Interactive line-based TCP client.
 *
 * Connects to `address:port`, prints every newline-delimited message received
 * from the server and answers each one with a line read from standard input.
 * The conversation ends when the server sends the frame `BYE`, when standard
 * input reaches end-of-file, or when the connection terminates; in every case
 * the actor system is shut down so the JVM can exit.
 */
object ComplexTcpClient extends App {
  implicit val client: ActorSystem = ActorSystem("SimpleTcpClient")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val address = "127.0.0.1"
  val port = 6666

  val connection = Tcp().outgoingConnection(address, port)

  /**
   * Stage that prints and forwards every incoming message, except for the
   * frame `BYE`, on which it prints a notice and completes the stage (which
   * finishes the stream in both directions).
   */
  val closeClient = new GraphStage[FlowShape[String, String]] {
    val in = Inlet[String]("closeClient.in")
    val out = Outlet[String]("closeClient.out")

    override val shape: FlowShape[String, String] = FlowShape.of(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        setHandler(in, new InHandler {
          override def onPush(): Unit = grab(in) match {
            case "BYE" ⇒
              println("Connection closed")
              completeStage()
            case msg ⇒
              println(msg)
              push(out, msg)
          }
        })
        setHandler(out, new OutHandler {
          override def onPull(): Unit = pull(in)
        })
      }
  }

  /**
   * Client protocol: decode server frames, show them, then prompt the user
   * for the reply that is sent back (newline-terminated).
   */
  val flow = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .via(closeClient)
    // NOTE(review): readLine blocks the thread running this stream while waiting for input.
    // readLine returns null at end of input; wrap it so that EOF ends the
    // stream instead of sending the text "null" to the server.
    .map(_ ⇒ Option(StdIn.readLine("> ")))
    .takeWhile(_.isDefined)
    .collect { case Some(line) ⇒ ByteString(line + "\n") }

  // Watch the termination of the client flow so the actor system is shut down
  // once the conversation is over; otherwise the JVM would never exit.
  connection
    .joinMat(flow.watchTermination()(Keep.right))(Keep.right)
    .run()
    .onComplete { result ⇒
      result match {
        case Failure(e) ⇒ println(s"Connection failed: ${e.getMessage}")
        case Success(_) ⇒ ()
      }
      client.terminate()
    }(client.dispatcher)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.util._ | |
import akka._ | |
import akka.actor._ | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.util._ | |
import akka.stream.stage._ | |
/**
 * Line-based TCP server.
 *
 * Every accepted connection is greeted with a welcome message. Each
 * newline-delimited message from the client is answered with a response line,
 * except for the message `q`, which is answered with `BYE` and ends the
 * conversation on that connection.
 */
object ComplexTcpServer extends App {

  /**
   * Stage that turns each incoming message into the server's response line.
   * On the message `q` it emits the farewell frame and completes the stage.
   */
  val closeConnection = new GraphStage[FlowShape[String, String]] {
    val in = Inlet[String]("closeConnection.in")
    val out = Outlet[String]("closeConnection.out")

    override val shape: FlowShape[String, String] = FlowShape(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        setHandler(in, new InHandler {
          override def onPush(): Unit = grab(in) match {
            case "q" ⇒
              // Newline-terminated like every other message, so the peer does
              // not have to rely on truncated-frame handling to see the farewell.
              push(out, "BYE\n")
              completeStage()
            case msg ⇒
              push(out, s"Server hereby responds to message: $msg\n")
          }
        })
        setHandler(out, new OutHandler {
          override def onPull(): Unit = pull(in)
        })
      }
  }

  /**
   * Builds the per-connection protocol flow: a welcome message followed by
   * the responses produced by [[closeConnection]] for each received line.
   *
   * @param conn   the accepted connection; its remote address is used in the greeting
   * @param system actor system whose logger records received messages
   * @return flow from received bytes to bytes to send back
   */
  def serverLogic
      (conn: Tcp.IncomingConnection)
      (implicit system: ActorSystem)
      : Flow[ByteString, ByteString, NotUsed]
      = Flow.fromGraph(GraphDSL.create() { implicit b ⇒
    import GraphDSL.Implicits._

    val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
    val logic = b.add(Flow[ByteString]
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
      .map(_.utf8String)
      .map { msg ⇒ system.log.debug(s"Server received: $msg"); msg }
      .via(closeConnection)
      .map(ByteString(_)))

    // Concat drains input 0 (the greeting) before it starts emitting input 1.
    val concat = b.add(Concat[ByteString]())
    welcome ~> concat.in(0)
    logic.outlet ~> concat.in(1)

    FlowShape(logic.in, concat.out)
  })

  /**
   * Binds a server socket and handles every incoming connection with
   * [[serverLogic]].
   *
   * If binding fails, the failure is logged at error level and the given
   * actor system is terminated, since nothing else would keep it useful and
   * the process would otherwise hang silently.
   *
   * @param address      interface to bind to
   * @param port         port to listen on
   * @param system       actor system hosting the server
   * @param materializer materializer used to run the connection streams
   */
  def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
    import system.dispatcher

    val connectionHandler = Sink.foreach[Tcp.IncomingConnection] { conn ⇒
      system.log.debug(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic(conn))
    }

    val incomingConnections = Tcp().bind(address, port)
    val binding = incomingConnections.to(connectionHandler).run()

    binding onComplete {
      case Success(b) ⇒
        system.log.debug(s"Server started, listening on: ${b.localAddress}")
      case Failure(e) ⇒
        // Error level so the failure is visible with the default log configuration.
        system.log.error(e, s"Server could not be bound to $address:$port: ${e.getMessage}")
        system.terminate()
    }
  }

  /** Starts the server on 127.0.0.1:6666 with its own actor system. */
  def mkAkkaServer(): Unit = {
    val address = "127.0.0.1"
    val port = 6666

    implicit val server: ActorSystem = ActorSystem("Server")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    mkServer(address, port)
  }

  mkAkkaServer()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment