Skip to content

Instantly share code, notes, and snippets.

@johanandren
Created December 3, 2015 15:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save johanandren/ab3f4982802231e7c60c to your computer and use it in GitHub Desktop.
Save johanandren/ab3f4982802231e7c60c to your computer and use it in GitHub Desktop.
Naive intersperse implementation
import akka.stream.FlowShape
import akka.stream.scaladsl.FlowGraph
import akka.stream.scaladsl.{Source, Concat, Flow}
import akka.stream.stage.{SyncDirective, Context, PushPullStage}
/**
 * Factories for flows that insert a separator element between the elements
 * passing through them.
 */
object Intersperse {

  /**
   * Creates a flow that runs every element through a fresh [[Intersperse]] stage
   * configured with the given separator.
   *
   * @param separator element handed to the [[Intersperse]] stage
   * @tparam E element type of the flow
   */
  def apply[E](separator: E): Flow[E, E, Unit] = {
    val stageFactory = () => new Intersperse[E](separator)
    Flow[E].transform(stageFactory)
  }

  /**
   * Creates a flow whose output is the concatenation of three parts, in order:
   * a single `pre` element, the interspersed input, and a single `post` element.
   *
   * @param pre       element fed into the first input of the concatenation
   * @param separator element passed to the single-argument `apply`
   * @param post      element fed into the last input of the concatenation
   * @tparam A element type of the flow
   */
  def apply[A](pre: A, separator: A, post: A): Flow[A, A, Unit] = {
    val graph = FlowGraph.create() { implicit builder =>
      import FlowGraph.Implicits._

      // Three-way concat: input 0 is drained first, then 1, then 2.
      val joined = builder.add(Concat[A](3))
      val prefix = builder.add(Source.single[A](pre))
      val body = builder.add(Intersperse[A](separator))
      val suffix = builder.add(Source.single[A](post))

      prefix ~> joined.in(0)
      body ~> joined.in(1)
      suffix ~> joined.in(2)

      // The resulting flow consumes through the interspersing part and
      // produces through the concat output.
      FlowShape(body.inlet, joined.out)
    }
    Flow.fromGraph(graph)
  }
}
/**
 * Stage that emits `separator` between consecutive upstream elements.
 *
 * The first upstream element is pushed through unchanged. For every later
 * element the stage first pushes `separator`, holds the element in `queued`,
 * and pushes it on the next downstream pull.
 *
 * Because an element may still be queued when upstream completes, the stage
 * absorbs upstream termination until that element has been emitted; otherwise
 * the last element of the stream would be dropped.
 *
 * @param separator element inserted between consecutive upstream elements
 * @tparam A element type
 */
class Intersperse[A](separator: A) extends PushPullStage[A, A] {
  // Set once the first element has been emitted; that element gets no leading separator.
  var seenFirst = false
  // Element received from upstream that is waiting for its separator to go out first.
  var queued: Option[A] = None

  override def onPush(elem: A, ctx: Context[A]): SyncDirective =
    if (!seenFirst) {
      seenFirst = true
      ctx.push(elem)
    } else {
      // Emit the separator now; the element itself follows on the next pull.
      queued = Some(elem)
      ctx.push(separator)
    }

  override def onPull(ctx: Context[A]): SyncDirective =
    queued match {
      case Some(elem) =>
        queued = None
        // While finishing, this is the last element: emit it and complete.
        if (ctx.isFinishing) ctx.pushAndFinish(elem) else ctx.push(elem)
      case None =>
        // Nothing buffered: complete if upstream is already done, else ask for more.
        if (ctx.isFinishing) ctx.finish() else ctx.pull()
    }

  // Delay completion while an element is still queued so it is not lost;
  // onPull emits it and finishes the stage.
  override def onUpstreamFinish(ctx: Context[A]): akka.stream.stage.TerminationDirective =
    if (queued.isDefined) ctx.absorbTermination() else ctx.finish()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment