@gertjana
Created December 15, 2014 19:42
Akka Stream based on an Actor
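package com.thenewmotion.streams

import akka.actor.{ActorSystem, Props}
import akka.stream.FlowMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.stream.scaladsl._

import scala.language.postfixOps

case class Tick()

class TickActor extends ActorPublisher[Int] {
  import scala.concurrent.duration._

  implicit val ec = context.dispatcher

  // Send a Tick to ourselves every second, starting after one second.
  val tick = context.system.scheduler.schedule(1 second, 1 second, self, Tick())

  var cnt = 0
  // Elements produced while there was no downstream demand (unbounded).
  var buffer = Vector.empty[Int]

  // Emit as many buffered elements as the current demand allows.
  // The demand is clamped before toInt so a very large value cannot wrap around.
  def deliverBuffer(): Unit =
    if (totalDemand > 0) {
      val (use, keep) = buffer.splitAt(math.min(totalDemand, Int.MaxValue.toLong).toInt)
      buffer = keep
      use foreach onNext
    }

  override def receive: Receive = {
    case Tick() =>
      cnt = cnt + 1
      if (buffer.isEmpty && totalDemand > 0) {
        onNext(cnt)
      } else {
        buffer :+= cnt
        deliverBuffer()
      }
    // New demand arrived: flush the buffer now instead of waiting for the next Tick.
    case Request(_) => deliverBuffer()
    // Downstream cancelled: stop the actor (postStop cancels the timer).
    case Cancel => context.stop(self)
  }

  // Explicit Unit: the result of cancel() is discarded.
  override def postStop(): Unit = tick.cancel()
}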
object Main extends App {
  implicit val system = ActorSystem("Sys")
  implicit val ec = system.dispatcher

  val tickActor = system.actorOf(Props[TickActor])

  implicit val materializer = FlowMaterializer()

  val source: Source[Int] = Source(ActorPublisher[Int](tickActor))
  val sink: Sink[Int] = ForeachSink[Int](x => {
    println(s"sink received $x")
    // Thread.sleep(1500)
  })

  source
    //.filter(_ % 2 == 0)
    .to(sink)
    .run()
}
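
A note on the demand handling: TickActor has to turn the demand into an `Int` before it can be passed to `splitAt`. If the demand is a `Long`, a bare `.toInt` wraps around for values above `Int.MaxValue` (a Reactive Streams subscriber may request up to `Long.MaxValue`), so clamping first is safer. A minimal sketch in plain Scala with no Akka dependency; `DemandBuffer`, `clamp` and `drain` are names made up for this illustration, not part of the gist or of Akka:

```scala
object DemandBuffer {
  // Clamp a Long demand into the non-negative Int range
  // before it is used as an element count.
  def clamp(demand: Long): Int =
    math.max(0L, math.min(demand, Int.MaxValue.toLong)).toInt

  // Split the buffer into (elements to emit now, elements to keep).
  def drain(buffer: Vector[Int], demand: Long): (Vector[Int], Vector[Int]) =
    buffer.splitAt(clamp(demand))
}

object DemandBufferDemo extends App {
  val buffer = Vector(1, 2, 3)

  // Demand of 2: emits 1 and 2, keeps 3.
  println(DemandBuffer.drain(buffer, 2L))

  // "Unbounded" demand: everything is emitted.
  println(DemandBuffer.drain(buffer, Long.MaxValue))

  // Without clamping, Long.MaxValue.toInt is -1, so nothing is emitted.
  println(buffer.splitAt(Long.MaxValue.toInt))
}
```

Whether a given subscriber ever requests that much depends on the downstream stage, so this is a defensive measure rather than a fix for an observed failure.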
@concerned3rdparty

`Source(ActorPublisher[Int](tickActor))` does not compile with Akka Streams 2.0.1: the compiler says it expected an iterable but got a publisher. `Source.actorPublisher[T](props: Props)` can be used instead.
But I need to use an ActorRef instead of Props, since I need to forward messages via this actor to the stream. Any suggestions for this?
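
One possible approach, sketched under the assumption of akka-stream-experimental 2.0.x and not verified against 2.0.1 specifically: `Source.actorPublisher` materializes the publisher actor's `ActorRef` when the stream is run, so the ref can be taken from the result of `run()` and used to forward messages. `ForwardingPublisher`, `ForwardingExample` and `start` are hypothetical names written for this illustration.

```scala
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.stream.scaladsl.{Sink, Source}

// Hypothetical publisher: forwards every Int it receives into the stream,
// buffering (without bound) while there is no downstream demand.
class ForwardingPublisher extends ActorPublisher[Int] {
  private var buffer = Vector.empty[Int]

  // Emit buffered elements for as long as there is demand.
  private def drain(): Unit =
    while (totalDemand > 0 && buffer.nonEmpty) {
      onNext(buffer.head)
      buffer = buffer.tail
    }

  override def receive: Receive = {
    case n: Int =>
      buffer :+= n
      drain()
    case Request(_) => drain()
    case Cancel     => context.stop(self)
  }
}

object ForwardingExample {
  // Runs the stream and returns the publisher's ActorRef,
  // which is the materialized value of Source.actorPublisher.
  def start(handle: Int => Unit)(implicit system: ActorSystem): ActorRef = {
    implicit val materializer = ActorMaterializer()
    Source.actorPublisher[Int](Props[ForwardingPublisher])
      .to(Sink.foreach[Int](handle))
      .run()
  }
}

object ForwardingMain extends App {
  implicit val system = ActorSystem("Sys")

  val ref = ForwardingExample.start(x => println(s"sink received $x"))

  // Any code holding the ref can now push elements into the stream.
  ref ! 1
  ref ! 2
}
```

If a custom actor is not needed at all, `Source.actorRef(bufferSize, overflowStrategy)` also materializes an `ActorRef` that feeds the stream, at the cost of dropping or failing on overflow instead of reacting to demand.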
