Complex TCP server and client
import scala.io.StdIn | |
import scala.util._ | |
import akka.actor._ | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.stream.stage._ | |
import akka.util._ | |
/** Interactive TCP client.
  *
  * Connects to 127.0.0.1:6666, prints every newline-delimited message received
  * from the server and answers each one with a line read from standard input.
  * The conversation ends when the server sends "BYE", when standard input
  * reaches end-of-file, or when the connection fails; in every case the actor
  * system is shut down so the JVM can exit.
  */
object ComplexTcpClient extends App {
  implicit val client: ActorSystem = ActorSystem("SimpleTcpClient")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val address = "127.0.0.1"
  val port = 6666

  val connection = Tcp().outgoingConnection(address, port)

  /** Prints and forwards every incoming message; completes the stream on "BYE". */
  val closeClient = new GraphStage[FlowShape[String, String]] {
    val in = Inlet[String]("closeClient.in")
    val out = Outlet[String]("closeClient.out")

    override val shape: FlowShape[String, String] = FlowShape.of(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        setHandler(in, new InHandler {
          override def onPush(): Unit = grab(in) match {
            case "BYE" ⇒
              println("Connection closed")
              completeStage()
            case msg ⇒
              println(msg)
              push(out, msg)
          }
        })
        setHandler(out, new OutHandler {
          override def onPull(): Unit = pull(in)
        })
      }
  }

  val flow = Flow[ByteString]
    // allowTruncation = true lets a final frame without a trailing "\n" through
    // when the server closes the connection.
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .via(closeClient)
    // NOTE: readLine blocks the stream's thread until the user presses enter.
    // It returns null on end-of-input, so wrap it in Option instead of sending
    // the literal text "null" to the server.
    .map(_ ⇒ Option(StdIn.readLine("> ")))
    .takeWhile(_.isDefined)
    .collect { case Some(line) ⇒ ByteString(line + "\n") }

  // Watch the flow so the actor system is terminated once the conversation is
  // over; otherwise the process would keep running after "Connection closed".
  connection
    .joinMat(flow.watchTermination()(Keep.right))(Keep.right)
    .run()
    .onComplete {
      case Success(_) ⇒
        client.terminate()
      case Failure(e) ⇒
        println(s"Connection failed: ${e.getMessage}")
        client.terminate()
    }(client.dispatcher)
}
import scala.util._ | |
import akka._ | |
import akka.actor._ | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.util._ | |
import akka.stream.stage._ | |
/** TCP server that greets each client and answers every newline-delimited
  * message it receives. A client ends its session by sending the line "q",
  * which the server acknowledges with "BYE" before closing the connection.
  */
object ComplexTcpServer extends App {

  /** Turns each incoming message into a response; on "q" emits "BYE" and
    * completes the stream, which closes the connection.
    */
  val closeConnection = new GraphStage[FlowShape[String, String]] {
    val in = Inlet[String]("closeConnection.in")
    val out = Outlet[String]("closeConnection.out")

    override val shape: FlowShape[String, String] = FlowShape(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        setHandler(in, new InHandler {
          override def onPush(): Unit = grab(in) match {
            case "q" ⇒
              // Terminate the farewell with the frame delimiter like every other
              // response, so clients do not have to rely on truncated-frame
              // handling to receive it.
              push(out, "BYE\n")
              completeStage()
            case msg ⇒
              push(out, s"Server hereby responds to message: $msg\n")
          }
        })
        setHandler(out, new OutHandler {
          override def onPull(): Unit = pull(in)
        })
      }
  }

  /** Builds the per-connection flow: a welcome line followed by the responses
    * produced by [[closeConnection]] for each received line.
    *
    * @param conn   the accepted connection; its remote address is used in the greeting
    * @param system actor system whose logger records received messages
    * @return a flow from received bytes to bytes to send back
    */
  def serverLogic
      (conn: Tcp.IncomingConnection)
      (implicit system: ActorSystem)
      : Flow[ByteString, ByteString, NotUsed]
      = Flow.fromGraph(GraphDSL.create() { implicit b ⇒
    import GraphDSL.Implicits._

    val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
    val logic = b.add(Flow[ByteString]
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
      .map(_.utf8String)
      .map { msg ⇒ system.log.debug(s"Server received: $msg"); msg }
      .via(closeConnection)
      .map(ByteString(_)))

    // Concat drains input 0 (the greeting) before forwarding input 1 (the responses).
    val concat = b.add(Concat[ByteString]())
    welcome ~> concat.in(0)
    logic.outlet ~> concat.in(1)

    FlowShape(logic.in, concat.out)
  })

  /** Binds to `address:port` and handles every incoming connection with
    * [[serverLogic]]. The outcome of the bind is logged.
    *
    * @param address interface to bind to
    * @param port    TCP port to listen on
    */
  def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
    import system.dispatcher

    val connectionHandler = Sink.foreach[Tcp.IncomingConnection] { conn ⇒
      system.log.debug(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic(conn))
    }

    val incomingConnections = Tcp().bind(address, port)
    val binding = incomingConnections.to(connectionHandler).run()

    // Report the bind outcome above debug level: debug messages are not shown
    // with the default log level, so a failed bind would otherwise go unnoticed.
    binding onComplete {
      case Success(b) ⇒
        system.log.info(s"Server started, listening on: ${b.localAddress}")
      case Failure(e) ⇒
        system.log.error(e, s"Server could not be bound to $address:$port: ${e.getMessage}")
    }
  }

  /** Creates the actor system and materializer and starts the server on 127.0.0.1:6666. */
  def mkAkkaServer(): Unit = {
    val address = "127.0.0.1"
    val port = 6666
    implicit val server: ActorSystem = ActorSystem("Server")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    mkServer(address, port)
  }

  mkAkkaServer()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment