@leonoel
Last active February 21, 2024 16:47
Example of Clojure prepl usage with missionary
(ns prepl
  (:require [clojure.core.server :as s]
            [missionary.core :as m])
  (:import (java.io PipedReader PipedWriter)
           (java.util.concurrent Executors)))

(defn remote "
Returns a function taking a `java.io.PipedWriter` and returning a flow connecting to a remote prepl on given `host` and
`port`, sending the content of the pipe to the remote peer and emitting received evaluation results. The pipe is closed
on flow cancellation.
" [host port]
  (fn [^PipedWriter pipe]
    (m/observe
      (fn [cb]
        (.start
          (Thread.
            (reify Runnable
              (run [_]
                (s/remote-prepl host port
                  (PipedReader. pipe) cb)))))
        #(.close pipe)))))

(defn write-form "
Returns a task writing the edn representation of given `form` to given `java.io.Writer`, running on given `executor`.
" [executor writer form]
  (m/via executor
    (binding [*out* writer]
      (prn form))))

(defn send-forms "
Returns a flow serializing and writing forms produced by flow `forms` to a `java.io.PipedWriter`, and concurrently
running the flow returned by the `channel` function, called with the pipe.
" [channel forms]
  (m/ap
    (let [writer (PipedWriter.)]
      (m/amb= (m/?> (channel writer))
        (do (m/? (write-form (Executors/newSingleThreadExecutor)
                   writer (m/?> forms)))
            (m/amb))))))

(defn process-tuples "
Returns a flow processing each element of tuples produced by given flow `input` with a separate flow processor.
The flow processors `ps` must be a sequence matching the shape of expected tuples. Resulting tuples are collapsed with
given function `f`.
" [f ps input]
  (let [>input (m/stream input)]
    (->> ps
      (map-indexed (fn [i p] (p (m/sample #(nth % i) >input))))
      (apply m/zip f))))

(comment
  ;; start a server.
  (def server
    (s/start-server
      {:accept 'clojure.core.server/io-prepl
       :port   0
       :name   "jvm"}))

  ;; connect to the server and define a function `eval!` to ask for remote evaluation.
  ;; `eval!` returns a task completing with the result of evaluation.
  (def cancel
    ((->> (m/observe
            (fn [!]
              (defn eval! [x]
                (let [d (m/dfv)]
                  (! [d x]) d))
              #(def eval! nil)))
       (process-tuples (fn [d x] (d x))
         [identity (partial send-forms (remote "localhost" (.getLocalPort server)))])
       (m/reduce {} nil))
     (fn [_] (println "Remote prepl terminated."))
     (fn [^Throwable e] (.printStackTrace e))))

  ;; evaluate a form remotely, blocking the calling thread until the result is received.
  (m/? (eval! '(* 6 7)))

  ;; disconnect from the remote prepl.
  (cancel)
  )
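
;; Not part of the original gist: a hedged cleanup sketch. Assuming the server was started as above with :name "jvm",
;; `clojure.core.server/stop-server` called with that name should close its socket once the session is over.
(comment
  (s/stop-server "jvm"))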