@ghadishayban
Created March 15, 2015 17:23
Interruptible Channels (Needs JDK8)

(ns poolboy.chanfut
  (:import [java.util.concurrent Callable CompletableFuture Executor
                                 ExecutorService Future]
           [java.util.function BiConsumer])
  (:require [clojure.core.async :as async]
            [clojure.core.async.impl.protocols :as async-impl]))

;; core.async's internal thread pool; used below to run the completion callback.
(def async-executor clojure.core.async.impl.exec.threadpool/the-executor)

(defprotocol Interruptible
  (interrupt* [_]))

(deftype InterruptibleChannel [^Future task ch]
  async-impl/ReadPort
  (take! [_ fn1] (async-impl/take! ch fn1))

  async-impl/Channel
  (close! [_] (async-impl/close! ch))
  (closed? [_] (async-impl/closed? ch))

  Interruptible
  (interrupt* [_]
    ;; Only cancel the task. Closing the channel here would prevent the
    ;; CompletableFuture's outcome (e.g. the InterruptedException) from
    ;; being delivered on it.
    (.cancel task true)))

(defn supply
  [executor f ch]
  (let [cf (CompletableFuture.)
        ;; Conveys the outcome to ch, then closes it. A nil result puts nothing.
        conveyor (reify
                   BiConsumer
                   (accept [_ o ex]
                     (if ex
                       (async/put! ch ex)
                       (when (some? o) (async/put! ch o)))
                     (async/close! ch)))
        ;; Capture the caller's dynamic bindings so f sees them on the pool thread.
        binds (clojure.lang.Var/getThreadBindingFrame)
        task (.submit ^ExecutorService executor
                      ^Callable (fn []
                                  (clojure.lang.Var/resetThreadBindingFrame binds)
                                  (try
                                    (.complete cf (f))
                                    (catch Throwable t
                                      (.completeExceptionally cf t)))))]
    (.whenCompleteAsync cf ^BiConsumer conveyor ^Executor async-executor)
    (InterruptibleChannel. task ch)))

(defn as-channel*
  [executor f]
  (let [ch (async/chan 1)]
    (supply executor f ch)))

(defmacro as-channel
  "Like clojure.core.async/thread, but runs body on the executor you supply.
  The returned channel yields the body's return value, or the exception it
  threw, and is then closed."
  [executor & body]
  `(as-channel* ~executor (^{:once true} fn* [] ~@body)))

(defn interrupt
  "Issues Thread.interrupt() to the task that supplies a channel."
  [ch]
  (interrupt* ch))

(comment
  (def my-executor clojure.lang.Agent/soloExecutor)
  (def retc (as-channel my-executor (Thread/sleep 15000)))
  ;; wait a few seconds
  (interrupt retc)
  ;; should yield an InterruptedException
  (async/<!! retc))
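
;; A further usage sketch, not part of the original gist. It assumes the
;; definitions above have loaded (JDK 8+, and a core.async version that still
;; exposes `the-executor`). `pool` is a hypothetical executor introduced only
;; for illustration. It shows the three outcomes a consumer may see.
(comment
  (def pool (java.util.concurrent.Executors/newFixedThreadPool 2))

  ;; A non-nil return value is put on the channel, which is then closed.
  (async/<!! (as-channel pool (+ 1 2)))

  ;; A thrown exception arrives as a value instead of being rethrown,
  ;; so the consumer has to check for it explicitly.
  (let [v (async/<!! (as-channel pool (throw (ex-info "boom" {}))))]
    (instance? Throwable v))

  ;; A nil result puts nothing; the take only observes the closed channel.
  (async/<!! (as-channel pool nil))

  (.shutdown pool))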