Skip to content

Instantly share code, notes, and snippets.

@NicolaeNMV
Created February 6, 2017 07:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save NicolaeNMV/0f556315ae3e00d9031d04b4efd694fc to your computer and use it in GitHub Desktop.
Save NicolaeNMV/0f556315ae3e00d9031d04b4efd694fc to your computer and use it in GitHub Desktop.
Playing with reactive streams and ammonite
#!/usr/bin/env amm
import $ivy.`com.typesafe.akka::akka-stream:2.4.16`
import java.nio.file.Paths
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.actor.ActorSystem
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
// Actor system named "QuickStart"; declared implicit so later stream setup can pick it up.
implicit val system: ActorSystem = ActorSystem("QuickStart")

// Materializer made available implicitly to the stream code in this script.
implicit val materializer: ActorMaterializer = ActorMaterializer()
/** Author of a tweet, identified by `handle`. */
final case class Author(handle: String)
/** A hashtag; `name` holds the tag text exactly as it was passed in (no normalization is applied here). */
final case class Hashtag(name: String)
/**
 * A single tweet.
 *
 * @param author    who wrote the tweet
 * @param timestamp time of the tweet (the unit is not established by this script)
 * @param body      raw text of the tweet
 */
final case class Tweet(author: Author, timestamp: Long, body: String) {

  /**
   * Distinct hashtags mentioned in `body`.
   *
   * The body is split on any run of whitespace (spaces, tabs, line breaks) rather than on a
   * single space character, so tags separated by a tab or a newline are recognised individually.
   * Every token starting with `#` becomes a [[Hashtag]] whose name keeps the leading `#`.
   */
  def hashtags: Set[Hashtag] =
    body
      .split("\\s+")
      .collect { case token if token.startsWith("#") => Hashtag(token) }
      .toSet
}
// Sample tweet whose body contains two hashtags.
val tweet: Tweet = Tweet(
  author = Author("auth"),
  timestamp = 123,
  body = "#blah #lol"
)

// Finite source: the sample tweet repeated ten times.
val tweets: Source[Tweet, NotUsed] = Source.repeat(tweet).take(10)

// Sink that prints each received string on its own line.
val sinkPrintingOutElements: Sink[String, Future[akka.Done]] =
  Sink.foreach[String](line => println(line))
// Sink for authors: maps each author to its handle and prints it. The flow's materialized value
// (Keep.left) is kept, so the printing sink's completion future is not exposed.
val writeAuthors: Sink[Author, NotUsed] = {
  val handles = Flow[Author].map(author => author.handle)
  handles.toMat(sinkPrintingOutElements)(Keep.left)
}
// Fan the tweet stream out into two printing branches: author handles and hashtags.
// Both printing sinks are handed to `GraphDSL.create` so that their completion futures become
// the materialized value of the graph.
val (authorsPrinted, hashtagsPrinted) =
  RunnableGraph.fromGraph(GraphDSL.create(sinkPrintingOutElements, sinkPrintingOutElements)((_, _)) { implicit b =>
    (authorsOut, hashtagsOut) =>
    import GraphDSL.Implicits._
    val bcast = b.add(Broadcast[Tweet](2))
    tweets ~> bcast.in
    // `handle` is already a String, so no extra conversion is needed before printing.
    bcast.out(0) ~> Flow[Tweet].map(_.author.handle) ~> authorsOut.in
    bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList).map(_.toString) ~> hashtagsOut.in
    ClosedShape
  }).run() // describing the graph does nothing by itself; run() materializes and starts it

// The stream runs asynchronously: wait for both branches to finish so that all output has been
// printed before the script moves on. A failure inside the stream is rethrown here.
Await.result(authorsPrinted, 1.minute)
Await.result(hashtagsPrinted, 1.minute)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment