Skip to content

Instantly share code, notes, and snippets.

@idot
Created May 3, 2010 10:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save idot/387963 to your computer and use it in GitHub Desktop.
Save idot/387963 to your computer and use it in GitHub Desktop.
/**
 * Actor that forwards String messages to an output stream (Popen hands it
 * the stdin of the child process).
 *
 * Message protocol:
 *  - a String is written to the stream and flushed immediately,
 *  - null closes the stream and counts the latch down,
 *  - anything else closes the stream, counts the latch down and fails the
 *    actor with a DataFormatException.
 *
 * @param latch        counted down exactly once, when the writer is closed
 * @param outputStream destination of the forwarded text
 */
class InputActor(latch: CountDownLatch, outputStream : OutputStream) extends Actor {
  private val log = LoggerFactory.getLogger(classOf[InputActor])
  private val writer = new PrintWriter(outputStream)

  def act(): Unit = {
    react {
      case null =>
        shutdown()
      case x: String =>
        writer.write(x)
        // PrintWriter buffers internally; without a flush the receiving
        // process would not see any input until the writer is closed.
        writer.flush()
        // Re-arm the handler for the next message.
        act()
      case _ =>
        shutdown()
        throw new DataFormatException("Only Strings and null are accepted inputs!")
    }
  }

  /** Closes the writer and signals the latch that the input side is done. */
  private def shutdown(): Unit = {
    log.trace("closing writer")
    ControlUtil.closeInsulated(writer)
    latch.countDown()
  }
}
/**
 * Runs an external command and hands its stdout/stderr, line by line, to
 * caller-supplied functions.
 *
 * @param workDir working directory of the child process; an empty string
 *                makes the child inherit the working directory of this JVM
 * @param inShell when true the command is run as `/bin/sh -c command`,
 *                otherwise it is split on whitespace and executed directly
 * @param env     environment of the child process; an empty map makes the
 *                child inherit the environment of this JVM
 */
class Popen(workDir: String = "", inShell: Boolean = true, env: Map[String, String] = Map()) extends PopenT {
  private val log = LoggerFactory getLogger(Popen getClass)

  // These fields are written and read from different threads
  // (spawned workers, stream consumers, the caller), hence volatile.
  @volatile private var exitCodeValue: Int = -1
  @volatile private var inputStream: InputStream = null
  @volatile private var errorStream: InputStream = null
  @volatile private var outputStream: OutputStream = null
  @volatile private var latch: CountDownLatch = null

  /**
   * @return the exit code recorded by the most recent run; -1 until a
   *         command has finished, or when starting/waiting for it failed
   */
  def exitCode: Int = exitCodeValue

  /**
   * Runs `command`, lets `f` feed its stdin through an InputActor and blocks
   * until the process has exited and all of its output has been consumed.
   *
   * Use like:
   * {{{
   * val builder = new StringBuilder
   * val errors = new ListBuffer[String]
   * new Popen().withInput("sort | sort -r", builder.append(_), errors += _) { inp =>
   *   inp ! "D\n"
   *   inp ! "A\n"
   *   inp ! "E\nC\nB\n"
   *   inp ! null
   * }
   * }}}
   * after which builder.toString == "EDCBA".
   *
   * It is important to send null to inp at the end (this closes the stdin of
   * the command) and to delimit the messages with "\n".
   *
   * @param command    the command that will be executed
   * @param outputFunc the function to process each line of stdout
   * @param errorFunc  the function to process each line of stderr
   * @param f          the acting function (just pass a block that declares an inp)
   *                   inp => { inp ! message1; ...; inp ! messagen; inp ! null }
   * @return the exit code of the command, or -1 if running it failed
   */
  def withInput[A](command: String, outputFunc: String => Unit, errorFunc: String => Unit )(f : InputActor => A): Int = {
    // Three parties count down: stdout consumer, stderr consumer, input actor.
    val done = new CountDownLatch(3)
    latch = done
    try {
      // Everything runs on the calling thread so that a failure to start the
      // process or an exception in `f` is caught here instead of leaving the
      // caller blocked forever on the latch.
      val process = launch(command, outputFunc, errorFunc)
      val inputActor = new InputActor(done, outputStream)
      inputActor.start()
      f(inputActor)
      // Record the exit code before returning so callers never see a stale value.
      exitCodeValue = process.waitFor()
      done.await()
    } catch {
      case e: Exception => recordFailure(command, e)
    } finally {
      close()
    }
    exitCodeValue
  }

  /**
   * Executes command and blocks until the process has exited and all of its
   * output has been consumed.
   *
   * @param command    the command that will be executed
   * @param outputFunc the function to process each line of stdout
   * @param errorFunc  the function to process each line of stderr
   * @return the exit code of the command, or -1 if running it failed
   */
  def executeBlocked(command: String, outputFunc: String => Unit, errorFunc: String => Unit ): Int = {
    // Two parties count down: stdout consumer and stderr consumer.
    val done = new CountDownLatch(2)
    latch = done
    try {
      val process = launch(command, outputFunc, errorFunc)
      log trace("before process waitfor")
      exitCodeValue = process.waitFor()
      log trace("after process waitfor")
      // Only reached when both consumers were started, so this cannot hang
      // on a latch nobody will ever count down.
      done.await()
      log trace("end latch await")
    } catch {
      case e: Exception => recordFailure(command, e)
    } finally {
      close()
    }
    exitCodeValue
  }

  /**
   * Executes command on a background thread and returns immediately.
   *
   * The command has normally not finished when this method returns, so the
   * returned value is whatever exit code was recorded before the call; read
   * `exitCode` later to obtain the result of this run.
   *
   * @param command    the command that will be executed
   * @param outputFunc the function to process each line of stdout
   * @param errorFunc  the function to process each line of stderr
   * @return the exit code recorded at the time of the call (see above)
   */
  def executeSpawn(command: String, outputFunc: String => Unit, errorFunc: String => Unit ): Int = {
    try {
      spawn {
        val done = new CountDownLatch(2)
        latch = done
        // Failures happen on the worker thread, so they must be handled here;
        // a handler around `spawn` never sees them.
        try {
          val process = launch(command, outputFunc, errorFunc)
          log trace("before process waitfor")
          exitCodeValue = process.waitFor()
          log trace("after process waitfor")
          done.await()
          log trace("end latch await")
        } catch {
          case e: Exception => recordFailure(command, e)
        } finally {
          close()
        }
      }
    } catch {
      // Only reached when the background task itself could not be started.
      case e: Exception =>
        recordFailure(command, e)
        close()
    }
    exitCodeValue
  }

  /**
   * Closes the three streams of the most recently started process.
   * Closing stdin signals end-of-input to the command.
   */
  def close(): Unit = {
    ControlUtil.closeInsulated(outputStream)
    ControlUtil.closeInsulated(inputStream)
    ControlUtil.closeInsulated(errorStream)
  }

  /**
   * Reads `stream` line by line on a background thread, passing every line
   * to `output`; when the stream ends (or reading fails) the reader is
   * closed and the current latch is counted down.
   *
   * @param stream     the stream to drain
   * @param readerType label used in log messages ("OUT" or "ERR")
   * @param output     callback invoked once per line, without the line terminator
   */
  def consumeOutput(stream: InputStream, readerType: String, output: String => Unit): Unit = {
    // Capture the latch on the caller's thread so a later run that replaces
    // the field cannot make this consumer signal the wrong latch.
    val done = latch
    spawn {
      val reader = new BufferedReader(new InputStreamReader(stream))
      try {
        Iterator.continually(reader.readLine()).takeWhile(_ != null).foreach(output)
      } catch {
        // Happens e.g. when close() is called while this thread is reading.
        case e: java.io.IOException =>
          log trace(readerType + " stream ended abnormally: " + e)
      } finally {
        // Always release the reader and the waiter, even if `output` throws;
        // otherwise a caller blocked on the latch would never wake up.
        ControlUtil.closeInsulated(reader)
        if (done != null) done.countDown()
      }
    }
  }

  /**
   * Builds the argument vector for the command.
   *
   * @param command the command line
   * @return `/bin/sh -c command` when running in a shell, otherwise the
   *         command split on runs of whitespace
   */
  def createArgs(command:String): Array[String] = {
    if (inShell) {
      Array("/bin/sh", "-c", command)
    } else {
      // Trim first: a leading blank would otherwise yield an empty first
      // token, i.e. an empty program name.
      command.trim.split("\\s+")
    }
  }

  /** @return the configured environment as an array of "name=value" entries */
  def toEnv(): Array[String] = {
    env.map { case (k, v) => k + "=" + v }.toArray
  }

  /**
   * Starts the command, remembers its three streams so close() can release
   * them and starts the stdout/stderr consumers.
   */
  private def launch(command: String, outputFunc: String => Unit, errorFunc: String => Unit): Process = {
    val process = startProcess(command)
    outputStream = process.getOutputStream()
    inputStream = process.getInputStream()
    errorStream = process.getErrorStream()
    consumeOutput(inputStream, "OUT", outputFunc)
    consumeOutput(errorStream, "ERR", errorFunc)
    process
  }

  /** Starts the command with the configured working directory and environment. */
  private def startProcess(command: String): Process = {
    val args = createArgs(command)
    log debug(args.mkString(" "))
    // Runtime.exec treats a null envp / dir as "inherit from this JVM",
    // whereas an empty envp array would start the child with no environment.
    val envp: Array[String] = if (env == null || env.isEmpty) null else toEnv()
    val dir: File = if (workDir == null || workDir.length == 0) null else new File(workDir)
    Runtime.getRuntime().exec(args, envp, dir)
  }

  /** Logs a failed run and records -1 as its exit code. */
  private def recordFailure(command: String, e: Exception): Unit = {
    log error("command error: " + command + "\n" + "error: " + e)
    exitCodeValue = -1
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment