Skip to content

Instantly share code, notes, and snippets.

@ivportilla
Forked from kiritsuku/ClickStreamExample.scala
Created August 14, 2017 04:03
Show Gist options
  • Save ivportilla/f0ab7c418b3f0f0df7749e45c945201e to your computer and use it in GitHub Desktop.
Save ivportilla/f0ab7c418b3f0f0df7749e45c945201e to your computer and use it in GitHub Desktop.
click stream example
import java.awt.Dimension
import java.awt.event.MouseAdapter
import java.awt.event.MouseEvent
import javax.swing.JFrame
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
object ClickStreamExample extends App {
implicit class SourceEnriched[A, Mat](stream: Source[A, Mat]) {
/**
* Accumulates elements as long as they arrive within the time of `duration`
* after the previous element.
*/
def throttle(duration: FiniteDuration): Source[immutable.Seq[A], Mat] = {
require(duration > Duration.Zero)
stream.via(new Throttle[A](duration)).withAttributes(Attributes.name("throttle"))
}
}
def mkClickFrame(ref: ActorRef): Unit = {
new JFrame("Click Stream Example") {
setPreferredSize(new Dimension(300, 300))
setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE)
addMouseListener(new MouseAdapter {
override def mouseClicked(e: MouseEvent) =
ref ! e
})
pack()
setVisible(true)
}
}
implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
val clickStream = Source
.actorRef[MouseEvent](bufferSize = 0, OverflowStrategy.fail)
.mapMaterializedValue(mkClickFrame)
val multiClickStream = clickStream
.throttle(250.millis)
.map(clickEvents => clickEvents.length)
.filter(numberOfClicks => numberOfClicks >= 2)
multiClickStream runForeach println
}
/**
* Implementation copied from [[akka.stream.impl.fusing.GroupedWithin]] and
* adapted to our needs.
*/
final class Throttle[A](duration: FiniteDuration) extends GraphStage[FlowShape[A, immutable.Seq[A]]] {
val in = Inlet[A]("in")
val out = Outlet[immutable.Seq[A]]("out")
val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
val buf: immutable.VectorBuilder[A] = new immutable.VectorBuilder
var groupClosed = false
var finished = false
val GroupedWithinTimer = "GroupedWithinTimer"
override def preStart() = {
pull(in)
}
def nextElement(elem: A): Unit = {
buf += elem
scheduleOnce(GroupedWithinTimer, duration)
pull(in)
}
def closeGroup(): Unit = {
groupClosed = true
if (isAvailable(out)) emitGroup()
}
def emitGroup(): Unit = {
push(out, buf.result())
buf.clear()
if (!finished) startNewGroup()
else completeStage()
}
def startNewGroup(): Unit = {
groupClosed = false
if (isAvailable(in)) nextElement(grab(in))
else if (!hasBeenPulled(in)) pull(in)
}
setHandler(in, new InHandler {
override def onPush(): Unit =
if (!groupClosed)
nextElement(grab(in)) // otherwise keep the element for next round
override def onUpstreamFinish(): Unit = {
finished = true
if (!groupClosed) closeGroup()
else completeStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
})
setHandler(out, new OutHandler {
override def onPull(): Unit = if (groupClosed) emitGroup()
override def onDownstreamFinish(): Unit = completeStage()
})
override protected def onTimer(timerKey: Any) =
closeGroup()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment