click stream example
import java.awt.Dimension | |
import java.awt.event.MouseAdapter | |
import java.awt.event.MouseEvent | |
import javax.swing.JFrame | |
import scala.collection.immutable | |
import scala.concurrent.duration._ | |
import akka.actor._ | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.stream.stage._ | |
object ClickStreamExample extends App { | |
implicit class SourceEnriched[A, Mat](stream: Source[A, Mat]) { | |
/** | |
* Accumulates elements as long as they arrive within the time of `duration` | |
* after the previous element. | |
*/ | |
def throttle(duration: FiniteDuration): Source[immutable.Seq[A], Mat] = { | |
require(duration > Duration.Zero) | |
stream.via(new Throttle[A](duration)).withAttributes(Attributes.name("throttle")) | |
} | |
} | |
def mkClickFrame(ref: ActorRef): Unit = { | |
new JFrame("Click Stream Example") { | |
setPreferredSize(new Dimension(300, 300)) | |
setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE) | |
addMouseListener(new MouseAdapter { | |
override def mouseClicked(e: MouseEvent) = | |
ref ! e | |
}) | |
pack() | |
setVisible(true) | |
} | |
} | |
implicit val system = ActorSystem("TestSystem") | |
implicit val materializer = ActorMaterializer() | |
val clickStream = Source | |
.actorRef[MouseEvent](bufferSize = 0, OverflowStrategy.fail) | |
.mapMaterializedValue(mkClickFrame) | |
val multiClickStream = clickStream | |
.throttle(250.millis) | |
.map(clickEvents => clickEvents.length) | |
.filter(numberOfClicks => numberOfClicks >= 2) | |
multiClickStream runForeach println | |
} | |
/** | |
* Implementation copied from [[akka.stream.impl.fusing.GroupedWithin]] and | |
* adapted to our needs. | |
*/ | |
final class Throttle[A](duration: FiniteDuration) extends GraphStage[FlowShape[A, immutable.Seq[A]]] { | |
val in = Inlet[A]("in") | |
val out = Outlet[immutable.Seq[A]]("out") | |
val shape = FlowShape(in, out) | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { | |
val buf: immutable.VectorBuilder[A] = new immutable.VectorBuilder | |
var groupClosed = false | |
var finished = false | |
val GroupedWithinTimer = "GroupedWithinTimer" | |
override def preStart() = { | |
pull(in) | |
} | |
def nextElement(elem: A): Unit = { | |
buf += elem | |
scheduleOnce(GroupedWithinTimer, duration) | |
pull(in) | |
} | |
def closeGroup(): Unit = { | |
groupClosed = true | |
if (isAvailable(out)) emitGroup() | |
} | |
def emitGroup(): Unit = { | |
push(out, buf.result()) | |
buf.clear() | |
if (!finished) startNewGroup() | |
else completeStage() | |
} | |
def startNewGroup(): Unit = { | |
groupClosed = false | |
if (isAvailable(in)) nextElement(grab(in)) | |
else if (!hasBeenPulled(in)) pull(in) | |
} | |
setHandler(in, new InHandler { | |
override def onPush(): Unit = | |
if (!groupClosed) | |
nextElement(grab(in)) // otherwise keep the element for next round | |
override def onUpstreamFinish(): Unit = { | |
finished = true | |
if (!groupClosed) closeGroup() | |
else completeStage() | |
} | |
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) | |
}) | |
setHandler(out, new OutHandler { | |
override def onPull(): Unit = if (groupClosed) emitGroup() | |
override def onDownstreamFinish(): Unit = completeStage() | |
}) | |
override protected def onTimer(timerKey: Any) = | |
closeGroup() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment