@jaybutera
Created January 5, 2021 23:10
clojure channel callbacks
(ns chan-test.core
  (:require [clojure.core.async :as a :refer [>! <! go go-loop]]))

(def send-chan (a/chan 10))
(def recv-chan (a/chan 10))
(def callback-chan (a/chan 10))

(defn load-foo
  "Send req tagged with nonce and return a channel that yields the matching response."
  [req nonce]
  (let [cb (a/chan)]
    (go
      ; Pass the return channel to be matched later
      (>! callback-chan {:id nonce :chan cb})
      ; Request a response for the message
      (>! send-chan {:id nonce :req req})
      (<! cb))))

(defn munch
  "Receive a msg on the callback channel and, if its id matches the provided msg,
  put the msg on the return channel. Otherwise put the message back
  on the callback channel and repeat."
  [msg]
  (go-loop [cb-msg (<! callback-chan)]
    ; If message ids match
    (if (= (:id cb-msg) (:id msg))
      ; Put the msg on the return chan and stop
      (>! (:chan cb-msg) msg)
      ; Otherwise put the callback entry back on the callback channel and try the next one
      (do
        (>! callback-chan cb-msg)
        (recur (<! callback-chan))))))

(defn nom
  "Look for matching messages on the callback and recv channels.
  When they match, put the recv'd message on the matching return channel.
  Shouldn't deadlock because a msg on recv-chan should always mean there
  is a corresponding msg on callback-chan."
  []
  (go-loop [msg (<! recv-chan)]
    (<! (munch msg))
    (recur (<! recv-chan))))

(defn server
  "Listens for request messages and responds."
  [res]
  (go-loop []
    (let [req (<! send-chan)]
      ; Response has same id as request
      (>! recv-chan {:id (:id req) :res res}))
    (recur)))

(defn -main
  "Program entry point."
  [& args]
  ; Start munching messages on channels
  (nom)
  ; Listener to respond to requests
  (server "back a'tcha")
  ; Send 10 requests and print the responses
  (a/<!! (go (doseq [nonce (range 10)]
               (let [x (load-foo "heyo" nonce)]
                 (println "for nonce" nonce (<! x)))))))

didibus commented Jan 8, 2021

Hmm, if the server were actually remote, your PING request to it would be an HTTP request, and HTTP is a synchronous request/response protocol. There is no such thing as asynchronous HTTP. In HTTP/1.1, a client opens a connection, sends a request over it, then waits for the response before sending another request. To send multiple requests in parallel, you need to open multiple connections.

The closest thing to "async" HTTP is HTTP pipelining. It opens a connection, sends multiple requests without waiting in between, and then reads their responses. The responses are guaranteed to come back in the same order as the requests were made, so there's no need for id mapping or anything like that to figure out which response belongs to which request.

So when you make an HTTP request to a remote server, the response will be mapped to the request, no need for ids.
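To illustrate why ordering makes ids unnecessary, here is a minimal core.async sketch. Everything in it is a hypothetical stand-in rather than real HTTP: `wire-out` and `wire-in` play the two directions of a single ordered connection, and the last loop is a fake server that answers strictly in order. The reader simply hands the nth response to the nth waiting reply channel.

```clojure
(require '[clojure.core.async :as a :refer [>! <! go-loop]])

(def requests (a/chan 10)) ; [req reply-chan] pairs from callers
(def wire-out (a/chan 10)) ; stand-in for the outgoing side of a connection
(def wire-in  (a/chan 10)) ; stand-in for the incoming side
(def pending  (a/chan 10)) ; reply channels, in the order requests were sent

(defn start-connection!
  []
  ;; Writer: a single loop fixes the order of requests on the wire
  (go-loop []
    (when-let [[req reply] (<! requests)]
      (>! pending reply)
      (>! wire-out req)
      (recur)))
  ;; Reader: the nth response belongs to the nth pending reply channel
  (go-loop []
    (when-let [res (<! wire-in)]
      (>! (<! pending) res)
      (recur)))
  ;; Fake server (assumption): answers strictly in request order
  (go-loop []
    (when-let [req (<! wire-out)]
      (>! wire-in (str "re: " req))
      (recur))))

(defn request!
  "Queue req and return a channel that will receive its response."
  [req]
  (let [reply (a/chan 1)]
    (a/>!! requests [req reply])
    reply))

(start-connection!)
(let [r1 (request! "a")
      r2 (request! "b")]
  [(a/<!! r1) (a/<!! r2)])
;; => ["re: a" "re: b"]
```

Compared with the gist, there is no nonce and no searching through `callback-chan`: a FIFO of reply channels is enough as long as the transport preserves order.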

Now, what is going to be async (if you want to make async requests) isn't the HTTP request itself, but the way you use the HTTP client to make it. So you would use an async HTTP client, such as the Java HTTP client that's included in JDK 11+. Its sendAsync method returns a CompletableFuture that completes with the response.

(import '[java.net.http HttpClient HttpRequest HttpResponse$BodyHandlers])
(import '[java.net URI])
(import '[java.util.function Function])

(defn async-request
  [url]
  (let [uri (URI. url)
        http-client (HttpClient/newHttpClient)
        request (-> (HttpRequest/newBuilder)
                    (.uri uri)
                    (.build))]
    (-> http-client
        (.sendAsync request (HttpResponse$BodyHandlers/ofString))
        (.thenApply (reify Function
                      (apply [_ x] (.body x)))))))

@(async-request "http://www.foo.com/ping")

So what happens here is that the HTTP client will use thread(s) as needed, establish an HTTP connection, make a GET request over it, wait for the response on your behalf, and when the response is received, complete the future with it. Dereferencing the future with @ blocks the calling thread until that happens.
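As a rough sketch of what that buys you, the snippet below starts several requests before blocking on any of them. To keep it runnable without a network, `fake-async-request` is a hypothetical stand-in for `async-request`: it uses `CompletableFuture/supplyAsync` with a short sleep instead of a real `sendAsync` call.

```clojure
(import '[java.util.concurrent CompletableFuture]
        '[java.util.function Supplier Function])

(defn fake-async-request
  "Hypothetical stand-in for async-request: completes on another thread."
  [url]
  (-> (CompletableFuture/supplyAsync
        (reify Supplier
          (get [_]
            (Thread/sleep 50) ; pretend to wait on the network
            (str "pong from " url))))
      (.thenApply (reify Function
                    (apply [_ body] (.toUpperCase ^String body))))))

;; Start three requests; none of these calls blocks the calling thread
(def futures (mapv fake-async-request ["a" "b" "c"]))

;; Block only at the point where the results are actually needed
(mapv deref futures)
;; => ["PONG FROM A" "PONG FROM B" "PONG FROM C"]
```

Each result stays attached to the future returned for its own request, so nothing has to be matched up by id afterwards.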

Different HTTP clients handle the async part differently; the Java 11+ one returns a CompletableFuture. There might be some Clojure-based ones which return a core.async channel. Or you can take the result from the CompletableFuture and put it on a channel yourself if you want.
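One way that last option might look is sketched below. `future->chan` is a hypothetical helper name, not part of core.async or the JDK. It registers a `whenComplete` callback that puts the result (or the Throwable, if the future failed) on a channel and then closes it.

```clojure
(require '[clojure.core.async :as a])
(import '[java.util.concurrent CompletableFuture]
        '[java.util.function BiConsumer])

(defn future->chan
  "Hypothetical helper: returns a channel that receives the future's result
  (or the Throwable if it failed) and then closes."
  [^CompletableFuture fut]
  (let [ch (a/chan 1)]
    (.whenComplete fut
                   (reify BiConsumer
                     (accept [_ result error]
                       ;; Channels cannot carry nil, so only put non-nil values
                       (when-some [v (if (some? error) error result)]
                         (a/put! ch v))
                       (a/close! ch))))
    ch))

;; An already-completed future stands in for a real HTTP response here
(a/<!! (future->chan (CompletableFuture/completedFuture "pong")))
;; => "pong"
```

With the `async-request` function above, `(future->chan (async-request url))` would give you a channel you can take from inside a `go` block with `<!`. A future that completes with nil just closes the channel, so the taker sees nil.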
