Skip to content

Instantly share code, notes, and snippets.

@markehammons
Created February 25, 2021 00:14
Show Gist options
  • Save markehammons/e5da16bd160a7fbabdac79b87e067238 to your computer and use it in GitHub Desktop.
Save markehammons/e5da16bd160a7fbabdac79b87e067238 to your computer and use it in GitHub Desktop.
import zio._
import zio.stream._
import scala.scalanative.unsafe._
import scala.scalanative.posix.unistd.{
fork,
vfork,
pipe,
close,
dup2,
STDIN_FILENO,
STDERR_FILENO,
STDOUT_FILENO,
execve,
_exit,
write,
read
}
import scala.scalanative.posix.sys.types.pid_t
import scala.scalanative.unsigned._
import ops.lib.csys.Wait
import java.io.FileInputStream
object Process {
val default = Runtime.default
trait ZProcess {
val terminate: Task[Unit]
val kill: Task[Unit]
val exitValue: Task[Int]
val outStream: Stream[Throwable, Byte]
val isAlive: Task[Boolean]
}
private def runSubprocess(
bin: LPath,
args: Seq[String],
inPipe: CInt,
outPipe: CInt
): Nothing = {
val totalArgs = bin.toString() +: args
Zone { implicit z =>
val cArgs = alloc[CString](totalArgs.length + 1)
for { i <- 0 until totalArgs.length } {
cArgs(i) = toCString(totalArgs(i))
}
cArgs(totalArgs.length + 1) = null
println("done loading")
dup2(outPipe, STDIN_FILENO)
dup2(inPipe, STDOUT_FILENO)
dup2(inPipe, STDERR_FILENO)
close(inPipe)
close(outPipe)
val res = execve(cArgs(0), cArgs, null)
println(s"closed with $res")
_exit(res)
}
???
}
private def feedSubprocess(
inputStream: Stream[Throwable, Byte],
outPipe: CInt
): Nothing = {
println("writing to baby")
default.unsafeRunAsync {
inputStream
.grouped(1024)
.foreachWhile { c =>
Task(
Zone { implicit z =>
val heap = alloc[Byte](c.size)
for (i <- 0 until c.size) {
heap(i) = c(i)
}
println(s"writing ${c.size} bytes")
val wrote = write(outPipe, heap, c.size.toULong)
println(s"wrote $wrote bytes")
wrote == c.size
}
)
}
.ensuring(URIO {
println("closing pipes")
_exit(0)
})
} { case _ =>
()
}
???
}
private def readFromSubprocess(
inPipe: CInt,
fork2: pid_t
): Stream[Throwable, Byte] = {
Stream
.unfoldChunk(1024) { lastSize =>
println("trying to read")
val buf = stackalloc[Byte](1024)
println(inPipe)
println(lastSize)
if (lastSize < 1) None
else {
val rd = read(inPipe, buf, 1024.toULong)
val cb = ChunkBuilder.make[Byte](rd)
println(s"read $rd bytes")
var i = 0
while (i < rd) {
cb += buf(i)
i += 1
}
Some(cb.result() -> rd)
}
}
.ensuring(UIO {
()
})
.concat {
Stream
.fromEffect(Task {
val result = stackalloc[CInt](1)
Wait.waitpid(fork2, result, 0)
!result
})
.flatMap(result =>
if (result == 0) Stream.empty
else Stream.fail(new Exception(s"process failed: ${result}"))
)
}
}
trait Service {
def getBinaryPath(path: Iterable[LPath], bin: LPath): Option[LPath]
def runProcess(
bin: LPath,
args: String*
)(
inputStream: Stream[Throwable, Byte] = Stream.empty
): Task[Stream[Throwable, Byte]]
}
case class ProcessImpl(env: zio.system.System.Service) extends Service {
def getBinaryPath(path: Iterable[LPath], bin: LPath): Option[LPath] = {
if (bin.isAbsolute || bin.startsWith(".")) {
Some(bin)
} else {
path.map(_ / bin).find(_.isExecutable)
}
}
def runProcess(
bin: LPath,
args: String*
)(
inputStream: Stream[Throwable, Byte] = Stream.empty
): Task[Stream[Throwable, Byte]] =
for {
path <- env
.env("PATH")
.someOrFail(new Exception("PATH is undefined"))
bin <- Task(getBinaryPath(path.split(":").map(LPath(_)), bin))
.someOrFail(new Exception(s"Could not find executable $bin"))
} yield {
val inPipes = stackalloc[CInt](2)
val outPipes = stackalloc[CInt](2)
pipe(inPipes)
pipe(outPipes)
val fork1 = fork()
if (fork1 != 0) {
val fork2 = vfork()
if (fork2 == 0) {
close(inPipes(0))
close(outPipes(1))
runSubprocess(bin, args, inPipes(1), outPipes(0))
} else {
close(outPipes(0))
close(outPipes(1))
close(inPipes(1))
readFromSubprocess(inPipes(0), fork2)
}
} else {
close(inPipes(0))
close(outPipes(1))
close(outPipes(0))
feedSubprocess(inputStream, inPipes(1))
}
}
}
val live: ZLayer[Has[zio.system.System.Service], Throwable, Has[
Process.Service
]] =
ZLayer.fromService(ProcessImpl)
def runProcess(bin: LPath, args: String*)(
inputStream: Stream[Throwable, Byte] = Stream.empty
) = {
for {
ps <- RIO.environment[Has[Process.Service]]
res <- ps.get[Process.Service].runProcess(bin, args: _*)(inputStream)
} yield res
}
}
//usage
val t = for {
catStream <- Process
.runProcess(l"cat")(
Stream.fromChunk(Chunk.fromArray("hello\n".getBytes()))
)
outStream <- Process
.runProcess(l"cat")(catStream)
_ <- outStream.foreachChunk(c => putStr(new String(c.toArray)))
} yield {}
runtime.unsafeRunAsync(t.provideSomeLayer[ZEnv](Process.live)) {
case zio.Exit.Failure(cause) => println(cause.prettyPrint)
case zio.Exit.Success(_) => println("complete")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment