Skip to content

Instantly share code, notes, and snippets.

@cgrand cgrand/foami.clj
Last active Oct 25, 2015

Embed
What would you like to do?
(ns foami.core
"FOreign Asynchronous Mechanism Interop"
(:require [clojure.core.async :as async]))
(def ^:private abandon (doto (async/chan) async/close!))
(def ^:private pending-writes (async/chan))
(defn put!
"Tries to put a val into chan, returns either: true if put succeeded, false if chan is
closed, ::pressurized if the put hasn't been performed yet. No further write should
be attempted before reactivate! is called.
When reactivate! is called either x has been put succesfully or chan is closed.
reactivate! takes 3 arguments: v, chan and x. chan and x are the same as passed to put!
v is the return value as by >!."
[chan x reactivate!]
(let [[v c] (async/alts!! [[chan x] abandon] :priority true)]
(if (identical? abandon c)
(do
(async/>!! pending-writes [chan x reactivate!])
::pressurized)
(boolean v))))
(def ^:private watcher
;; most of this code is dedicated to incrementally updating altsv (rather than recreating it from writes-map each time)
(async/go-loop [writes-map {} altsv [pending-writes]]
(let [[v c] (async/alts! altsv)]
(if (identical? c pending-writes)
(let [[c x reactivate!] v]
(if-let [[n f b] (writes-map c)]
(recur (assoc writes-map c [n f (conj b [x reactivate!])]) altsv)
(recur (assoc writes-map c [(count altsv) (list [x reactivate!]) []])
(conj altsv [c x]))))
(let [[n [[x reactivate!] & f] b] (writes-map c)
f' (if f f (seq b))
b' (if f b [])]
(reactivate! v c x)
(if-let [[[x]] f']
(recur (assoc writes-map c [n f' b']) (assoc altsv n [c x]))
(let [[lastc] (peek altsv)
[lastn f b] (writes-map lastc)
writes-map (assoc writes-map lastc [n f b])
altsv (assoc altsv n lastc)]
(recur (dissoc writes-map c) (pop altsv)))))))))
@cgrand

This comment has been minimized.

Copy link
Owner Author

cgrand commented Oct 10, 2014

for NIO, it means cancelling the SelectionKey upon ::pressurized and, in reactivate!, registering the SelectableChannel again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.