Simple application that shows how to push content into a source externally and also how to work with the materialized values in order to control the source.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicBoolean

import akka.Done
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.blocking
import scala.concurrent.duration.Duration
import scala.sys.process._
import scala.util.{Failure, Success}
import scala.util.Try
/**
 * Demo application: feeds the stdout of an external process into an Akka Streams `Source`
 * through a `Source.queue`, and hands the running `Process` back as the materialized value
 * so the caller can control it (for example destroy it) while the stream is running.
 */
object EventSource extends App {

  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: scala.concurrent.ExecutionContextExecutor = system.dispatcher
  implicit val log: LoggingAdapter = Logging(system, getClass)

  /** Number of stdout lines buffered in the queue before offers start to backpressure. */
  val DefaultMaxQueueSize = 100

  /**
   * Starts `cmd` and exposes its stdout, line by line, as a `Source`.
   *
   * The process is started eagerly, when this method is called, not when the returned
   * source is materialized. Lines written to stderr are logged at error level and are not
   * part of the stream. The source completes once the process has exited and every line
   * already handed to the queue has been emitted.
   *
   * @param cmd the command and its arguments
   * @return a source of stdout lines whose materialized value is the running process
   */
  def createProcessStdOutAsSource(cmd: Seq[String]): Source[String, Option[Process]] = {
    // This could have been done with ProcessBuilder.lineStream, but the purpose of this gist
    // is to show how content can be added externally to the Source.
    val (queue, publisher) = Source
      .queue[String](DefaultMaxQueueSize, OverflowStrategy.backpressure)
      .toMat(Sink.asPublisher(fanout = false))(Keep.both)
      .run()

    // Invoked for every stdout line. With OverflowStrategy.backpressure the previous offer
    // must be resolved before the next one is made, so wait for the result here instead of
    // firing and forgetting (which would fail the offer and lose the line).
    def processEvent(eventString: String): Unit =
      Try(Await.result(queue.offer(eventString), Duration.Inf)) match {
        case Failure(t) => log.warning(s"Subprocess output line was not enqueued: ${t.getMessage}")
        case Success(_) => ()
      }

    val processLogger = ProcessLogger(
      processEvent,
      errorMessage => log.error(s"Error listening from subprocess: $errorMessage"))
    val process = cmd.run(processLogger)

    // Complete the queue when the process exits so the stream terminates even if no further
    // element arrives; elements already buffered are still delivered before completion.
    Future(blocking(process.exitValue())).onComplete(_ => queue.complete())

    Source.fromPublisher(publisher).mapMaterializedValue(_ => Option(process))
  }

  /** Sink that prints every event to stdout; its future completes when the stream ends. */
  def eventSink(): Sink[String, Future[Done]] =
    Sink.foreach[String](event => println(s"[STDOUT]: $event"))

  println("")
  println("Creates an 'unbound' command and destroy it programmatically (Waiting 5 seconds to destroy the process)...")
  val unboundCmd = Seq("ping", "8.8.8.8")
  val unboundSource: Source[String, Option[Process]] = createProcessStdOutAsSource(unboundCmd)

  // Keep.left selects the source's materialized value, i.e. the handle of the running process.
  val processOption = unboundSource.toMat(eventSink())(Keep.left).run()
  Thread.sleep(5000)
  processOption match {
    case Some(p) =>
      p.destroy()
      println("Unbound command was destroyed programmatically!")
    case None => sys.error("hmmm!")
  }

  println("")
  println("Creates a 'bound' command and wait till it terminates")
  val boundCmd = Seq("ping", "-c", "3", "8.8.8.8")
  val boundSource: Source[String, Option[Process]] = createProcessStdOutAsSource(boundCmd)

  // Here we only wait for the stream to complete; the process handle is not needed.
  val sourceDoneFuture = boundSource.runWith(eventSink())
  sourceDoneFuture.onComplete {
    case Success(_) =>
      println("Bound command terminated!")
      system.terminate()
    case Failure(t) =>
      // Log instead of throwing: a throw here would skip the terminate() call below and
      // leave the actor system (and therefore the JVM) running.
      log.error(t, s"Unexpected Error processing the bounded command stdout: ${t.getMessage}")
      system.terminate()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment