Skip to content

Instantly share code, notes, and snippets.

@martintrojer
Last active March 4, 2017 20:59
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save martintrojer/5943467 to your computer and use it in GitHub Desktop.
Save martintrojer/5943467 to your computer and use it in GitHub Desktop.
core.async blocking IO
;; Variant 1: each blocking GET runs inside core.async `thread`, and the
;; response is handed back over a channel with a blocking put.
(time
 (def data-thread
   (let [results (chan)]
     ;; launch one `thread` per snippet id
     (doseq [id (range 10 100)]
       (thread
         (>!! results (blocking-get (format "http://fssnip.net/%d" id)))))
     ;; take exactly one response per request, in arrival order
     (mapv (fn [_] (<!! results)) (range 10 100)))))
;; "Elapsed time: 6523.782 msecs"
;; Fetch `url` with clj-http and return whatever `clj-http.client/get` returns.
;; The call is made directly on the calling thread (hence the name).
(defn blocking-get [url]
(clj-http.client/get url))
;; Variant 2: the same blocking GET, but issued from inside `go` blocks.
;; NOTE(review): `blocking-get` blocks inside a go block; this looks
;; intentional (the snippet appears to measure exactly that cost), so it is
;; kept as-is.
(time
 (def data
   (let [results (chan)]
     ;; start one go block per snippet id
     (doseq [id (range 10 100)]
       (go
         (>! results (blocking-get (format "http://fssnip.net/%d" id)))))
     ;; take exactly one response per request, in arrival order
     (mapv (fn [_] (<!! results)) (range 10 100)))))
;; "Elapsed time: 11123.577 msecs"
;; handle results from the GET
(defn gen-response-handler
  "Returns a Netty ChannelUpstreamHandler that passes each received message
  to the one-argument function `f`.

  Every upstream event (of any kind) increments `ctr`; only MessageEvent
  instances have their message forwarded to `f`.
  NOTE(review): `ctr` is not defined in this snippet - presumably an atom
  declared elsewhere; confirm before loading this in isolation."
  [f]
  (reify ChannelUpstreamHandler
    (handleUpstream [_ _ctx event]
      ;; count every event, not just messages
      (swap! ctr inc)
      (when (instance? MessageEvent event)
        (f (.getMessage event))))))
;; handle results from the connection (and issue the GET request)
(defn connection-ok
  "Returns a ChannelFutureListener to attach to a Netty connect future.

  On a successful connect it builds an HTTP/1.1 GET request for `uri`
  (with `Host` and `Connection: close` headers), installs the HTTP codec,
  a 64 KiB chunk aggregator and the upstream handler `f` on the channel's
  pipeline, and writes the request.

  On a failed connect it reports the host and the failure cause on stderr
  instead of silently doing nothing. No request is sent in that case, so
  `f` will never be invoked for this `uri`."
  [uri f]
  (let [host (.getHost uri)]
    (reify ChannelFutureListener
      (operationComplete [_ fut]
        (if (.isSuccess fut)
          (let [ch  (.getChannel fut)
                req (doto (DefaultHttpRequest. HttpVersion/HTTP_1_1
                                               HttpMethod/GET
                                               (.toASCIIString uri))
                      (.setHeader "Host" host)
                      (.setHeader "Connection" "close"))]
            ;; the pipeline must be fully assembled before the request goes out
            (doto (.getPipeline ch)
              (.addLast "codec" (HttpClientCodec.))
              (.addLast "aggregator" (HttpChunkAggregator. 65536))
              (.addLast "handler" f))
            (.write ch req)
            nil)
          ;; make the failure visible; a caller waiting for a response
          ;; would otherwise block with no diagnostic at all
          (binding [*out* *err*]
            (println "connection to" host "failed:" (.getCause fut))
            nil))))))
;; Netty uses its own thread pools
;; Shared Netty client bootstrap, backed by an NIO socket channel factory
;; that is given two separate cached thread pools.
(def bootstrap
  (let [first-pool  (Executors/newCachedThreadPool)
        second-pool (Executors/newCachedThreadPool)
        factory     (NioClientSocketChannelFactory. first-pool second-pool)]
    (ClientBootstrap. factory)))
(defn async-get
  "Issues an http-kit GET for `url`, passing a callback that puts the
  value it receives onto the `result` channel from within a go block.
  Returns whatever `org.httpkit.client/get` returns."
  [url result]
  (let [deliver (fn [response]
                  (go (>! result response)))]
    (org.httpkit.client/get url deliver)))
;; Variant 3: http-kit client with a callback; no blocking call is made
;; by this code while issuing the requests.
(time
 (def hk-data
   (let [results (chan)]
     ;; issue every request up front; responses arrive via async-get's callback
     (doseq [id (range 10 100)]
       (async-get (format "http://fssnip.net/%d" id) results))
     ;; take exactly one response per request, in arrival order
     (mapv (fn [_] (<!! results)) (range 10 100)))))
;; "Elapsed time: 6731.781 msecs"
;; Variant 4: raw Netty. Connect to port 80 of each URI's host, send the GET
;; from the connect listener, and push each response's content onto a channel.
(time
 (def data-netty
   (let [results    (chan)
         ;; invoked by the response handler with each received message
         on-message (fn [msg]
                      (go (>! results (.getContent msg))))]
     ;; open one connection per snippet id
     (doseq [id (range 10 100)]
       (let [uri     (java.net.URI. (format "http://fssnip.net/%d" id))
             address (java.net.InetSocketAddress. (.getHost uri) 80)]
         (.addListener
          (.connect bootstrap address)
          (connection-ok uri (gen-response-handler on-message)))))
     ;; take exactly one response body per request, in arrival order
     (mapv (fn [_] (<!! results)) (range 10 100)))))
;; "Elapsed time: 6842.132 msecs"
;; Leiningen project definition for the core.async / HTTP client comparison.
(defproject ca-netty "0.1.0-SNAPSHOT"
;; NOTE(review): description and url are still the template placeholders.
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.5.1"]
;; NOTE(review): coordinate has no group id and is a SNAPSHOT - presumably
;; resolved from a locally installed build; verify it resolves for you.
[core.async "0.1.0-SNAPSHOT"]
;; HTTP client called by blocking-get
[clj-http "0.7.5"]
;; HTTP client called by async-get
[http-kit "2.1.7"]
;; Netty 3.x (org.jboss.netty packages), used by the raw Netty variant
[io.netty/netty "3.6.6.Final"]])
;; Namespace for the core.async HTTP comparison snippets.
(ns ca-netty.core
  ;; core.async is referred wholesale so chan, go, thread, >!, <!!, etc.
  ;; can be used unqualified
  (:use [clojure.core.async])
  ;; The code calls clj-http.client/get and org.httpkit.client/get by their
  ;; fully qualified names; those namespaces must be loaded first or the
  ;; symbols will not resolve.
  (:require [clj-http.client]
            [org.httpkit.client])
  (:import [org.jboss.netty.channel
            ChannelUpstreamHandler MessageEvent ChannelFutureListener]
           [org.jboss.netty.handler.codec.http
            DefaultHttpRequest HttpVersion HttpMethod HttpClientCodec HttpChunkAggregator]
           [org.jboss.netty.bootstrap ClientBootstrap]
           [org.jboss.netty.channel.socket.nio NioClientSocketChannelFactory]
           [java.util.concurrent Executors]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment