Skip to content

Instantly share code, notes, and snippets.

@debasishg
Last active April 26, 2016 15:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save debasishg/4d596c1f26d4759ed65e281bb2e6fd2c to your computer and use it in GitHub Desktop.
Save debasishg/4d596c1f26d4759ed65e281bb2e6fd2c to your computer and use it in GitHub Desktop.
sample streams application with akka-streams 1.x ..
scala> import frdomain.ch6.streams._
import frdomain.ch6.streams._
scala> import Main._
import Main._
scala> import akka.stream.scaladsl._
import akka.stream.scaladsl._
scala> import akka.stream._
import akka.stream._
scala> import common._
import common._
scala> import OnlineService._
import OnlineService._
scala> import Transaction._
import Transaction._
scala> transactions
res0: akka.stream.scaladsl.Source[frdomain.ch6.streams.Transaction,Unit] = akka.stream.scaladsl.Source@207e858e
scala> txnSink
res1: akka.stream.scaladsl.Sink[frdomain.ch6.streams.Transaction,scala.concurrent.Future[frdomain.ch6.streams.Transaction]] = akka.stream.scaladsl.Sink@5fae46df
scala> import scala.concurrent.Future
import scala.concurrent.Future
scala> val netTxn: Source[RunnableGraph[Future[Transaction]], Unit] =
| transactions.map(validate).groupBy(_.accountNo).map { case (a, s) => s.toMat(txnSink)(Keep.right) }
netTxn: akka.stream.scaladsl.Source[akka.stream.scaladsl.RunnableGraph[scala.concurrent.Future[frdomain.ch6.streams.Transaction]],Unit] = akka.stream.scaladsl.Source@2e72ed0b
scala> netTxn.map(_.run()).runForeach(_.foreach(println))
res2: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@f50422c
scala> Transaction(800259541,a-3,Credit,0,Tue Apr 26 20:59:43 IST 2016)
Transaction(2059049155,a-1,Credit,0,Tue Apr 26 20:59:43 IST 2016)
Transaction(2023562889,a-2,Debit,2000,Tue Apr 26 20:59:43 IST 2016)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment