Skip to content

Instantly share code, notes, and snippets.

@debasishg
Created April 26, 2016 13:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save debasishg/a42e867bb2bc8ad18243597178bbce93 to your computer and use it in GitHub Desktop.
Save debasishg/a42e867bb2bc8ad18243597178bbce93 to your computer and use it in GitHub Desktop.
Sample stream processing
Welcome to Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91).
Type in expressions to have them evaluated.
Type :help for more information.
scala> import frdomain.ch6.streams._
import frdomain.ch6.streams._
scala> import Main._
import Main._
scala> import akka.stream.scaladsl._
import akka.stream.scaladsl._
scala> import akka.stream._
import akka.stream._
scala> import akka.NotUsed
import akka.NotUsed
scala> txnSink
res0: akka.stream.scaladsl.Sink[frdomain.ch6.streams.Transaction,scala.concurrent.Future[frdomain.ch6.streams.Transaction]] =
Sink(SinkShape(fold.in), CompositeModule [4ef8207f]
Name: foldSink
Modules:
(fold) GraphStage(fold) [7f27be6c]
(unnamed) [2029a5a8] copy of CompositeModule [75dbc4a3]
Name: unnamed
Modules:
(headSink) GraphStage(HeadOptionStage) [0eb22a07]
Downstreams:
Upstreams:
MatValue: Transform(Atomic(headSink[0eb22a07]))
Downstreams:
fold.out -> headOption.in
Upstreams:
headOption.in -> fold.out
MatValue: Atomic(akka.stream.impl.StreamLayout$CopiedModule[2029a5a8]))
scala> val netTxn: RunnableGraph[NotUsed] = transactions.groupBy(maxSubstreams = 100, _.accountNo).to(txnSink)
netTxn: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] =
RunnableGraph(CompositeModule [2789487f]
Name: unnamed
Modules:
(futureSource) GraphStage(FutureSource) [3ff71948]
(unnamed) [1b5e508d] copy of GraphStage(StatefulMapConcat) [0bdbfc89]
(groupBy) GroupBy(100) [667c973d]
(foreachSink) [61c6efe3] copy of CompositeModule [0875016e]
Name: foreachSink
Modules:
(map) GraphStage(map) [5c1e8b5d]
(ignoreSink) SinkholeSink [5ed93a9c]
Downstreams:
map.out -> SinkholeSink.in
Upstreams:
SinkholeSink.in -> map.out
MatValue: Atomic(ignoreSink[5ed93a9c])
Downstreams:
future.out -> StatefulMapConcat.in
StatefulMapConcat.out -> Flow.in
Flow.out -> map.in
Upstreams:
StatefulMapConcat.in -> future.o...
scala> netTxn.run()
res1: akka.NotUsed = NotUsed
scala>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment