Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
click stream example
import java.awt.Dimension
import java.awt.event.MouseAdapter
import java.awt.event.MouseEvent
import javax.swing.JFrame
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
object ClickStreamExample extends App {
implicit class SourceEnriched[A, Mat](stream: Source[A, Mat]) {
/**
* Accumulates elements as long as they arrive within the time of `duration`
* after the previous element.
*/
def throttle(duration: FiniteDuration): Source[immutable.Seq[A], Mat] = {
require(duration > Duration.Zero)
stream.via(new Throttle[A](duration)).withAttributes(Attributes.name("throttle"))
}
}
def mkClickFrame(ref: ActorRef): Unit = {
new JFrame("Click Stream Example") {
setPreferredSize(new Dimension(300, 300))
setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE)
addMouseListener(new MouseAdapter {
override def mouseClicked(e: MouseEvent) =
ref ! e
})
pack()
setVisible(true)
}
}
implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
val clickStream = Source
.actorRef[MouseEvent](bufferSize = 0, OverflowStrategy.fail)
.mapMaterializedValue(mkClickFrame)
val multiClickStream = clickStream
.throttle(250.millis)
.map(clickEvents => clickEvents.length)
.filter(numberOfClicks => numberOfClicks >= 2)
multiClickStream runForeach println
}
/**
* Implementation copied from [[akka.stream.impl.fusing.GroupedWithin]] and
* adapted to our needs.
*/
final class Throttle[A](duration: FiniteDuration) extends GraphStage[FlowShape[A, immutable.Seq[A]]] {
val in = Inlet[A]("in")
val out = Outlet[immutable.Seq[A]]("out")
val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
val buf: immutable.VectorBuilder[A] = new immutable.VectorBuilder
var groupClosed = false
var finished = false
val GroupedWithinTimer = "GroupedWithinTimer"
override def preStart() = {
pull(in)
}
def nextElement(elem: A): Unit = {
buf += elem
scheduleOnce(GroupedWithinTimer, duration)
pull(in)
}
def closeGroup(): Unit = {
groupClosed = true
if (isAvailable(out)) emitGroup()
}
def emitGroup(): Unit = {
push(out, buf.result())
buf.clear()
if (!finished) startNewGroup()
else completeStage()
}
def startNewGroup(): Unit = {
groupClosed = false
if (isAvailable(in)) nextElement(grab(in))
else if (!hasBeenPulled(in)) pull(in)
}
setHandler(in, new InHandler {
override def onPush(): Unit =
if (!groupClosed)
nextElement(grab(in)) // otherwise keep the element for next round
override def onUpstreamFinish(): Unit = {
finished = true
if (!groupClosed) closeGroup()
else completeStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
})
setHandler(out, new OutHandler {
override def onPull(): Unit = if (groupClosed) emitGroup()
override def onDownstreamFinish(): Unit = completeStage()
})
override protected def onTimer(timerKey: Any) =
closeGroup()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment