Skip to content

Instantly share code, notes, and snippets.

@limansky
Created March 11, 2021 08:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save limansky/3ec6c92912b0c156d6b43f8b939deeab to your computer and use it in GitHub Desktop.
Save limansky/3ec6c92912b0c156d6b43f8b939deeab to your computer and use it in GitHub Desktop.
Akka issue demo
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Flow, Framing, Source, Tcp }
import akka.util.ByteString
import scala.concurrent.{ ExecutionContext, Future }
/** Small TCP line server built on Akka Streams.
  *
  * Binds to `localhost:11111`, splits the incoming bytes into CRLF-delimited
  * frames and answers every frame with its reversed text followed by a
  * separator line.
  */
object Test {

  /** Asynchronously reverses `s`.
    *
    * @param s  text to reverse
    * @param ec execution context the reversal is scheduled on
    * @return a future holding the reversed text; the future fails with
    *         `IllegalArgumentException("S = R!")` when `s` equals its own
    *         reverse (this includes the empty string)
    */
  def logic(s: String)(implicit ec: ExecutionContext): Future[String] =
    Future {
      val r = s.reverse
      if (s == r) throw new IllegalArgumentException("S = R!")
      r
    }

  /** Starts the actor system and serves connections until the server stream
    * terminates.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    println("Let's go")
    implicit val system: ActorSystem = ActorSystem("Test-me")
    import system.dispatcher

    val serverConn = Tcp().bind("localhost", 11111)

    val served = serverConn.runForeach { conn =>
      val handler = Flow[ByteString]
        // Frames are CRLF-terminated and limited to 128 bytes.
        .via(Framing.delimiter(ByteString("\r\n"), 128))
        .map(_.utf8String)
        // Parallelism 1: requests of one connection are processed one at a time.
        .mapAsync(1)(logic)
        .map(s => Seq(s + "\n", "-----------------\n"))
        // Emit the reply line and the separator as two separate elements.
        .flatMapConcat(rs => Source(rs.map(ByteString(_)).toList))
        // Stream-level recovery: an upstream failure (e.g. a failed `logic`
        // future) is replaced by one final ERROR line.
        .recover { case x => ByteString(s"ERROR: ${x.getMessage}\n") }
      conn.handleWith(handler)
    }

    // Previously the result of `runForeach` was discarded, so a failure of the
    // server stream (for example when the port cannot be bound) was dropped
    // silently and the process kept running with nothing listening.
    served.failed.foreach { e =>
      println(s"Server stopped: ${e.getMessage}")
      system.terminate()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment