Created
December 15, 2014 19:42
-
-
Save gertjana/398350eb91f4cd999d1f to your computer and use it in GitHub Desktop.
Akka Stream based on an Actor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.thenewmotion.streams | |
import akka.stream.actor.ActorPublisher | |
import akka.stream.scaladsl._ | |
import akka.actor.{Props, ActorSystem} | |
import akka.stream.FlowMaterializer | |
import scala.language.postfixOps | |
case class Tick() | |
class TickActor extends ActorPublisher[Int] { | |
import scala.concurrent.duration._ | |
implicit val ec = context.dispatcher | |
val tick = context.system.scheduler.schedule(1 second, 1 second, self, Tick()) | |
var cnt = 0 | |
var buffer = Vector.empty[Int] | |
override def receive: Receive = { | |
case Tick() => { | |
cnt = cnt + 1 | |
if (buffer.isEmpty && totalDemand > 0) { | |
onNext(cnt) | |
} | |
else { | |
buffer :+= cnt | |
if (totalDemand > 0) { | |
val (use,keep) = buffer.splitAt(totalDemand.toInt) | |
buffer = keep | |
use foreach onNext | |
} | |
} | |
} | |
} | |
override def postStop() = tick.cancel() | |
} | |
object Main extends App { | |
implicit val system = ActorSystem("Sys") | |
implicit val ec = system.dispatcher | |
val tickActor = system.actorOf(Props[TickActor]) | |
implicit val materializer = FlowMaterializer() | |
val source:Source[Int] = Source(ActorPublisher[Int](tickActor)) | |
val sink:Sink[Int] = ForeachSink[Int](x => { | |
println(s"sink received $x") | |
// Thread.sleep(1500) | |
}) | |
source | |
//.filter(_ % 2 == 0) | |
.to(sink) | |
.run() | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Source(ActorPublisherInt): says expected iterable , actual publisher for akka streams version 2.0.1 . Instead we can use this: Source.actorPublisher(x : Props[T])
But i need to use actorRef instead of Props, since i need to forward messages via this actor to the stream. Any suggestions for this ?