Created
May 3, 2010 10:55
-
-
Save idot/387963 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class InputActor(latch: CountDownLatch, outputStream : OutputStream) extends Actor { | |
private val log = LoggerFactory getLogger(Popen getClass) | |
private val writer = new PrintWriter(outputStream) | |
def act(){ | |
react{ | |
case null => { | |
ControlUtil.closeInsulated(writer) | |
latch.countDown | |
// log trace("closing writer") | |
} | |
case x: String => { | |
// log trace("acting: " + x + " " + (writer == null)) | |
writer.write(x) | |
act() | |
} | |
case _ => { | |
ControlUtil.closeInsulated(writer) | |
latch.countDown | |
throw new DataFormatException("Only Strings and null are accepted inputs!") | |
} | |
} | |
} | |
} | |
class Popen(workDir:String="",inShell:Boolean=true,env:Map[String,String]=Map()) extends PopenT { | |
private val log = LoggerFactory getLogger(Popen getClass) | |
private var exitCodeValue: Int = -1 | |
private var inputStream: InputStream = null | |
private var errorStream: InputStream = null | |
private var outputStream : OutputStream = null | |
private var latch: CountDownLatch = null | |
def exitCode: Int = { | |
exitCodeValue | |
} | |
/** | |
* use like: | |
* val builder = new StringBuilder | |
* new Popen().withInput("sort | sort -r ", | |
* builder.append(_), | |
* err => err + listBuffer) | |
* { inp => | |
* inp ! "D\n" | |
* inp ! "A\n" | |
* inp ! "E\nC\n\B" | |
* inp ! null | |
* } | |
* will return builder.toString == "EDCBA" | |
* | |
* It is important to sen null at the end to inp | |
* and to delimit the messages with "\n". | |
* | |
* @param command the command that will be executed in a subshell | |
* @param outputFunc the function to process stdout of the subshell | |
* @param errorFunc the function to process stderr of the subshell | |
* @param f the acting function (just pass a block that declares an inp) | |
* inp => { inp ! message1; ...; inp ! messagen; inp ! null } | |
* @return the exit code of the subshell | |
*/ | |
def withInput[A](command: String, outputFunc: String => Unit, errorFunc: String => Unit )(f : InputActor => A): Int = { | |
latch = new CountDownLatch(3) | |
try{ | |
spawn{ | |
// val args = split(command) | |
val args = createArgs(command) | |
// log debug(args.mkString(" ")) | |
val process = Runtime.getRuntime().exec(args) | |
inputStream = process.getInputStream() | |
consumeOutput(inputStream, "OUT", outputFunc) | |
errorStream = process.getErrorStream() | |
consumeOutput(errorStream, "ERR", errorFunc) | |
outputStream = process.getOutputStream() | |
val inputActor = new InputActor(latch, outputStream) | |
inputActor.start | |
// log trace("before process waitfor") | |
f(inputActor) | |
// log trace("after p") | |
val returnCode = process.waitFor() | |
// log trace("after process waitfor") | |
exitCodeValue = returnCode | |
} | |
} | |
catch { | |
case e: Throwable => | |
// log error("command error: " + command + "\n" + "error: " + e.getMessage().getBytes("UTF8")) | |
close() | |
exitCodeValue = -1 | |
} | |
finally{ | |
//cant close streams in finally because this method returns immediately! | |
} | |
latch.await() | |
// log trace("end latch await") | |
close() | |
return exitCodeValue | |
} | |
/** | |
* executes command and blocks until shell returns | |
* | |
* @param command | |
* @param outputFunc | |
* @param errorFunc | |
* @return | |
*/ | |
def executeBlocked(command: String, outputFunc: String => Unit, errorFunc: String => Unit ): Int = { | |
latch = new CountDownLatch(2) | |
try{ | |
// val args = split(command) | |
val args = createArgs(command) | |
log debug(args.mkString(" ")) | |
val arr = Array[String]() | |
val process = if(workDir != ""){Runtime.getRuntime().exec(args, toEnv, new File(workDir))}else{Runtime.getRuntime().exec(args)} | |
inputStream = process.getInputStream() | |
consumeOutput(inputStream, "OUT", outputFunc) | |
errorStream = process.getErrorStream() | |
consumeOutput(errorStream, "ERR", errorFunc) | |
log trace("before process waitfor") | |
val returnCode = process.waitFor() | |
log trace("after process waitfor") | |
exitCodeValue = returnCode | |
} | |
catch { | |
case e: Throwable => | |
log error("command error: " + command + "\n" + "error: " + e.getMessage().getBytes("UTF8")) | |
close() | |
exitCodeValue = -1 | |
} | |
finally{ | |
//cant close streams in finally because this method returns immediately! | |
} | |
latch.await() | |
log trace("end latch await") | |
close() | |
return exitCodeValue | |
} | |
/** | |
* executes command and returns immediately | |
* | |
* @param command | |
* @param outputFunc | |
* @param errorFunc | |
* @return | |
*/ | |
def executeSpawn(command: String, outputFunc: String => Unit, errorFunc: String => Unit ): Int = { | |
try{ | |
spawn{ | |
latch = new CountDownLatch(2) | |
// val args = split(command) | |
val args = createArgs(command) | |
log debug(args.mkString(" ")) | |
val process = Runtime.getRuntime().exec(args) | |
inputStream = process.getInputStream() | |
consumeOutput(inputStream, "OUT", outputFunc) | |
errorStream = process.getErrorStream() | |
consumeOutput(errorStream, "ERR", errorFunc) | |
log trace("before process waitfor") | |
val returnCode = process.waitFor() | |
log trace("after process waitfor") | |
exitCodeValue = returnCode | |
latch.await() | |
log trace("end latch await") | |
close() | |
} | |
} | |
catch { | |
case e: Throwable => | |
log error("command error: " + command + "\n" + "error: " + e.getMessage().getBytes("UTF8")) | |
close() | |
exitCodeValue = -1 | |
} | |
finally{ | |
//cant close streams in finally because this method returns immediately! | |
} | |
return exitCodeValue | |
} | |
/** | |
* Is it good to let people close it? | |
* This should then maybe called break? | |
* I dont't even know if this works | |
*/ | |
def close(){ | |
ControlUtil.closeInsulated(outputStream) | |
ControlUtil.closeInsulated(inputStream) | |
ControlUtil.closeInsulated(errorStream) | |
} | |
def consumeOutput(stream: InputStream, readerType: String, output: String => Unit): Unit = { | |
spawn{ | |
var count = 0 | |
val reader = new BufferedReader(new InputStreamReader(stream)) | |
var line: String = null | |
while({line = reader.readLine; line != null}){ | |
count += 1 | |
output(line) | |
} | |
// log trace("closing " + readerType + " output lines: " + count ) | |
ControlUtil.closeInsulated(reader) | |
latch.countDown | |
} | |
} | |
def createArgs(command:String): Array[String] = { | |
if(inShell){ | |
Array("/bin/sh", "-c", command) | |
} | |
else{ | |
command.split("\\s+").map(_.trim()) | |
} | |
} | |
def toEnv(): Array[String] = { | |
(for((k,v) <- env)yield{ k+"="+v} ).toArray | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment