Skip to content

Instantly share code, notes, and snippets.

@bonbon1702
Last active December 8, 2017 03:00
Show Gist options
  • Save bonbon1702/1b4ea7858ee7655599d79eada64c01dc to your computer and use it in GitHub Desktop.
Save bonbon1702/1b4ea7858ee7655599d79eada64c01dc to your computer and use it in GitHub Desktop.
case class Account(id: Int, name: String)
case class Campaign(id: Int, accountId: Int, name: String)
case class Keyword(id: Int, campaignId: Int, name: String)
val accounts: List[Account] = List(
Account(1, "Maria"),
Account(2, "Jon"),
Account(3, "Snow")
)
val campaigns: List[Campaign] = List(
Campaign(1, 1, "Maria-campaign-1"),
Campaign(2, 1, "Maria-campaign-2"),
Campaign(3, 2, "Jon-campaign-1"),
Campaign(4, 3, "Snow-campaign-1"),
)
val keywords: List[Keyword] = List(
Keyword(1, 1, "A"),
Keyword(2, 1, "B"),
Keyword(3, 1, "C"),
Keyword(4, 1, "D"),
Keyword(5, 2, "E"),
Keyword(6, 2, "F"),
Keyword(7, 3, "G"),
Keyword(8, 3, "H"),
Keyword(9, 4, "I"),
)
implicit val system: ActorSystem = ActorSystem("AkkaStream")
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system))
// Source is where everything start
// It has a output
val source: Source[Account, NotUsed] = Source(accounts)
// Flow is where data transformation
// It has a input and a output
val accountFlow: Flow[Account, List[Campaign], NotUsed] = Flow[Account].map(account => {
println(s"Insert account ${account.name}")
val campaignIn = for (campaign <- campaigns; if campaign.accountId == account.id) yield campaign
campaignIn
})
// Flow[A,B,C] : A is input's type, B is output's type, C is materialized value
val campaignFlow: Flow[Campaign, List[Keyword], NotUsed] = Flow[Campaign].map(campaign => {
println(s"Insert campaign ${campaign.name}")
val keywordsIn = for (keyword <- keywords; if keyword.campaignId == campaign.id) yield keyword
keywordsIn
})
val keywordFlow: Flow[Keyword, Unit, NotUsed] = Flow[Keyword].map(keyword => {
println(s"Insert keyword ${keyword.name}")
})
// This is where you connect all the pieces you have wrote
// via means make ...~>...
source via accountFlow mapConcat identity via campaignFlow mapConcat identity via keywordFlow runWith Sink.ignore onComplete {
case Success(_) =>
println("Stream completed successfully")
system.terminate()
case Failure(error) =>
println(s"Stream failed with error ${error.getMessage}")
system.terminate()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment