-
-
Save plexus/e8ac2ed568e50d085a3e5058ce26b13b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns com.nextjournal.tools.async | |
"Wrappers for clojure.core.async, with extra validation. This checks | |
for (in)correct use of go-blocks. | |
(go ...) should have one of >!, <!, alt!, alts!! in its lexical scope. | |
(go ...) should not have >!!, <!! in its dynamic scope. | |
" | |
(:refer-clojure :exclude [into map merge partition partition-by reduce take transduce]) | |
(:require [clojure.core.async :as async] | |
[clojure.walk :as walk] | |
[potemkin])) | |
;; https://clojureverse.org/t/core-async-locks-up-when-using-timeout-deep-inside-async-processes/1780/9 | |
(def ^:dynamic *in-go-scope* | |
"Is the current thread's calling context inside a go-block." | |
false) | |
(defn contains-special-go-loop-symbol? [body] | |
(->> body | |
(tree-seq #(or (seq? %) (seqable? %)) seq) | |
(filter symbol?) | |
(clojure.core/map (comp symbol name)) | |
(some '#{<! >! alt! alts!}))) | |
(defmacro go [& body] | |
(assert (contains-special-go-loop-symbol? body) | |
"(go ...)-block without >!, <!, alt!, alts!. Use async/thread instead.") | |
`(binding [*in-go-scope* true] | |
(async/go ~@body))) | |
(defmacro thread [& body] | |
`(binding [*in-go-scope* false] | |
(async/thread ~@body))) | |
(defmacro go-loop [& body] | |
(assert (contains-special-go-loop-symbol? body) | |
"(go-loop ...)-block without >!, <!, alt!, alts!. Use async/thread instead.") | |
`(binding [*in-go-scope* true] | |
(async/go-loop ~@body))) | |
(defn <!! [port] | |
(assert (not *in-go-scope*) | |
"Blocking take (<!!) used inside a go-block's calling context is not allowed.") | |
{:pre [(not *in-go-scope*)]} | |
(async/<!! port)) | |
(defn >!! [port val] | |
(assert (not *in-go-scope*) | |
"Blocking take (<!!) used inside a go-block's calling context is not allowed.") | |
(async/>!! port val)) | |
(defmacro <! [port] | |
`(async/<! ~port)) | |
(defmacro >! [port val] | |
`(async/>! ~port ~val)) | |
;; Provide vars with doc/arglists metadata | |
(doseq [[sym var] (ns-publics *ns*)] | |
(if-let [src (resolve (symbol "clojure.core.async" (str sym)))] | |
(alter-meta! var clojure.core/merge (select-keys (meta src) [:arglists :doc])))) | |
;; Import the rest of core.async | |
(potemkin/import-vars [clojure.core.async | |
<! | |
>! | |
Mix | |
Mult | |
Mux | |
Pub | |
admix | |
admix* | |
alt! | |
alt!! | |
alts! | |
alts!! | |
buffer | |
chan | |
close! | |
do-alt | |
do-alts | |
dropping-buffer | |
fn-handler | |
into | |
ioc-alts! | |
map | |
merge | |
mix | |
mult | |
muxch* | |
offer! | |
onto-chan | |
pipe | |
pipeline | |
pipeline-async | |
pipeline-blocking | |
poll! | |
promise-chan | |
pub | |
put! | |
reduce | |
sliding-buffer | |
solo-mode | |
solo-mode* | |
split | |
sub | |
sub* | |
take | |
take! | |
tap | |
tap* | |
thread-call | |
timeout | |
to-chan | |
toggle | |
toggle* | |
transduce | |
unblocking-buffer? | |
unmix | |
unmix* | |
unmix-all | |
unmix-all* | |
unsub | |
unsub* | |
unsub-all | |
unsub-all* | |
untap | |
untap* | |
untap-all | |
untap-all*]) | |
;; Used the generate the import list | |
#_ | |
(->> (symbol "clojure.core.async") | |
find-ns | |
ns-publics | |
(remove #(:deprecated (meta (val %)))) | |
(clojure.core/map key) | |
(remove (set (keys (ns-publics *ns*)))) | |
sort) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment