Skip to content

Instantly share code, notes, and snippets.

@plexus

plexus/async.clj Secret

Created May 4, 2018 10:24
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save plexus/e8ac2ed568e50d085a3e5058ce26b13b to your computer and use it in GitHub Desktop.
Save plexus/e8ac2ed568e50d085a3e5058ce26b13b to your computer and use it in GitHub Desktop.
(ns com.nextjournal.tools.async
"Wrappers for clojure.core.async, with extra validation. This checks
for (in)correct use of go-blocks.
(go ...) should have one of >!, <!, alt!, alts!! in its lexical scope.
(go ...) should not have >!!, <!! in its dynamic scope.
"
(:refer-clojure :exclude [into map merge partition partition-by reduce take transduce])
(:require [clojure.core.async :as async]
[clojure.walk :as walk]
[potemkin]))
;; https://clojureverse.org/t/core-async-locks-up-when-using-timeout-deep-inside-async-processes/1780/9
(def ^:dynamic *in-go-scope*
"Is the current thread's calling context inside a go-block."
false)
(defn contains-special-go-loop-symbol? [body]
(->> body
(tree-seq #(or (seq? %) (seqable? %)) seq)
(filter symbol?)
(clojure.core/map (comp symbol name))
(some '#{<! >! alt! alts!})))
(defmacro go [& body]
(assert (contains-special-go-loop-symbol? body)
"(go ...)-block without >!, <!, alt!, alts!. Use async/thread instead.")
`(binding [*in-go-scope* true]
(async/go ~@body)))
(defmacro thread [& body]
`(binding [*in-go-scope* false]
(async/thread ~@body)))
(defmacro go-loop [& body]
(assert (contains-special-go-loop-symbol? body)
"(go-loop ...)-block without >!, <!, alt!, alts!. Use async/thread instead.")
`(binding [*in-go-scope* true]
(async/go-loop ~@body)))
(defn <!! [port]
(assert (not *in-go-scope*)
"Blocking take (<!!) used inside a go-block's calling context is not allowed.")
{:pre [(not *in-go-scope*)]}
(async/<!! port))
(defn >!! [port val]
(assert (not *in-go-scope*)
"Blocking take (<!!) used inside a go-block's calling context is not allowed.")
(async/>!! port val))
(defmacro <! [port]
`(async/<! ~port))
(defmacro >! [port val]
`(async/>! ~port ~val))
;; Provide vars with doc/arglists metadata
(doseq [[sym var] (ns-publics *ns*)]
(if-let [src (resolve (symbol "clojure.core.async" (str sym)))]
(alter-meta! var clojure.core/merge (select-keys (meta src) [:arglists :doc]))))
;; Import the rest of core.async
(potemkin/import-vars [clojure.core.async
<!
>!
Mix
Mult
Mux
Pub
admix
admix*
alt!
alt!!
alts!
alts!!
buffer
chan
close!
do-alt
do-alts
dropping-buffer
fn-handler
into
ioc-alts!
map
merge
mix
mult
muxch*
offer!
onto-chan
pipe
pipeline
pipeline-async
pipeline-blocking
poll!
promise-chan
pub
put!
reduce
sliding-buffer
solo-mode
solo-mode*
split
sub
sub*
take
take!
tap
tap*
thread-call
timeout
to-chan
toggle
toggle*
transduce
unblocking-buffer?
unmix
unmix*
unmix-all
unmix-all*
unsub
unsub*
unsub-all
unsub-all*
untap
untap*
untap-all
untap-all*])
;; Used the generate the import list
#_
(->> (symbol "clojure.core.async")
find-ns
ns-publics
(remove #(:deprecated (meta (val %))))
(clojure.core/map key)
(remove (set (keys (ns-publics *ns*))))
sort)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment