Skip to content

Instantly share code, notes, and snippets.

@oscar-martin
Last active June 5, 2019 15:41
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oscar-martin/84fc56fa96152388f29639a20cff0272 to your computer and use it in GitHub Desktop.
Save oscar-martin/84fc56fa96152388f29639a20cff0272 to your computer and use it in GitHub Desktop.
Simple application that shows how to push content into an Akka Streams Source from outside the stream, and how to use the Source's materialized value to control the process that feeds it.
import java.util.concurrent.atomic.AtomicBoolean
import akka.Done
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.Future
import scala.sys.process._
import scala.util.{Failure, Success}
import scala.concurrent.blocking
/**
 * Demo application showing how to feed an Akka Streams `Source` from outside the stream
 * (here: from the stdout of a subprocess) and how to use the materialized value of the
 * source (the `Process` handle) to control the producer.
 *
 * Two scenarios are run:
 *  - an "unbound" `ping` that never ends on its own and is destroyed after 5 seconds;
 *  - a "bound" `ping -c 3` whose stream completes when the process terminates, after which
 *    the actor system is shut down.
 */
object EventSource extends App {
  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: scala.concurrent.ExecutionContextExecutor = system.dispatcher
  implicit val log: LoggingAdapter = Logging(system, getClass)

  /** Capacity of the queue that buffers subprocess output lines before they enter the stream. */
  val DefaultMaxQueueSize: Int = 100

  /**
   * Starts `cmd` as a subprocess and exposes its stdout, line by line, as a `Source`.
   *
   * The subprocess is started immediately when this method is called, not when the returned
   * source is materialized. Stderr lines are written to the error log.
   * The source completes once the subprocess has exited and the buffered lines were emitted.
   *
   * Because the internal queue is bridged through `Sink.asPublisher(false)` (no fan-out), the
   * returned source is meant to be materialized once. While nobody consumes it, the reader
   * thread of the subprocess is back-pressured as soon as the queue is full.
   *
   * @param cmd the command and its arguments
   * @return a source of stdout lines whose materialized value is the running `Process`
   *         (wrapped in `Option` so the process can be managed programmatically later on)
   */
  def createProcessStdOutAsSource(cmd: Seq[String]): Source[String, Option[Process]] = {
    import akka.stream.QueueOfferResult
    import scala.concurrent.Await
    import scala.concurrent.duration.Duration
    import scala.util.control.NonFatal

    // This could have been done with ProcessBuilder.lineStream, but the purpose of this gist is
    // to show how content can be added externally to a Source.
    val (queue, pub) = Source
      .queue[String](DefaultMaxQueueSize, OverflowStrategy.backpressure)
      .toMat(Sink.asPublisher(false))(Keep.both)
      .run()

    // Called for every stdout line of the subprocess. With OverflowStrategy.backpressure a new
    // offer must not be issued before the previous one has been resolved, so we wait for the
    // result here; this propagates the stream's back-pressure to the process reader.
    def processEvent(eventString: String): Unit =
      try {
        Await.result(queue.offer(eventString), Duration.Inf) match {
          case QueueOfferResult.Enqueued => ()
          case other =>
            log.warning(s"Subprocess output line was not enqueued ($other): $eventString")
        }
      } catch {
        // Restore the interrupt flag so the caller can observe the interruption and stop.
        case _: InterruptedException => Thread.currentThread().interrupt()
        case NonFatal(t) =>
          log.error(t, s"Error offering subprocess output to the stream: ${t.getMessage}")
      }

    val processLogger = ProcessLogger(
      processEvent,
      errorMessage => log.error(s"Error listening from subprocess: $errorMessage"))
    val process = cmd.run(processLogger)

    // exitValue() blocks until the subprocess has exited. Completing the queue at that point
    // lets the stream drain what is still buffered and then complete, instead of relying on a
    // further element arriving after the exit.
    // NOTE(review): assumes all stdout callbacks have run once exitValue() returns - confirm.
    Future(blocking(process.exitValue())).onComplete(_ => queue.complete())

    Source.fromPublisher(pub).mapMaterializedValue(_ => Option(process))
  }

  /**
   * Sink that prints every received line to stdout prefixed with `[STDOUT]: `.
   *
   * @return a sink whose materialized `Future[Done]` completes when the stream terminates
   */
  def eventSink(): Sink[String, Future[Done]] =
    Sink.foreach[String](event => println(s"[STDOUT]: $event"))

  println("")
  println("Creates an 'unbound' command and destroy it programmatically (Waiting 5 seconds to destroy the process)...")
  val unboundCmd = Seq("ping", "8.8.8.8")
  val unboundSource: Source[String, Option[Process]] = createProcessStdOutAsSource(unboundCmd)
  // Keep.left selects the source's materialized value, i.e. the handle of the running process.
  val processOption = unboundSource.toMat(eventSink())(Keep.left).run()
  // Let the command produce output for a while before destroying it.
  Thread.sleep(5000)
  processOption match {
    case Some(p) =>
      p.destroy()
      println("Unbound command was destroyed programmatically!")
    case None => sys.error("hmmm!")
  }

  println("")
  println("Creates a 'bound' command and wait till it terminates")
  val boundCmd = Seq("ping", "-c", "3", "8.8.8.8")
  val boundSource: Source[String, Option[Process]] = createProcessStdOutAsSource(boundCmd)
  // Here we only care about the completion of the stream, signalled by the sink's Future[Done].
  val sourceDoneFuture = boundSource.runWith(eventSink())
  sourceDoneFuture.onComplete {
    case Success(_) =>
      println("Bound command terminated!")
      system.terminate()
    case Failure(t) =>
      // Log instead of throwing: an exception here would skip the shutdown below and leave
      // the actor system (and therefore the JVM) running.
      log.error(t, s"Unexpected Error processing the bounded command stdout: ${t.getMessage}")
      system.terminate()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment