Skip to content

Instantly share code, notes, and snippets.

@qstyler
Last active August 6, 2020 13:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save qstyler/f472be578b1fb4e12707a505ba65ba26 to your computer and use it in GitHub Desktop.
Save qstyler/f472be578b1fb4e12707a505ba65ba26 to your computer and use it in GitHub Desktop.
// sbt build definition for the subway-arrival Akka Streams demo.

// --- Project coordinates ---------------------------------------------------
name := "L-Arrivee-d-un-train"
version := "1.0"
scalaVersion := "2.13.1"

// --- Shared library versions -----------------------------------------------
lazy val akkaVersion = "2.6.8"
val akkaHttpVersion = "10.1.12"

// --- Compiler flags --------------------------------------------------------
// Enables the postfix-operator language feature.
scalacOptions += "-language:postfixOps"

// --- Runtime dependencies --------------------------------------------------
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream-typed" % akkaVersion,
  "com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
  "de.heikoseeberger" %% "akka-http-circe" % "1.33.0",
  "io.circe" %% "circe-optics" % "0.13.0",
  "ch.qos.logback" % "logback-classic" % "1.2.3"
)

// --- Test-only dependencies ------------------------------------------------
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test,
  "org.scalatest" %% "scalatest" % "3.1.0" % Test
)
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.scaladsl.{ Flow, GraphDSL, Merge, RunnableGraph, Sink, Source }
import akka.stream.{ SourceShape, UniformFanInShape }
import akka.{ Done, NotUsed }
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import io.circe.Json
import io.circe.optics.JsonPath._
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContextExecutor, Future }
/**
 * A single subway car.
 *
 * @param persons how many passengers the car carries
 * @param number  the car's number within the train
 */
case class Car(persons: Int, number: Int)
/**
 * A passenger leaving the train.
 *
 * @param name the passenger's display name; `None` (the default) when no
 *             name has been assigned
 * @param car  the car the passenger travelled in
 */
case class Person(name: Option[String] = None, car: Car)
/**
 * Demo application: a train of subway cars arrives, its passengers step out
 * one per second per car, each passenger is given a random name fetched from
 * `https://randomuser.me/api/`, and the result is printed to standard output.
 *
 * Once every passenger has been printed (or the stream fails) the HTTP
 * connection pools and the actor system are shut down so that the JVM can exit.
 */
object LArriveedUnTrain extends App {
  import scala.util.control.NonFatal

  implicit val system: ActorSystem = ActorSystem("actor-system")
  implicit val dispatcher: ExecutionContextExecutor = system.dispatcher

  val numberOfCars = 5
  val rand = new java.util.Random

  /** Maximum number of name look-ups that may be in flight at the same time. */
  private val lookupParallelism = 5

  /** The cars of the train; each one carries between 2 and 5 passengers. */
  val cars = (1 to numberOfCars).map { number =>
    Car(
      persons = rand.nextInt(4) + 2,
      number = number
    )
  }

  /**
   * Source of all passengers of the train. Every car contributes one
   * (still unnamed) passenger per second until it is empty; the per-car
   * sources are merged into a single stream.
   */
  val train: Source[Person, NotUsed] = Source.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val mergePersons: UniformFanInShape[Person, Person] =
      builder.add(Merge[Person](cars.size))

    for (car <- cars) {
      println(s"Subway car №${ car.number } arrived " +
        s"with ${ car.persons } persons in it")

      // One tick per second, limited to the number of passengers in the car.
      Source
        .tick(1.second, 1.second, Person(car = car))
        .take(car.persons) ~> mergePersons
    }

    SourceShape(mergePersons.out)
  })

  /**
   * Looks up a random name for every passenger. When the request or the
   * decoding of the response fails, the error message is printed and the
   * passenger is passed on unchanged, so a failed look-up never fails the
   * stream.
   */
  val pullOutId: Flow[Person, Person, NotUsed] =
    Flow[Person].mapAsyncUnordered(lookupParallelism) { person =>
      Http()
        .singleRequest(HttpRequest(uri = "https://randomuser.me/api/"))
        .flatMap(Unmarshal(_).to[Json])
        .map { response: Json =>
          val name = root.results(0).name
          // The name is only set when all three parts are present as strings.
          val nameOpt = for {
            title <- name.title.string.getOption(response)
            first <- name.first.string.getOption(response)
            last <- name.last.string.getOption(response)
          } yield s"$title $first $last"
          person.copy(name = nameOpt)
        }
        .recover {
          case NonFatal(e) =>
            println(e.getMessage)
            person
        }
    }

  /** Prints every passenger that leaves the subway. */
  val street: Sink[Person, Future[Done]] =
    Sink.foreach[Person] { person =>
      println(
        s"${ person.name.getOrElse("Someone") } " +
          s"who came in a car №${ person.car.number } " +
          s"has left the subway"
      )
    }

  /**
   * The complete pipeline. The materialized value stays `NotUsed`; the
   * combiner passed to `toMat` is only used to hook into the sink's
   * completion future so that resources are released when the stream ends,
   * whether it completed normally or failed.
   */
  val stream: RunnableGraph[NotUsed] =
    train.via(pullOutId).toMat(street) { (notUsed, done) =>
      done.onComplete { _ =>
        Http()
          .shutdownAllConnectionPools()
          .onComplete(_ => system.terminate())
      }
      notUsed
    }

  stream.run()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment