Skip to content

Instantly share code, notes, and snippets.

@spieden
Last active May 12, 2018 02:55
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save spieden/482a6c3a8bd3b0f82c89 to your computer and use it in GitHub Desktop.
Save spieden/482a6c3a8bd3b0f82c89 to your computer and use it in GitHub Desktop.
(ns shell
  (:require [me.raynes.conch.low-level :as sh]
            [slingshot.slingshot :refer [throw+ try+]]
            [clojure.core.async :refer [go thread alts!!]]
            [clojure.string :as str])
  (:import (java.io IOException StringWriter)
           (java.util.concurrent ExecutionException)))
(defn start-step!
  "Starts one pipeline step and appends it to `upstream`.

  `step` is a sequence of command-line tokens; it is applied to `sh/proc`
  and also joined with spaces to form the `:cl` entry.

  The returned collection is `upstream` with the new process map conj'd on.
  That map is whatever `sh/proc` returned plus:
    :err-stream  future collecting the process's :err stream as a string
    :in-stream   future feeding the previous step's :out into this process
                 and then calling `sh/done`; nil when there is no previous
                 step
    :cl          the command line as a single space-joined string"
  [upstream step]
  (let [source   (last upstream)
        process  (apply sh/proc step)
        ;; Start draining stderr right away, before any input is fed.
        stderr-f (future (sh/stream-to-string process :err))
        ;; Only steps with a predecessor get wired up here; the first
        ;; step's input is left nil.
        stdin-f  (when source
                   (future
                     (sh/feed-from process (:out source))
                     (sh/done process)))
        cmd-line (str/join " " step)]
    (conj upstream
          (assoc process
                 :err-stream stderr-f
                 :in-stream stdin-f
                 :cl cmd-line))))
(defn start-pipeline!
  "Starts every step in `steps` and wires the pipeline's ends to `in`/`out`.

  Each step is started with `start-step!`. `in` is fed to the first
  process (followed by `sh/done`), and the last process's :out is streamed
  to `out`; both happen in futures.

  Returns a map with:
    :step-procs  vector of process maps, the first one carrying the
                 :in-stream future that feeds `in`
    :out-stream  future streaming the last process's :out to `out`"
  [steps in out]
  (let [procs    (reduce start-step! [] steps)
        head     (first procs)
        tail     (last procs)
        ;; Output draining is started before input feeding, as in the
        ;; original ordering.
        sink-f   (future (sh/stream-to tail :out out))
        source-f (future
                   (sh/feed-from head in)
                   (sh/done head))
        fed-head (assoc head :in-stream source-f)]
    {:step-procs (vec (cons fed-head (rest procs)))
     :out-stream sink-f}))
(defn exec-io-exception?
  "Returns true when `e` is an ExecutionException whose cause is an
  IOException, and false for anything else (including nil)."
  [e]
  (if (instance? ExecutionException e)
    (instance? IOException (.getCause ^ExecutionException e))
    false))
(defn wait-proc
  "Waits on one pipeline process and reports how it went.

  Dereferences the process's :in-stream future, then reads its exit code
  with `sh/exit-code` (presumably blocking until the process exits -
  confirm against conch).

  Returns :ok when the input future did not fail with an
  ExecutionException wrapping an IOException and the exit code is 0.
  Otherwise returns a map:
    :cl         the step's command line
    :err        the collected :err output (dereferenced from :err-stream)
    :exit-code  the exit code

  Any other exception raised by the input future is not caught here."
  [proc]
  (let [feed-outcome (try+
                       @(:in-stream proc)
                       ;; Keep the exception as a value so it can be
                       ;; inspected after the exit code is known.
                       (catch exec-io-exception? ex ex))
        status       (sh/exit-code proc)
        feed-failed? (exec-io-exception? feed-outcome)]
    (if-not (or feed-failed? (not= 0 status))
      :ok
      {:cl        (:cl proc)
       :err       (deref (:err-stream proc))
       :exit-code status})))
(defn wait-pipeline!
  "Waits for every step of a started pipeline and for its output to drain.

  Takes the map returned by `start-pipeline!` (:step-procs, :out-stream).
  Each step is awaited with `wait-proc` on its own thread, and results are
  consumed in completion order.

  Returns :ok once every step has reported :ok and the :out-stream future
  has been dereferenced.

  Throws:
    - via `throw+`, the failure map from `wait-proc` (:cl, :err,
      :exit-code) of the first step seen to fail;
    - any Throwable raised while waiting on a step;
    - whatever dereferencing :out-stream raises."
  [{:keys [step-procs out-stream]}]
  (let [proc-chans (mapv (fn [proc]
                           ;; wait-proc blocks, so run it on a dedicated
                           ;; thread rather than in the go dispatch pool.
                           (thread
                             ;; Deliver exceptions as values; otherwise the
                             ;; channel would just close and the failure
                             ;; would be lost.
                             (try
                               (wait-proc proc)
                               (catch Throwable t t))))
                         step-procs)]
    (loop [pending (set proc-chans)]
      (when (seq pending)
        ;; Only wait on channels that have not reported yet: a channel that
        ;; already delivered is closed and would yield nil immediately.
        (let [[result chan] (alts!! (vec pending))]
          (cond
            (map? result) (throw+ result)
            (instance? Throwable result) (throw result)
            :else (recur (disj pending chan)))))))
  (deref out-stream)
  :ok)
(defn run-pipeline!
  "Starts the pipeline described by `steps`, feeds it `in`, and waits for
  it to finish.

  [steps in]      collects the pipeline's output in a StringWriter and
                  returns it as a string.
  [steps in out]  streams the output to `out` and returns the result of
                  `wait-pipeline!`.

  Failures propagate from `wait-pipeline!`."
  ([steps in]
   (let [buffer (StringWriter.)]
     (run-pipeline! steps in buffer)
     (.toString buffer)))
  ([steps in out]
   (-> (start-pipeline! steps in out)
       wait-pipeline!)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment