@schaueho
Last active August 29, 2015 14:23
Some notes on core.async channels

Notes on core.async usage:

go and thread return a channel that will receive the result of the body once it completes. That's exploited in the following snippet from the core.async walkthrough, which more or less "tests" (asserts) that the value "hello" is successfully communicated over the channel c.

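;; assumed setup for all snippets in these notes
(require '[clojure.core.async :as async :refer :all])

(let [c (chan)]
  (go (>! c "hello"))
  (assert (= "hello" (<!! (go (<! c)))))
  (async/close! c))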

Note that the assert is placed neither inside the go block nor directly around it: as outlined above, go returns another channel, so the assertion checks the result of the blocking take <!! on that result channel. This is a useful pattern for testing core.async code: return the result channel from your taking code and run the test with a blocking take on it, like this:

(defn my-async-code [v]
  (let [c (chan)]
    (go (>! c v))
    (go (<! c))))

;; `fact` and `=>` come from Midje
(fact "Sending a value over a channel"
  (<!! (my-async-code "hello")) => "hello")
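
For readers who do not use Midje, roughly the same pattern can be sketched with clojure.test. The names send-and-receive and sends-value-over-channel are made up for this illustration and are not part of the original notes.

```clojure
(require '[clojure.test :refer [deftest is]]
         '[clojure.core.async :refer [chan go >! <! <!!]])

;; Hypothetical function under test: puts v on a fresh channel and
;; returns the result channel of the go block that takes it again.
(defn send-and-receive [v]
  (let [c (chan)]
    (go (>! c v))
    (go (<! c))))

;; The blocking take <!! happens in the test, outside of any go block.
(deftest sends-value-over-channel
  (is (= "hello" (<!! (send-and-receive "hello")))))
```

Running the test (for instance via clojure.test/run-tests) blocks only until the value has passed through the channel.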

Channels can be closed explicitly, as in the example above; a channel that is no longer referenced is simply garbage collected. Once a channel is closed (and any values still buffered in it have been taken), every take from it yields nil.
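
A minimal sketch of what closing does, assuming a buffered channel so that the puts do not block. The function name closed-channel-demo is hypothetical and only serves this illustration.

```clojure
(require '[clojure.core.async :refer [chan close! >!! <!!]])

(defn closed-channel-demo
  "Hypothetical helper, for illustration only."
  []
  (let [c (chan 2)]                        ; buffer of 2, so the two puts do not block
    (>!! c :a)
    (>!! c :b)
    (close! c)
    {:takes     [(<!! c) (<!! c) (<!! c)]  ; buffered values first, then nil
     :late-put? (>!! c :c)}))              ; a put on a closed channel returns false

(closed-channel-demo)
;; => {:takes [:a :b nil], :late-put? false}
```

Because nil signals a closed channel, nil itself can never be sent as a value.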

This, however, does not imply that a channel will be closed or taking attempts will yield nil only because your code has put all values onto the channel. Consider the following slight modification:

(let [c (chan)]
  (go (>! c "hello"))
  (assert (= "hello"
             (<!! (go-loop [doc (<! c)]
                    (println doc)
                    (if-let [ndoc (<! c)]
                      (recur ndoc)
                      doc)))))
  (async/close! c))

This will print "hello" to stdout as you would expect but then wait forever for the next value to arrive on c. Take a look at another example from the core.async walkthrough:

(let [c1 (chan)
      c2 (chan)]
  (thread (while true
            (let [[v ch] (alts!! [c1 c2])]
              (println "Read" v "from" ch))))
  (>!! c1 "hi")
  (>!! c2 "there"))

This simply sets up two channels, starts a thread reading indefinitely from them, and finally puts two values on the channels. Basically, the thread will run forever and the channels c1 and c2 will be neither closed nor garbage collected. One might think you could simply close the channel after putting all values on it, but this is not reliable: when the puts run asynchronously (as with go), the channel may already be closed before they have happened.

So, what does this amount to? Either you're taking a known number of values from a channel or you're taking values forever, never closing the channel. The latter is appropriate for long-lived producer-consumer workflows. However, if looping forever is what you want to do, you'll have a hard time testing your code (which is probably the reason why the linked article benchmarks the time outside of the Clojure process). This is because typical test approaches look like the following piece of code. The idea here is that the channel should produce a value within a given amount of time or the test will fail.

;; originally by Leon Grapenthin, taken from
;; http://stackoverflow.com/a/30781278/3098550
;; https://stackoverflow.com/questions/30766215/how-do-i-unit-test-clojure-core-async-go-macros
(defn test-within
  "Takes from inputchan, waiting at most ms milliseconds. Returns the value
  taken, or an error message string if the timeout fires first."
  [ms inputchan]
  (<!!
    (go (let [timechan (timeout ms)
              [value rchannel] (alts! [inputchan timechan])]
          (if (= rchannel timechan)
            (str "Test should have finished within " ms "ms.")
            value)))))

(defn my-async-code [v]
  (let [c (chan)]
    (go (>! c v))
    (go-loop [doc (<! c)]
      (if-let [ndoc (<! c)]
        (recur ndoc)
        doc))))

(fact "This is a test"
  (test-within 100 (my-async-code "hello")) => "hello") ;; nope, "Test should have finished within 100ms."

The idea behind test-within is basically the same pattern as we saw in the first code snippet: you do a blocking take <!! on the value from the input channel or from the timeout channel. And of course, this will never yield the expected value when the code under test loops endlessly; the timeout message is returned instead.

One way to get around this is to return the channel you explicitly open and the result channel, then close the channel and take from the result channel like this:

(let [c1 (chan)
      c2 (chan)]
  (go
    (>! c1 "hi")       ; parking puts: blocking >!! should not be used inside go
    (>! c2 "there"))
  [c1 c2 (async/thread
           (loop [[v ch] (alts!! [c1 c2])]
             (println "Read" v "from" ch)
             (let [[nv nch] (alts!! [c1 c2])]
               (if nv
                 (recur [nv nch])
                 :done))))])
=> [#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@60eb5def> #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@7c64279e> #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@136535df>]
Read hi from #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@60eb5def>
Read there from #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@7c64279e>

(let [[c1 c2 resultchan] *1]
  (close! c1)
  (<!! resultchan))
=> :done

However, this is extremely ugly and leaks implementation details (the channels c1 and c2) to the outside.

The other idea would be to send along a special "end-of-communication" value which is then treated accordingly by the receiving thread:

(let [c1 (chan)]
  (go
    (>! c1 "hi")
    (>! c1 :EOC))
  (async/thread
    (loop [v (<!! c1)]
      (println "Read" v)
      (when-let [nv (<!! c1)]
        (if (= nv :EOC)
          (do (close! c1)
              :done)
          (recur nv))))))

=>#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@112bb42e>
Read hi
(<!! *1)
=>:done

This works as expected but has multiple drawbacks. The first is the use of "magic values", but this is only minor. The second is so critical that we can rule out this option for the majority of cases: as soon as the puts are spread over several go blocks or threads, you can't enforce the order in which values will be put onto the channel. In other words: you can't know on the receiving side that :EOC has been sent after you've received all other values that the sending side wanted to put onto the channel. It works in this simple example where we simply put values on the channel from a single go block, but will not work reliably when computation happens inside the threads started by go or thread.
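
The ordering problem can be provoked with a small sketch. The function name eoc-overtakes is hypothetical, and the Thread/sleep merely stands in for "computation happens inside the thread"; the 200 ms figure is an arbitrary assumption chosen to make the effect visible.

```clojure
(require '[clojure.core.async :refer [chan thread >!! <!!]])

(defn eoc-overtakes
  "Hypothetical demo: returns the two values in the order they were received."
  []
  (let [c (chan)]
    ;; slow producer: simulates some computation before the put
    (thread (Thread/sleep 200)
            (>!! c "hi"))
    ;; fast producer: sends the end-of-communication marker right away
    (thread (>!! c :EOC))
    (let [first-val  (<!! c)
          second-val (<!! c)]
      [first-val second-val])))
```

With the delay in place, the marker is expected to arrive first, so a receiver that stops at :EOC would never see "hi".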

The next idea is to agree on a common number of values to process, like this:

(let [c1 (chan)
      values ["hi" "there"]
      vcount (count values)]
  (doseq [value values]
    (thread
      (>!! c1 value)))
  (thread
    (loop [recvalue (<!! c1)
           reccount 1]
      (println "Read" recvalue)
      (if (= reccount vcount)
        (do (close! c1)
            :done)
        (recur (<!! c1) (inc reccount))))))

This has two obvious drawbacks: first, you have to agree up-front on the number of values to process. Second, you cannot deviate from it: if, for example, an exception occurs in the sending code, your receiving side will still wait forever.

So, my final take on this is a pretty obvious one: listen on a timeout channel while waiting for values to arrive.

(let [c1 (chan)
      tchan (timeout 1000)
      values ["hi" "there"]]
  (doseq [value values]
    (thread
      (>!! c1 value)))
  (thread
    (loop [[recvalue rchan] (alts!! [c1 tchan])
           timeoutchan tchan]
      (if (= rchan timeoutchan)
        (do (close! c1)
            :done)
        (do (println "Read" recvalue)
            (let [newtimeout (timeout 1000)]
              (recur (alts!! [c1 newtimeout])
                     newtimeout)))))))
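
The same idea can be packaged as a small reusable helper. This is only a sketch: the name take-until-idle and its argument order are assumptions made here, and unlike the snippet above it collects the values instead of printing them and leaves closing the channel to the caller.

```clojure
(require '[clojure.core.async :refer [chan thread timeout alts!! >!! <!!]])

(defn take-until-idle
  "Hypothetical helper: collects values from ch until no value arrives for
  idle-ms milliseconds or ch is closed. Returns a channel that yields the
  vector of collected values."
  [ch idle-ms]
  (thread
    (loop [acc []]
      ;; a fresh timeout channel per iteration restarts the idle timer
      (let [[v port] (alts!! [ch (timeout idle-ms)])]
        (if (and (= port ch) (not (nil? v)))
          (recur (conj acc v))
          acc)))))   ; timeout fired or ch was closed: stop and return

;; usage: the order of the collected values depends on thread scheduling
(let [c (chan)]
  (doseq [v ["hi" "there"]]
    (thread (>!! c v)))
  (<!! (take-until-idle c 500)))
```

The trade-off is that every consumer now pays the idle time at the end, and a producer that pauses longer than idle-ms is treated as finished.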

Error handling in core.async is not immediately obvious. See the following example, taken from Wil Yegelwel's blog post on Error handling with core.async:

;; Clojure 1.6.0 and above, this exception stack trace prints to stderr
;; Less than Clojure 1.6.0, exception is dropped, and nil is returned
(<!! (go (throw (Exception.))))

;; You will never see "Exception caught" in stdout
(try
  (go (throw (Exception.)))
  (catch Exception e
    (println "Exception caught")))

You can't catch the thrown exception because go uses a thread pool behind the scenes (thread creates a new thread, too, of course). So basically, the exception is thrown in a different thread, i.e. the thread that executes the try / catch will never see it. Wil's post discusses several options, in particular pointing to an additional <? function or macro (cf. Martin Trojer's article), which will handle the exception correctly.
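
The following is a minimal sketch in the spirit of that <? approach, not a copy of either article's code. The helper names throw-err, go-try and <?? are choices made for this illustration: exceptions are caught inside the go block, passed over the channel as ordinary values, and rethrown on the taking side.

```clojure
(require '[clojure.core.async :refer [go <! <!!]])

(defn throw-err
  "Rethrows v if it is a Throwable, otherwise returns it unchanged."
  [v]
  (if (instance? Throwable v)
    (throw v)
    v))

(defmacro go-try
  "Like go, but a Throwable thrown in the body becomes the block's result."
  [& body]
  `(go (try ~@body
            (catch Throwable t# t#))))

(defmacro <?
  "Like <!, but rethrows a Throwable taken from ch. Only valid inside go."
  [ch]
  `(throw-err (<! ~ch)))

(defn <??
  "Blocking counterpart of <?, for use outside of go blocks."
  [ch]
  (throw-err (<!! ch)))

;; usage: the exception now surfaces in the calling thread
(try
  (<?? (go-try (throw (ex-info "boom" {}))))
  (catch Exception e
    (println "Exception caught:" (.getMessage e))))
```

The cost of this design is that Throwables can no longer be sent over such channels as plain data, since the taking side always rethrows them.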
