Skip to content

Instantly share code, notes, and snippets.

@jaybutera
Created January 5, 2021 23:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jaybutera/dabbcda1e89c8ffed33773d73b420907 to your computer and use it in GitHub Desktop.
Save jaybutera/dabbcda1e89c8ffed33773d73b420907 to your computer and use it in GitHub Desktop.
clojure channel callbacks
(ns chan-test.core
(:require [clojure.core.async :as a :refer [>! <! go go-loop]]))
(def send-chan (a/chan 10))
(def recv-chan (a/chan 10))
(def callback-chan (a/chan 10))
(defn load-foo
"Load corresponding keys of the values."
[req nonce]
(let [cb (a/chan)]
(go
; Pass the return channel to be matched later
(>! callback-chan {:id nonce :chan cb})
; Request a response for the message
(>! send-chan {:id nonce :req req})
(<! cb))))
(defn munch
"Receive msg on callback channel, and if its id matches the provided msg,
put the msg on the return channel. Otherwise put the message back
on the callback channel and repeat."
[msg]
(go-loop [cb-msg (<! callback-chan)]
; If message ids match
(if (= (:id cb-msg) (:id msg))
; Put the msg on the return chan and repeat
(>! (:chan cb-msg) msg)
; Otherwise put the msg back on the callback channel and start over
(do
(>! callback-chan cb-msg)
(defn nom
"Look for matching messages on the callback and recv channels.
When they match, put the recv'd message on the callback channel.
Shouldn't deadlock bcs a msg on recv-chan should always mean there
is a corresponding msg on callback-chan."
[]
(go-loop [msg (<! recv-chan)]
(<! (munch msg))
(recur (<! recv-chan))))
(defn server
"Listens for request messages and responds."
[res]
(go-loop []
(let [req (<! send-chan)]
; Response has same id as request
(>! recv-chan {:id (:id req) :res res}))
(recur)))
(defn -main
"Program entry point."
[& args]
; Start munching messages on channels
(nom)
; Listener to respond to requests
(server "back a'tcha")
; Send 10 requests and print the responses
(a/<!! (go (doseq [nonce (range 10)]
(let [x (load-foo "heyo" nonce)]
(println "for nonce" nonce (<! x)))))))
@didibus
Copy link

didibus commented Jan 6, 2021

My Feedback:

  • Probably because this is a gist, but normally the filename and the namespace name must match with the - replaced by _, so since your namespace name is chan-test.core, your filename needs to be core.clj and should be inside a folder named chan_test like src/chan_test/core.clj

On that topic, the core naming used to be pretty idiomatic, but now it is more encouraged to just name your core namespace the name of your app or library itself. So in your case I would just do:

(ns chan-test
  (:require [clojure.core.async :as a :refer [>! <! go go-loop]]))

And put that in a file: src/chan_test.clj

  • [minor] It's common that comments on their own line like ; Start munching messages on channels start with double colon like ;; Start munching messages on channels, where as single colon comments are for comments at the end of a line
  • The use of core.async definitely confuses me here. You have a lot of side effect going on, honestly I'm not even able to understand what you're trying to do. You map responses with requests and callbacks using an id and checking for equality, and that just feels weird as well. I'd expect that you make a request and when it completes it simply puts the result back on a channel, without having this weird id matching logic.

@jaybutera
Copy link
Author

@didibus sorry I should have clarified the meaning better. In this scenario, the server function would theoretically be a separate program, communicating over sockets. I figured I can't pass a channel over the wire, so I need a way to match response messages with requests. The send-chan and recv-chan may be a websocket connection. I can't see a better way to do this, so basically just matching ids of messages to match a response to its request. But this whole pattern does feel like overkill.. so maybe there's a better way?

@didibus
Copy link

didibus commented Jan 8, 2021

Hum, if the server was actually remote, your PING request to it would be an HTTP request, which is a synchronous protocol. There is no such thing as asynchronous HTTP. An HTTP request creates an HTTP connection and sends a request over it, then waits for a response before sending another request. To send multiple HTTP requests in parallel, you need to create multiple HTTP connections.

The best thing to "async" HTTP is called HTTP pipelining. What it does is open a connection, sends multiple requests and then waits for their responses, but the responses are going to be guaranteed in the same order as the requests were made, so there's no need for id mapping or anything like that to figure out which response is for which request.

So when you make an HTTP request to a remote server, the response will be mapped to the request, no need for ids.

Now, what is going to be async (if you want to make async requests) isn't the HTTP request, but the use of the HTTP client to make the request. So you would used an async HTTP client, such as the Java HTTP client that's included in JDK 11+. This client would return a CompletableFuture with the response.

(import '[java.net.http HttpClient HttpRequest HttpResponse$BodyHandlers])
(import '[java.net URI])
(import '[java.util.function Function])

(defn async-request
  [url]
  (let [uri (URI. url)
        http-client (HttpClient/newHttpClient)
        request (-> (HttpRequest/newBuilder)
                    (.uri uri)
                    (.build))]
    (-> http-client
        (.sendAsync request (HttpResponse$BodyHandlers/ofString))
        (.thenApply (reify Function
                      (apply [_ x] (.body x)))))))

@(async-request "http://www.foo.com/ping")

So what happens here is that the HTTP client will create thread(s) as needed, establish an HTTP connection, make a get request to it, wait for the response on your behalf, and when the response is received, it'll send it to the future.

Different HTTP clients will handle the async differently, the Java 11+ one returns a CompletableFuture. There might be some Clojure based one which return a core.async channel. Or you can take from the CompletableFuture and put in on a channel yourself if you want.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment