Skip to content

Instantly share code, notes, and snippets.

@vasily-kirichenko
Created February 24, 2018 08:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vasily-kirichenko/25644e1fe2b70d1e6ae8adb828c6d677 to your computer and use it in GitHub Desktop.
Save vasily-kirichenko/25644e1fe2b70d1e6ae8adb828c6d677 to your computer and use it in GitHub Desktop.

Constracts

case class OfferSource(ref: SourceRef[Int])
case object RequestSource

Producer

akka {
  actor {
    provider = "cluster"
    debug {
      lifecycle = on
      unhandled = on
    }
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  loglevel = "DEBUG"
  cluster {
    seed-nodes = [
      "akka.tcp://test@127.0.0.1:2551",
      "akka.tcp://test@127.0.0.1:2552"]
  }
}
object Producer extends App {
  implicit val system = ActorSystem("test", ConfigFactory.load())

  class Producer extends Actor with ActorLogging {
    override def receive = {
      case RequestSource =>
        val sourceRef =
          Source(1 to 1000)
            .map(x => {
              log.info(s"Giving $x...")
              x
            })
            .runWith(StreamRefs.sourceRef().addAttributes(StreamRefAttributes.subscriptionTimeout(1 minute)))
        sourceRef.map(OfferSource) pipeTo sender()
    }
  }

  system.actorOf(Props[Producer], "producer")
}

Consumer

akka {
  actor {
    provider = "cluster"
    debug {
      lifecycle = on
      unhandled = on
    }
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
  loglevel = "DEBUG"
  cluster {
    seed-nodes = [
      "akka.tcp://test@127.0.0.1:2551",
      "akka.tcp://test@127.0.0.1:2552"]
  }
}
object Consumer extends App {
  implicit val system = ActorSystem("test", ConfigFactory.load())

  class Consumer extends Actor with ActorLogging {
    case object GetSource

    override def receive = {
      case GetSource =>
        log.info("Requesting source...")
        val producer = context.actorSelection("akka.tcp://test@127.0.0.1:2551/user/producer")
        producer ! RequestSource

      case OfferSource(sourceRef) =>
        log.info("Got source!")
        sourceRef
          .throttle(1, 300 millis, 0, ThrottleMode.Shaping)
          .runForeach(println)
    }

    override def preStart() = self ! GetSource
  }

  system.actorOf(Props[Consumer], "consumer")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment