Skip to content

Instantly share code, notes, and snippets.

@petitviolet
Last active April 11, 2021 21:07
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save petitviolet/7de478c406b568b738471dd324ad80e5 to your computer and use it in GitHub Desktop.
Save petitviolet/7de478c406b568b738471dd324ad80e5 to your computer and use it in GitHub Desktop.
Akka-Stream with Actor using ActorPublisher and ActorSubscriber
package net.petitviolet.ex.persistence.task
import akka.NotUsed
import akka.actor._
import akka.pattern.ask
import akka.stream.actor.ActorSubscriberMessage.{ OnComplete, OnNext }
import akka.stream.actor.{ ActorPublisher, ActorSubscriber, OneByOneRequestStrategy, RequestStrategy }
import akka.stream.{ ActorMaterializer, ClosedShape }
import akka.util.Timeout
import org.reactivestreams.Publisher
import scala.concurrent.Future
import scala.io.StdIn
private case class Letter(value: String) extends AnyVal
private case object Finish
private object AkkaStreamPracWithActor extends App {
import akka.stream.scaladsl._
implicit val system = ActorSystem("akka-stream-prac")
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()
class PublishActor extends ActorPublisher[Letter] {
// publish [[Letter]] or OnComplete
override def receive: Actor.Receive = {
case s: String =>
onNext(Letter(s"Nice: $s"))
case i: Int =>
onNext(Letter(s"Great: ${i * 100}"))
case Finish =>
onComplete()
}
}
class FlowActor extends Actor {
// subscribe and publish
override def receive: Actor.Receive = {
case Letter(msg) => sender() ! Letter(s"(Mapped: $msg)")
case any => println(s"??? => $any")
}
}
class SubscribeActor extends ActorSubscriber {
override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy
// just subscribe
override def receive: Actor.Receive = {
case OnNext(any) => println(s"subscribed: $any")
case OnComplete => println(s"finish process!")
}
}
// publisher actor
val actorRef = system.actorOf(Props[PublishActor])
// source with actor
val source: Source[Letter, NotUsed] = {
val publisher: Publisher[Letter] = ActorPublisher(actorRef)
Source.fromPublisher(publisher)
}
// flow
val flow: Flow[Letter, Letter, NotUsed] = {
import scala.concurrent.duration._
implicit val timeout: Timeout = 1.second
val flowActor = system.actorOf(Props[FlowActor])
def flowWithActor(reply: Letter): Future[Letter] = (flowActor ? reply).mapTo[Letter]
Flow[Letter].mapAsync[Letter](3)(flowWithActor)
}
// simple implementation without actor
val _flow: Flow[Letter, Letter, NotUsed] = Flow[Letter].map { r => r.copy(value = s"(Mapped: ${r.value})") }
// another flow without actor
val accumulater: Flow[Letter, String, NotUsed] =
Flow[Letter].fold("init") { (acc, rep) => s"$acc :: ${rep.value}" }
// sink with actor
val sink: Sink[String, NotUsed] = {
val printActor = system.actorOf(Props[SubscribeActor])
Sink.fromSubscriber[String](ActorSubscriber[String](printActor))
}
// simple graph
val _graph: RunnableGraph[NotUsed] =
RunnableGraph.fromGraph(source via flow via accumulater to sink)
// written by DSL
val graph: RunnableGraph[NotUsed] = RunnableGraph.fromGraph {
GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
source ~> flow ~> accumulater ~> sink
ClosedShape
}
}
graph.run
// wait preparing graph
Thread.sleep(100L)
actorRef ! "hello!"
actorRef ! 100
actorRef ! "good"
actorRef ! Finish
println("push Enter to shutdown process.")
StdIn.readLine()
system.terminate()
}
package net.petitviolet.ex.persistence.task
import akka.actor._
import akka.stream.{ ActorMaterializer, OverflowStrategy }
import akka.{ Done, NotUsed }
import scala.concurrent.Future
import scala.io.StdIn
import scala.language.postfixOps
private case class Message(value: String) extends AnyVal
private object AkkaStreamStandard extends App {
import akka.stream.scaladsl._
implicit val system = ActorSystem("akka-stream-prac")
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()
// source with actor
val source: Source[Message, SourceQueueWithComplete[Message]] =
Source.queue[Message](100, OverflowStrategy.backpressure)
// flow
val flow: Flow[Message, Message, NotUsed] =
Flow[Message].map { r => r.copy(value = s"(Mapped: ${r.value})") }
// another flow
val accumulater: Flow[Message, String, NotUsed] =
Flow[Message].fold("init") { (acc, rep) => s"$acc :: ${rep.value}" }
// sink just printing message
val sink: Sink[String, Future[Done]] = Sink.foreach[String] { println }
// simple graph
val graph: RunnableGraph[SourceQueueWithComplete[Message]] =
source via flow via accumulater to sink
// queue for publisher of graph
val queue: SourceQueueWithComplete[Message] = graph.run()
// wait preparing graph
Thread.sleep(100L)
queue offer Message("hello!")
queue offer Message("100")
queue offer Message("good")
queue complete
println("push Enter to shutdown process.")
StdIn.readLine()
system.terminate()
}
@farukonder
Copy link

Great Example
thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment