Skip to content

Instantly share code, notes, and snippets.

@searler
Created July 18, 2015 20:58
Show Gist options
  • Save searler/bbf8ea7b8d9e486c3a9d to your computer and use it in GitHub Desktop.
Save searler/bbf8ea7b8d9e486c3a9d to your computer and use it in GitHub Desktop.
Simple request/response TCP client using Akka Streams and FSM
package io
import scala.concurrent.duration.DurationInt
import scala.util.Success
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.FSM
import akka.actor.PoisonPill
import akka.actor.Props
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Tcp
import akka.util.ByteString
import akka.util.Timeout
/**
 * Interactive TCP client that talks to 127.0.0.1:9999 using a request/response pattern.
 *
 * Every line read from standard input is sent, newline-terminated, to a
 * [[FSMClienter.ClientActor]] via `ask`. When the ask completes, the reply is printed
 * (decoded as UTF-8 if it is a `ByteString`, otherwise the raw result or failure).
 * Reading stops at end of input.
 */
object FSMClienter extends App {
  implicit val system: ActorSystem = ActorSystem("Sys")
  import system.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  val client: ActorRef = system.actorOf(Props[ClientActor])
  implicit val timeout: Timeout = Timeout(15.seconds)

  // StdIn.readLine() returns null at end of input. Stop reading there instead of
  // looping forever and sending the literal text "null\n" as a request.
  Iterator
    .continually(scala.io.StdIn.readLine())
    .takeWhile(_ != null)
    .foreach { line =>
      (client ? (line + "\n")).onComplete {
        case Success(reply: ByteString) => println(reply.utf8String)
        case other                      => println(other)
      }
    }
  // NOTE(review): the actor system is left running after end of input so that
  // replies still in flight can be printed; shut it down explicitly if the
  // process should exit at that point.

  /** Connection states of [[ClientActor]]. */
  sealed trait State

  /** No stream is held; the next request opens a new connection. */
  case object Disconnected extends State

  /** A connection is held and no request is outstanding. */
  case object Connected extends State

  /** A request has been written and the reply has not arrived yet. */
  case object AwaitResult extends State

  /** State data carried by [[ClientActor]]. */
  sealed trait Data

  /** No connection is held. */
  case object Empty extends Data

  /**
   * A running connection.
   *
   * @param sink actor materialized from the stream's `Source.actorRef`; every
   *             `ByteString` sent to it is fed into the outgoing TCP flow
   */
  final case class Connection(sink: ActorRef) extends Data

  /**
   * A request that is waiting for its reply.
   *
   * @param connection the connection the request was written to
   * @param requestor  the actor that will receive the reply (or a failure)
   */
  final case class Transaction(connection: Connection, requestor: ActorRef) extends Data

  /**
   * FSM that forwards `String` requests over TCP and returns the next received
   * `ByteString` chunk to the sender of the request.
   *
   * The connection is opened lazily on the first request and reused afterwards.
   * Any message that the current state does not expect tears the connection down
   * and returns the FSM to [[Disconnected]].
   */
  class ClientActor extends FSM[State, Data] {

    /** Sent to this actor by the stream's sink when the stream completes. */
    private case object Done

    startWith(Disconnected, Empty)

    when(Disconnected) {
      case Event(request: String, Empty) =>
        val connection = makeConnection()
        connection.sink ! ByteString(request)
        goto(AwaitResult) using Transaction(connection, sender())
    }

    when(Connected) {
      case Event(request: String, connection: Connection) =>
        connection.sink ! ByteString(request)
        goto(AwaitResult) using Transaction(connection, sender())
    }

    when(AwaitResult) {
      // The first chunk received is treated as the complete reply.
      case Event(bytes: ByteString, Transaction(connection, requestor)) =>
        requestor ! bytes
        goto(Connected) using connection
      // A new request while one is outstanding is written to the same connection
      // and its sender replaces the previous requestor, which will then not
      // receive a reply from this actor.
      case Event(request: String, Transaction(connection, _)) =>
        connection.sink ! ByteString(request)
        stay() using Transaction(connection, sender())
    }

    whenUnhandled {
      case Event(_, connection: Connection) =>
        // PoisonPill is sent to the source actor to end the outgoing stream.
        connection.sink ! PoisonPill
        goto(Disconnected) using Empty
      case Event(event, Transaction(connection, requestor)) =>
        // Fail the pending ask right away rather than leaving the requestor
        // to run into its ask timeout.
        requestor ! akka.actor.Status.Failure(failureFor(event))
        connection.sink ! PoisonPill
        goto(Disconnected) using Empty
    }

    // Per the Akka FSM documentation this must be the last call in the constructor.
    initialize()

    /** Translates the message that interrupted a transaction into the failure reported to the requestor. */
    private def failureFor(event: Any): Throwable = event match {
      case akka.actor.Status.Failure(cause) => cause
      case Done =>
        new java.io.IOException("Connection closed before a response was received")
      case other =>
        new IllegalStateException(s"Unexpected message while awaiting response: $other")
    }

    /**
     * Materializes the stream `Source.actorRef -> TCP 127.0.0.1:9999 -> Sink.actorRef(self)`.
     *
     * Received chunks are delivered to this actor as `ByteString` messages and
     * [[Done]] is delivered when the stream completes.
     *
     * @return the connection wrapping the actor that feeds the outgoing side
     */
    private def makeConnection(): Connection = {
      val tcpFlow = Tcp().outgoingConnection("127.0.0.1", 9999)
      // Buffer size 10; the stream is failed if the buffer overflows.
      val outgoing = Source.actorRef[ByteString](10, OverflowStrategy.fail)
      val incoming = Sink.actorRef[ByteString](self, Done)
      Connection(outgoing.via(tcpFlow).to(incoming).run())
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment