Skip to content

Instantly share code, notes, and snippets.

@kubukoz
Last active April 9, 2021 22:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kubukoz/a061c6a71abb70ddf23c8333d92e6b77 to your computer and use it in GitHub Desktop.
Save kubukoz/a061c6a71abb70ddf23c8333d92e6b77 to your computer and use it in GitHub Desktop.
Run system process with fs2
trait ProcessRunner[F[_]] {
def run(program: List[String])(errorOut: Pipe[F, Byte, Nothing]): Pipe[F, Byte, Byte]
}
object ProcessRunner {
def apply[F[_]](implicit F: ProcessRunner[F]): ProcessRunner[F] = F
// This is a relatively simple implementation, for the real deal go use something like vigoo/prox
implicit def instance[F[_]: Async]: ProcessRunner[F] = new ProcessRunner[F] {
import scala.jdk.CollectionConverters._
val readBufferSize = 4096
def run(program: List[String])(errorOut: Pipe[F, Byte, Nothing]): Pipe[F, Byte, Byte] = inputs => {
fs2
.Stream
.bracket(Sync[F].blocking(new java.lang.ProcessBuilder(program.asJava).start()))(p => Sync[F].blocking(p.destroy()))
.flatMap { process =>
fs2
.io
.readInputStream[F](Sync[F].delay(process.getInputStream()), chunkSize = readBufferSize)
.concurrently(
inputs.through(fs2.io.writeOutputStream[F](Sync[F].delay(process.getOutputStream())))
)
.concurrently(
fs2
.io
.readInputStream[F](Sync[F].delay(process.getErrorStream()), chunkSize = readBufferSize)
.through(errorOut)
)
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment