public
Last active

A buffering version of cat(1) in ClojureScript with core.async

  • Download Gist
cat.cljs
Clojure
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
(ns cljs-cat.core
(:require [cljs.core.async :refer [chan sliding-buffer put! close! <! >!]]
[clojure.string :as string]
[cljs.nodejs :refer [require]])
(:require-macros [cljs.core.async.macros :as m :refer [go alts!]])
)
 
(defn log
  "Writes one timestamped line to stderr.

  Every argument is rendered with `pr-str`; the rendered arguments are joined
  with single spaces after an ISO-8601 UTC timestamp of the form
  YYYY-MM-DDTHH:mm:ss.sssZ, and the line is terminated with a newline."
  [& args]
  (let [;; Date.prototype.toISOString already produces the intended layout in
        ;; UTC. The hand-built version passed getUTCMonth straight through,
        ;; which is zero-based (January = 0), so every timestamp was one month
        ;; early; it also depended on `format`, which newer ClojureScript
        ;; releases no longer provide in cljs.core.
        time-str (.toISOString (js/Date.))
        logstr   (string/join " " (cons time-str (map pr-str args)))]
    (.write js/process.stderr (str logstr "\n"))))
 
;; Running byte counters shared by the reader and the writer. Starts empty;
;; the keys :read and :wrote appear once the first chunk has been counted.
(def outstanding (atom {}))

(defn log-outstanding
  "Logs the bytes read, the bytes written and their difference, followed by a
  human-readable breakdown of the difference in 1024-based units
  (for example \"1m 512k 3b \").

  Accepts and ignores any arguments, so the same function can serve both as a
  zero-argument timer callback and as an `add-watch` function (which is
  invoked with key, reference, old state and new state)."
  [& _]
  (let [;; Counters are absent until the first chunk is counted; report 0
        ;; instead of nil in that case.
        {:keys [read wrote] :or {read 0 wrote 0}} @outstanding
        delta (- read wrote)]
    (log :read read :wrote wrote :delta delta
         ;; `quot` rather than a bit shift: JavaScript bitwise operators
         ;; truncate their operands to 32 bits, which garbles any value of
         ;; 2 GiB or more and makes the larger units unreachable.
         (->> (iterate #(quot % 1024) delta)
              (take-while (partial not= 0))
              (map #(mod % 1024))
              ;; `->>` threads the amounts in as the LAST collection, so %1 is
              ;; the unit suffix and %2 the amount. The five-element unit
              ;; vector also caps the output at five groups.
              (map #(str %2 %1 " ") ["b" "k" "m" "g" "t"])
              reverse
              (apply str)))))
 
;; Emit a log line every time the `outstanding` counters change.
;; NOTE(review): add-watch invokes its function with four arguments
;; (key, reference, old state, new state) — confirm log-outstanding is meant
;; to be called that way.
(add-watch outstanding :throughput-watch log-outstanding)
 
(defn read-to-chan
  "Pumps every chunk that becomes available on the readable stream `strm`
  onto the channel `ch`, adding each chunk's length to the :read counter in
  `outstanding`.

  `ch` is closed once the stream has emitted \"end\" and all pending chunks
  have been forwarded, so consumers can detect end-of-input.

  Returns `ch`."
  [ch strm]
  (let [;; Carries one token per \"readable\" event; closed on \"end\".
        readable-ch (chan)]
    (doto strm
      (.on "readable" #(put! readable-ch :readable))
      (.on "end" #(close! readable-ch)))
    (go
      (while (<! readable-ch)
        ;; Drain the stream: .read returns a falsy value once nothing more is
        ;; buffered, at which point we wait for the next \"readable\" token.
        (loop []
          (when-let [buf (.read strm)]
            (swap! outstanding update-in [:read] (fnil + 0) (.-length buf))
            ;; Parks while `ch` is full, which is what throttles reading.
            (>! ch buf)
            (recur))))
      ;; Previously only readable-ch was closed, so the consumer of `ch`
      ;; never observed end-of-input.
      (close! ch))
    ch))
 
(defn writeable-from-chan
  "Writes every chunk taken from the channel `ch` to the writable stream
  `strm`, one at a time, adding each chunk's length to the :wrote counter in
  `outstanding` after the stream has invoked that write's callback.

  When `ch` is closed and drained, the stream is ended.

  Returns the channel of the go block that performs the copying."
  [ch strm]
  (let [;; Receives one token per completed write.
        drains   (chan)
        drain-cb (fn drain-cb [] (put! drains :token))]
    (go
      (loop []
        (if-let [buf (<! ch)]
          (do
            (.write strm buf drain-cb)
            ;; Wait for the write callback before taking the next chunk so
            ;; that at most one write is in flight.
            (<! drains)
            (swap! outstanding update-in [:wrote] (fnil + 0) (.-length buf))
            (recur))
          ;; End the *stream*; the original called .end on the core.async
          ;; channel, which has no such method.
          (.end strm))))))
 
;; The "fs" module, loaded through the `require` referred from cljs.nodejs
;; in the ns form.
(def fs (require "fs"))
 
(defn -main
  "Entry point: copies stdin to stdout through a buffered core.async channel
  and reports throughput on stderr every 100 ms.

  Command-line arguments are accepted and ignored. Returns nil."
  [& _args]
  (let [;; Buffers up to 1024 chunks (not bytes) between reader and writer.
        pipe (chan 1024)
        ;; Ideally, we'd just use process.stdout here, but writes to it
        ;; seem to block whether or not stdout is a terminal.
        ostr (.createWriteStream fs "/dev/stdout")]
    (read-to-chan pipe js/process.stdin)
    (writeable-from-chan pipe ostr)
    ;; unref the timer so the periodic report on its own does not keep the
    ;; process alive once the streams have finished.
    (.unref (js/setInterval log-outstanding 100))
    ;; The leftover template `println` was removed: anything printed to
    ;; stdout would be mixed into the copied data.
    nil))
 
;; Register -main as the function stored in *main-cli-fn*.
(set! *main-cli-fn* -main)

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.