Last active
December 24, 2015 05:29
-
-
Save cstorey/6751325 to your computer and use it in GitHub Desktop.
A buffering version of cat(1) in ClojureScript with core.async
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns cljs-cat.core | |
(:require [cljs.core.async :refer [chan sliding-buffer put! close! <! >!]] | |
[clojure.string :as string] | |
[cljs.nodejs :refer [require]]) | |
(:require-macros [cljs.core.async.macros :as m :refer [go alts!]]) | |
) | |
(defn log
  "Writes one timestamped line to stderr.

  Every argument is rendered with `pr-str`; the rendered values are joined
  with single spaces behind a UTC timestamp of the form
  YYYY-MM-DDTHH:MM:SS.mmmZ, and the line is terminated with a newline."
  [& args]
  (let [;; Date#toISOString produces the YYYY-MM-DDTHH:MM:SS.mmmZ layout
        ;; directly. Building it by hand from getUTCMonth is error-prone
        ;; because that accessor is 0-based (January is 0), which made the
        ;; month in every timestamp one too low.
        time-str (.toISOString (js/Date.))
        logstr (string/join " " (cons time-str (map pr-str args)))]
    (.write js/process.stderr (str logstr "\n"))))
;; Running byte counters shared by the reader and the writer:
;;   :read  - bytes taken from the input stream so far
;;   :wrote - bytes whose write to the output stream has completed
;; Both start at 0 so that arithmetic and log output never see nil before the
;; first chunk has been processed.
(def outstanding (atom {:read 0 :wrote 0}))
(defn log-outstanding
  "Logs the byte counters held in `outstanding`: bytes read, bytes written and
  their difference, followed by that difference broken down into base-1024
  units with the largest unit first (for example 3m 512k 17b).

  A counter that is absent from the map is treated as 0."
  []
  (let [{:keys [read wrote] :or {read 0 wrote 0}} @outstanding
        delta (- read wrote)]
    (log :read read :wrote wrote :delta delta
         ;; Peel off one base-1024 digit per step. `quot` is used rather than
         ;; a bit shift because JavaScript bit operations truncate to 32-bit
         ;; signed integers, which breaks for differences of 2 GiB and above.
         (->> (iterate #(quot % 1024) delta)
              (take-while (partial not= 0))
              (map #(mod % 1024))
              ;; The unit vector is the first collection, so %1 is the unit
              ;; suffix and %2 the digit; it also caps the output at five units.
              (map #(str %2 %1 " ") ["b" "k" "m" "g" "t"])
              reverse
              (apply str)))))
;; Log the counters every time `outstanding` changes. Watch callbacks are
;; invoked with four arguments (key, reference, old state, new state) while
;; log-outstanding takes none, so adapt the arity explicitly instead of
;; relying on JavaScript silently discarding surplus arguments.
(add-watch outstanding :throughput-watch
           (fn [_key _ref _old _new] (log-outstanding)))
(defn read-to-chan
  "Copies every chunk read from the Node.js readable stream `strm` onto the
  channel `ch` and returns `ch`.

  Each chunk's length is added to the :read counter in `outstanding` before
  the chunk is put on `ch`. Putting parks when `ch` is full, so no further
  chunk is pulled from the stream until the consumer catches up.

  When the stream emits end and all chunks have been forwarded, `ch` is
  closed so the consumer can detect end-of-input. Pass a false `close?` to
  leave `ch` open instead."
  ([ch strm] (read-to-chan ch strm true))
  ([ch strm close?]
   (let [readable-ch (chan)]
     ;; Turn the stream's events into channel operations: every readable
     ;; event becomes a token, and end closes the token channel.
     (doto strm
       (.on "readable" #(put! readable-ch :readable))
       (.on "end" #(close! readable-ch)))
     (go
       ;; One token means data may be available; read until the stream
       ;; returns null, then wait for the next token.
       (while (<! readable-ch)
         (loop []
           (let [buf (.read strm)]
             (when buf
               (swap! outstanding update-in [:read] (fnil + 0) (.-length buf))
               (>! ch buf)
               (recur)))))
       ;; The token channel is closed, so the stream has ended. Without
       ;; closing `ch` here the consumer would wait forever for more data.
       (when close?
         (close! ch)))
     ch)))
(defn writeable-from-chan
  "Writes every chunk taken from channel `ch` to the Node.js writable stream
  `strm`, one at a time.

  After each write the loop waits for the stream's write callback before
  taking the next chunk, then adds the chunk's length to the :wrote counter
  in `outstanding`. When `ch` is closed and drained, the stream is ended.

  Returns the channel of the go block that runs the loop."
  [ch strm]
  (let [drains (chan)
        ;; Passed as the write callback; signals that the chunk was handled.
        drain-cb (fn drain-cb [] (put! drains :token))]
    (go
      (loop []
        (let [buf (<! ch)]
          (if buf
            (do
              (.write strm buf drain-cb)
              (<! drains)
              (swap! outstanding update-in [:wrote] (fnil + 0) (.-length buf))
              (recur))
            ;; nil means `ch` is closed: finish the output stream. This must
            ;; target the stream; the channel has no `end` method.
            (.end strm)))))))
(def fs (require "fs")) | |
(defn -main
  "Entry point: copies stdin to /dev/stdout through a channel buffering up to
  1024 chunks, and logs the throughput counters to stderr every 100 ms.

  Command-line arguments are accepted and ignored."
  [& _args]
  (let [pipe (chan 1024)
        ;; Ideally, we'd just use process.stdout here, but writes to it
        ;; seem to block whether or not stdout is a terminal.
        ostr (.createWriteStream fs "/dev/stdout")]
    (read-to-chan pipe js/process.stdin)
    (writeable-from-chan pipe ostr)
    ;; Nothing else is printed to stdout: it carries the copied data, and any
    ;; extra text would corrupt it. Diagnostics go to stderr via `log`.
    ;; The timer is unref'd so that periodic logging alone does not keep the
    ;; process alive once the input has ended and all writes have completed.
    (.unref (js/setInterval log-outstanding 100))))

;; Register -main as the function the Node.js runtime invokes on startup.
(set! *main-cli-fn* -main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment