Skip to content

Instantly share code, notes, and snippets.

@pauca
Created June 9, 2017 15:34
Show Gist options
  • Save pauca/6f8a382223d47f1c47c836c512c0b023 to your computer and use it in GitHub Desktop.
Save pauca/6f8a382223d47f1c47c836c512c0b023 to your computer and use it in GitHub Desktop.
Basic Akka Stream Template: read, transform, write
/*
build.sbt
name := "AkkaStreamTemplate"
version := "0.1-SNAPSHOT"
scalaVersion := "2.12.2"
libraryDependencies ++= {
Seq(
"com.typesafe.akka" %% "akka-stream" % "2.5.1",
"com.github.scopt" %% "scopt" % "3.5.0"
)
}
resolvers += Resolver.sonatypeRepo("public")
*/
import akka.actor._
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.actor.ActorSubscriberMessage._
import akka.stream.actor._
import akka.stream.scaladsl._
import akka.{ NotUsed, Done }
import scala.io.Source
import java.io._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
/**
  * Command-line settings filled in by the scopt parser.
  *
  * @param ifile value of the `--ifile` option; empty string until parsed
  * @param ofile value of the `--ofile` option; empty string until parsed
  */
case class Config(
  ifile: String = "",
  ofile: String = ""
)
/**
  * Stream-terminating actor that writes every received `String` element to a file.
  *
  * Elements are written verbatim: no separator or line terminator is added
  * between them. When the stream completes, fails, or an unexpected message
  * arrives, the writer is closed, this actor is stopped and the whole actor
  * system is terminated; on failure or an unexpected message the JVM
  * additionally exits with status 1.
  *
  * @param ofile path of the output file; it is opened (and truncated) when the
  *              actor is constructed
  */
class WriterSubscriber(ofile: String) extends ActorSubscriber with ActorLogging {

  val requestStrategy = WatermarkRequestStrategy(500)

  val bw = new PrintWriter(new FileOutputStream(new File(ofile)))

  /** Writes `s` to the output file exactly as given. */
  def save(s: String) = {
    bw.write(s)
  }

  /** Closes the writer, stops this actor and asks the actor system to terminate. */
  private def shutdown(): Unit = {
    bw.close()
    context.stop(self)
    context.system.terminate()
  }

  /**
    * Safety net: releases the file handle even when the actor is stopped
    * without going through one of the handlers below. Closing an already
    * closed `PrintWriter` is a no-op, so the double close is harmless.
    */
  override def postStop(): Unit = {
    bw.close()
    super.postStop()
  }

  def receive = {
    case OnNext(s: String) =>
      save(s)

    // Match every Throwable, not only Exception: otherwise a failure whose
    // cause is e.g. an Error would fall into the catch-all case below and be
    // reported as "Unexpected Stream !" without its cause.
    case OnError(err) =>
      log.error(err, "Receieved Exception")
      shutdown()
      System.exit(1)

    case OnComplete =>
      log.info("Stream Completed!")
      shutdown()
      println(s"Output@\n${ofile}\n")

    // Anything else (including OnNext with a non-String element) is fatal.
    case _ =>
      log.error("Unexpected Stream !")
      shutdown()
      System.exit(1)
  }
}
/**
  * Entry point: parses `--ifile` / `--ofile`, builds a source of lines, maps
  * each line asynchronously and hands the results to a [[WriterSubscriber]]
  * that writes them to `ofile`.
  *
  * The input is currently a hard-coded three-element list; the file-based
  * source is kept as a commented-out line.
  */
object AkkaStreamTemplate extends App {

  /** Command-line parser; both options are mandatory. */
  val parser = new scopt.OptionParser[Config]("") {
    head(""" """, " ")
    opt[String]("ifile").required().action((x, c) => c.copy(ifile = x))
    opt[String]("ofile").required().action((x, c) => c.copy(ofile = x))
  }

  parser.parse(args, Config()) match {
    case Some(config) =>
      try {
        implicit val system = ActorSystem("AkkaStreamTemplate")
        implicit val materializer = ActorMaterializer()

        // The property is not guaranteed to hold a plain integer (Scala also
        // accepts multiplier forms such as "x2"), so a failed parse falls back
        // to the processor count instead of throwing NumberFormatException
        // after the actor system has already been started.
        val nrThreads: Int = scala.sys.props
          .get("scala.concurrent.context.maxThreads")
          .flatMap(v => scala.util.Try(v.toInt).toOption)
          .getOrElse(Runtime.getRuntime.availableProcessors)
        println(s"nrOfThreads: $nrThreads")

        // val lines = Source.fromFile(config.ifile).getLines
        val lines = List("a", "b", "c")
        val source = akka.stream.scaladsl.Source.fromIterator(() => lines.iterator)

        // mapAsync needs a parallelism of at least 1.
        val step1 = source.mapAsync(Math.max(1, nrThreads)) { line =>
          Future {
            // do something
            line
          }
        }

        step1.runWith(Sink.actorSubscriber(Props(new WriterSubscriber(config.ofile))))
      } catch {
        case ex: IOException => println(s"Had an Exception: $ex")
      }

    case None =>
      println("Arguments are bad.")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment