Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@WadeWaldron
Last active July 28, 2018 13:15
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save WadeWaldron/abfd6ef26277355a65b9 to your computer and use it in GitHub Desktop.
Save WadeWaldron/abfd6ef26277355a65b9 to your computer and use it in GitHub Desktop.
Akka Streams Customer Example
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.immutable
import scala.util.Random
/** Factory helpers for [[InputCustomer]]. */
object InputCustomer {

  /**
   * Builds a customer with a pseudo-random name.
   *
   * The name has the shape `FirstName<i> LastName<j>`, where `i` and `j` are
   * drawn independently via `Random.nextInt(1000)` (so each lies in 0..999).
   *
   * @return a newly generated customer
   */
  def random(): InputCustomer = {
    // Draw the first-name suffix before the last-name suffix, as two separate calls.
    val firstSuffix = Random.nextInt(1000)
    val lastSuffix = Random.nextInt(1000)
    InputCustomer(s"FirstName$firstSuffix LastName$lastSuffix")
  }
}

/**
 * Customer as it enters the example pipeline: the whole name in one string.
 *
 * @param name the customer's name as a single string
 */
case class InputCustomer(name: String)
/**
 * Customer whose name is held as separate first and last parts.
 *
 * @param firstName the first name
 * @param lastName  the last name
 */
case class OutputCustomer(
  firstName: String,
  lastName: String
)
/**
 * Runnable example: streams 100 randomly generated [[InputCustomer]]s through a
 * flow that splits each name into first/last parts, prints every resulting
 * [[OutputCustomer]], and shuts the actor system down once the stream finishes.
 */
object CustomersExample extends App {
  // Explicit types on implicit vals keep implicit resolution predictable.
  implicit val actorSystem: ActorSystem = ActorSystem()
  import actorSystem.dispatcher // ExecutionContext for the Future callbacks below
  implicit val flowMaterializer: ActorFlowMaterializer = ActorFlowMaterializer()

  // Finite source of 100 randomly named customers.
  val inputCustomers = Source((1 to 100).map(_ => InputCustomer.random()))

  // Split the name on a single space; `collect` drops any name that does not
  // yield exactly two parts.
  val normalize = Flow[InputCustomer].map(c => c.name.split(" ").toList).collect {
    case firstName :: lastName :: Nil => OutputCustomer(firstName, lastName)
  }

  // Terminal stage: print each normalized customer to stdout.
  val writeCustomers = Sink.foreach[OutputCustomer] { customer =>
    println(customer)
  }

  inputCustomers
    .via(normalize)
    .runWith(writeCustomers)
    .andThen {
      // Surface a failed stream instead of silently discarding the error.
      case scala.util.Failure(cause) =>
        Console.err.println(s"Customer stream failed: $cause")
    }
    .onComplete { _ =>
      // Only *request* shutdown here. This callback runs on the actor system's
      // own dispatcher, so it must not block waiting for that system to stop.
      actorSystem.shutdown()
    }

  // Wait for termination on the main thread rather than on a dispatcher thread.
  actorSystem.awaitTermination()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment