@bcachet
Created June 12, 2018 06:57
(ns play-with-manifold
  (:gen-class)
  (:require
   [aleph.http :as http]
   [manifold.stream :as s]
   [manifold.bus :as b]
   [manifold.deferred :as d]
   [byte-streams :as bs]
   [cheshire.core :as json])
  (:import io.netty.handler.codec.http.HttpContentDecompressor
           io.netty.channel.ChannelPipeline))
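
;; Parse a response body (anything byte-streams can turn into a Reader)
;; as JSON, keywordizing map keys.
(def json-parse-stream
  (comp #(json/parse-stream % true) bs/to-reader))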
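
;; Pipeline transform: install Netty's HttpContentDecompressor right after
;; the handler named "http-client" so compressed bodies are decoded.
(defn add-decompressor [^ChannelPipeline p]
  (.addAfter p "http-client" "decompressor" (HttpContentDecompressor.)))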

;; Connection pool whose connections all get the decompressor installed.
(def default-pool-with-decompressor
  (http/connection-pool
   {:connection-options {:pipeline-transform add-decompressor}}))
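
(defn http-get
  "GET `url` through the decompressing pool. `options` is merged over the
  default, so a caller-supplied :pool wins. Returns a deferred response."
  ([url]
   (http-get url {}))
  ([url options]
   (http/get url (merge {:pool default-pool-with-decompressor} options))))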
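
(comment
  ;; REPL walkthrough: evaluate the forms one at a time.
  (def bus (b/event-bus))
  ;; Two subscriptions to the same topic; each receives every message.
  (def headers (b/subscribe bus :response))
  (def body (b/subscribe bus :response))
  ;; Inspect the streams currently subscribed to :response.
  (b/downstream bus :response)
  ;; Publish the deferred response itself; d/chain below unwraps it.
  (b/publish! bus :response
              (http-get "https://api.github.com/users/bcachet/keys"
                        {:headers {:user-agent "go-http-client 1.0"
                                   :accept-encoding "gzip"}}))
  @(d/chain (s/take! headers)
            :headers)
  @(d/chain (s/take! body)
            :body
            json-parse-stream))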
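
;; A minimal offline sketch of the same publish/subscribe flow, added for
;; illustration: a plain map stands in for the HTTP response, so no network
;; access is needed. The names demo-bus, sub-a, sub-b and the :demo topic
;; are hypothetical; only the manifold.bus, manifold.stream and
;; manifold.deferred calls already used above are assumed.
(comment
  (def demo-bus (b/event-bus))
  (def sub-a (b/subscribe demo-bus :demo))
  (def sub-b (b/subscribe demo-bus :demo))
  ;; Every subscriber of the topic is handed the same message.
  (b/publish! demo-bus :demo {:headers {"content-type" "application/json"}
                              :body "[]"})
  ;; Each take! yields the published map; d/chain then applies the keyword.
  @(d/chain (s/take! sub-a) :headers)
  @(d/chain (s/take! sub-b) :body))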