Skip to content

Instantly share code, notes, and snippets.

@mccraigmccraig
Created October 3, 2019 10:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mccraigmccraig/7e44ff82d9fb66f6809614f1ed65cd04 to your computer and use it in GitHub Desktop.
Save mccraigmccraig/7e44ff82d9fb66f6809614f1ed65cd04 to your computer and use it in GitHub Desktop.
(ns er-model.connectors.http
(:require
[aleph.http :as http]
[aleph.netty :as netty]
[byte-streams :as bs]
[cats.core :as c :refer [return]]
[cemerick.url :as url]
[cheshire.core :as json]
[clojure.java.io :as io]
[clojure.string :as str]
[er-model.util.mime :as mime]
[manifold.deferred :as d]
[prpr.promise :refer [ddo]]
[prpr.stream :as s])
(:import
io.netty.buffer.ByteBuf
[java.io ByteArrayInputStream ByteArrayOutputStream InputStreamReader]
java.util.zip.GZIPInputStream))
;; the Class object for a primitive byte array ("[B" is the JVM name
;; for byte[]), used for instance? checks on response bodies
(def ^:private byte-array-type (Class/forName "[B"))
(defn decode-bytes
  "Turn `bytes` into an InputStream of decoded content.

  - `bytes` : either a byte-array, which is wrapped in a
    ByteArrayInputStream, or a value that is passed through untouched
    and is expected to already be an InputStream
  - `gzip?` : when truthy the stream is additionally wrapped in a
    GZIPInputStream

  Returns the resulting InputStream."
  [bytes gzip?]
  (let [raw-in (cond-> bytes
                 (instance? byte-array-type bytes) (ByteArrayInputStream.))]
    (if gzip?
      (GZIPInputStream. raw-in)
      raw-in)))
(defn netty-byte-buffer->bytes
  "Convert one chunk of a response body to a byte-array.

  When :raw-stream? is used we are responsible for decrementing
  netty's ByteBuf reference-counts
  https://github.com/ztellman/aleph/blob/master/src/aleph/http.clj#L113

  - `nbb` : either a netty ByteBuf or anything byte-streams can convert
    to a byte-array

  Returns a byte-array. A ByteBuf argument is always released, even if
  copying its content throws."
  [nbb]
  (if (instance? ByteBuf nbb)
    ;; in order to get ByteBufs :raw-stream? must be given to both
    ;; the request and the connection-pool - the default connection
    ;; pool does not return ByteBufs, so a custom connection-pool
    ;; must be used
    (try
      (netty/buf->array nbb)
      (finally
        ;; release in a finally so that a failed copy does not leak
        ;; the reference-counted buffer
        (.release ^ByteBuf nbb)))
    ;; if :raw-stream? is passed to a request, but not the
    ;; connection-pool, then the chunks are not ByteBufs and there is
    ;; nothing to release - byte-streams converts them to byte-arrays
    (bs/to-byte-array nbb)))
(defn slurp-body-s-bytes
  "Drain a stream<byte[]> or stream<ByteBuf> body into memory.

  Every chunk is converted with netty-byte-buffer->bytes and appended
  to a single ByteArrayOutputStream; once the stream is exhausted the
  collected bytes are handed to decode-bytes together with `gzip?`.

  Returns a deferred of the decoded InputStream."
  [body-s gzip?]
  (let [append-chunk (fn [sink chunk]
                       (.write sink chunk 0 (count chunk))
                       sink)
        collected-d  (s/reduce append-chunk
                               (ByteArrayOutputStream.)
                               (s/map netty-byte-buffer->bytes body-s))]
    (d/chain collected-d
             (fn [sink] (.toByteArray sink))
             (fn [all-bytes] (decode-bytes all-bytes gzip?)))))
(defn unwrap-response
  "Normalise a raw response map into a plain, fully-read response.

  The body is either a byte-array, decoded directly, or a stream of
  chunks, collected with slurp-body-s-bytes. A \"content-encoding\"
  header equal to \"gzip\" causes the body to be gunzipped. The
  \"content-type\" header is parsed with mime/parse-content-type.

  Returns a promise of a map with keys :status, :headers,
  :content-type, :content-charset and :body, where :body is an
  InputStream of the decoded content."
  [{:keys [status headers body]}]
  (ddo [:let [content-encoding (get headers "content-encoding")
              gzip?            (= content-encoding "gzip")
              [content-type
               {charset :charset}] (mime/parse-content-type
                                    (get headers "content-type"))
              in-memory?       (instance? byte-array-type body)]
        body-in (if in-memory?
                  (return (decode-bytes body gzip?))
                  (slurp-body-s-bytes body gzip?))]
    (return
     {:status          status
      :headers         headers
      :content-type    content-type
      :content-charset charset
      :body            body-in})))
(defn decode-string
  "Read all of the InputStream `bytes-in` as a String.

  `charset` names the character encoding to use; UTF-8 is used when it
  is nil."
  [bytes-in charset]
  (-> (InputStreamReader. bytes-in (or charset "UTF-8"))
      slurp))
(defn unwrapped-response->text
  "Replace the :body InputStream of an unwrapped response with its
  content as a String, decoded with the response's :content-charset
  via decode-string."
  [{body-in :body
    charset :content-charset
    :as response}]
  (assoc response :body (decode-string body-in charset)))
;; the content-type accepted by the default validator in
;; unwrapped-response->JSON
(def ^:private content-type-json "application/json")
(defn unwrapped-response->JSON
  "Interpret the body of an unwrapped response as JSON.

  The body is first read as text. `ct-validator` is a predicate on the
  parsed content-type; when nil, the content-type must equal
  application/json. If the predicate passes, :body is replaced by the
  parsed JSON with keyword keys; otherwise an ex-info is thrown whose
  ex-data is the text response."
  ([response] (unwrapped-response->JSON response nil))
  ([response ct-validator]
   (let [text-response      (unwrapped-response->text response)
         json-content-type? (or ct-validator
                                #(= content-type-json %))]
     (when-not (json-content-type? (:content-type text-response))
       (throw
        (ex-info "response is not JSON!" text-response)))
     (update text-response :body json/parse-string true))))
(defn response->parsed*
  "Unwrap `response` with unwrap-response and apply `response-parser`
  to the result (identity when the parser is nil).

  Returns a promise of the parser's result."
  [response response-parser]
  (let [parse (or response-parser identity)]
    (ddo [unwrapped (unwrap-response response)]
      (return (parse unwrapped)))))
(defn response->text*
  "Unwrap `response`, read its body as text and apply `text-parser`
  to the text response (identity when the parser is nil).

  Returns a promise of the parser's result."
  [response text-parser]
  (let [parse-text (or text-parser identity)]
    (response->parsed*
     response
     (fn [unwrapped]
       (parse-text (unwrapped-response->text unwrapped))))))
(defn response->JSON*
  "Unwrap `response`, parse its body as JSON and apply `JSON-parser`
  to the JSON response (identity when the parser is nil).

  Returns a promise of the parser's result."
  [response JSON-parser]
  (let [parse-json (or JSON-parser identity)]
    (response->parsed*
     response
     (fn [unwrapped]
       (parse-json (unwrapped-response->JSON unwrapped))))))
(defn response->parsed
  "Given a promise of a response, unwrap the response and run it
  through an optional `response-parser`.

  Returns a promise of the parsed response."
  ([response-d]
   (response->parsed response-d nil))
  ([response-d response-parser]
   (ddo [resolved response-d]
     (return
      (response->parsed* resolved response-parser)))))
(defn response->text
  "Given a promise of a response, read the response body as text and
  run the text response through an optional `text-parser`.

  Returns a promise of the parsed response."
  ([response-d]
   (response->text response-d nil))
  ([response-d text-parser]
   (ddo [resolved response-d]
     (return
      (response->text* resolved text-parser)))))
(defn response->JSON
  "Given a promise of a response, parse the response body as JSON and
  run the JSON response through an optional `JSON-parser`.

  Returns a promise of the parsed response."
  ([response-d]
   (response->JSON response-d nil))
  ([response-d JSON-parser]
   (ddo [resolved response-d]
     (return
      (response->JSON* resolved JSON-parser)))))
(defn response->file
  "Write the body of a promised response to the file `filename`.

  The response is unwrapped, its body InputStream is transferred to an
  output-stream opened on `filename`, and the unwrapped response -
  with the java.io.File added under [:response :body] - is passed to
  the optional `file-parser` (identity when nil).

  Returns a promise of the parser's result."
  ([response-d filename]
   (response->file response-d filename nil))
  ([response-d filename file-parser]
   (ddo [raw-response response-d
         {body-in :body
          :as unwrapped} (unwrap-response raw-response)
         :let [file-out (io/output-stream filename)]]
     (bs/transfer body-in file-out {:append? false})
     ;; NOTE(review): the File is stored under [:response :body], so
     ;; the top-level :body still holds the InputStream that was just
     ;; transferred - confirm whether plain :body was intended
     (return
      ((or file-parser identity)
       (assoc-in unwrapped [:response :body] (io/file filename)))))))
(defn catch->parsed
  "Attach an error handler to the promise `p`.

  When the error carries ex-data, the ex-data is treated as a response:
  it is unwrapped and run through the optional `response-parser`.
  An error without ex-data is returned from the handler as a value."
  ([p]
   (catch->parsed p nil))
  ([p response-parser]
   (d/catch p
     (fn [err]
       (let [error-response (ex-data err)]
         (if error-response
           (response->parsed* error-response response-parser)
           ;; NOTE(review): the error is returned, not rethrown, so it
           ;; becomes the handler's result - confirm this is intended
           err))))))
(defn catch->text
  "Catch an HTTP error, interpret the error response as text and
  optionally parse it.

  - `p` : a promise of a response
  - `text-parser` : optional fn applied to the text response; when nil
    (as in the 1-arity) the text response is returned unchanged

  Returns the promise produced by catch->parsed."
  ([p] (catch->text p nil))
  ([p text-parser]
   ;; default a nil parser to identity - (comp nil f) would throw a
   ;; NullPointerException as soon as the handler ran
   (catch->parsed p (comp (or text-parser identity)
                          unwrapped-response->text))))
(defn catch->JSON
  "Catch an HTTP error, interpret the error response as JSON and
  optionally parse the JSON.

  - `p` : a promise of a response
  - `JSON-parser` : optional fn applied to the JSON response; when nil
    (as in the 1-arity) the JSON response is returned unchanged

  Returns the promise produced by catch->parsed."
  ([p] (catch->JSON p nil))
  ([p JSON-parser]
   ;; default a nil parser to identity - (comp nil f) would throw a
   ;; NullPointerException as soon as the handler ran
   (catch->parsed p (comp (or JSON-parser identity)
                          unwrapped-response->JSON))))
(defn url-encode-params
  "Build a query string from the map `params`.

  Each entry becomes name=value, where the name is (name k) and the
  value is passed through url/url-encode; entries are joined with &."
  [params]
  (str/join
   "&"
   (for [[k v] params]
     (str (name k) "=" (url/url-encode v)))))
;; the operations an HTTP client implementation must provide - one
;; per HTTP method. each takes the url and a map of request opts.
;; the public DELETE / PUT / POST / GET fns delegate to these
(defprotocol IHTTPClient
(-DELETE [_ url opts])
(-PUT [_ url opts])
(-POST [_ url opts])
(-GET [_ url opts]))
;; IHTTPClient implementation backed by aleph.http. every method
;; delegates to the matching aleph.http fn and forces
;; :raw-stream? true in the request opts, overriding any value the
;; caller supplied for that key
(defrecord AlephHTTPClient []
IHTTPClient
(-DELETE [_ url opts]
(http/delete url (assoc opts :raw-stream? true)))
(-PUT [_ url opts]
(http/put url (assoc opts :raw-stream? true)))
(-POST [_ url opts]
(http/post url (assoc opts :raw-stream? true)))
(-GET [_ url opts]
(http/get url (assoc opts :raw-stream? true))))
(defn create-aleph-http-client
  "Construct an AlephHTTPClient. Takes no arguments; the record has no
  fields."
  []
  (AlephHTTPClient.))
(defn store-method-request
  "Record a request made through a mock client.

  - `method-requests-map-atom` : an atom holding a map of
    method -> vector of [url opts] pairs
  - `method` : the key to record the request under, e.g. :GET
  - `url`, `opts` : the request arguments to record

  Appends [url opts] to the vector for `method`, creating the vector
  if there is none yet. Returns the updated map."
  [method-requests-map-atom method url opts]
  ;; the atom must be changed with swap! - calling update directly on
  ;; the atom throws, because an atom is not an associative collection
  (swap! method-requests-map-atom
         update
         method
         (fnil conj [])
         [url opts]))
(defn next-mock-response
  "Take the next canned response for `method`.

  `method-responses-map-atom` is an atom holding a map of
  method -> sequence of responses. The first response for `method` is
  removed from the sequence and returned. When there is no (truthy)
  first response an ex-info is thrown, with the current map as its
  ex-data."
  [method-responses-map-atom method]
  (let [queued   (method @method-responses-map-atom)
        response (first queued)]
    (when-not response
      (throw (ex-info (str "no response for: " method)
                      @method-responses-map-atom)))
    (swap! method-responses-map-atom update method next)
    response))
;; IHTTPClient implementation for tests. it holds two atoms:
;; - method-requests-map-atom : passed to store-method-request with
;;   the method keyword, url and opts of every call
;; - method-responses-map-atom : passed to next-mock-response to
;;   obtain the value each call returns
;; each method keys both maps by its own keyword
;; (:DELETE, :PUT, :POST or :GET)
(defrecord MockHTTPClient [method-requests-map-atom
method-responses-map-atom]
IHTTPClient
(-DELETE [_ url opts]
(store-method-request method-requests-map-atom :DELETE url opts)
(next-mock-response method-responses-map-atom :DELETE))
(-PUT [_ url opts]
(store-method-request method-requests-map-atom :PUT url opts)
(next-mock-response method-responses-map-atom :PUT))
(-POST [_ url opts]
(store-method-request method-requests-map-atom :POST url opts)
(next-mock-response method-responses-map-atom :POST))
(-GET [_ url opts]
(store-method-request method-requests-map-atom :GET url opts)
(next-mock-response method-responses-map-atom :GET)))
(defn create-mock-http-client
  "Construct a MockHTTPClient.

  - with no arguments the client starts with no canned responses
  - `method-responses-map` : optional map of method keyword
    (:GET, :POST, ...) -> sequence of responses to hand out, in order

  The client's request log always starts empty. Both maps are held in
  fresh atoms, available as the record's fields."
  ([]
   (create-mock-http-client {}))
  ([method-responses-map]
   ;; the record has two fields, so the positional constructor must be
   ;; given both atoms - calling it with no arguments throws
   (->MockHTTPClient (atom {})
                     (atom method-responses-map))))
(defn create-http-client
  "Construct the HTTP client selected by the :provider key of the
  config map: :aleph gives an AlephHTTPClient, :mock a MockHTTPClient.
  Any other provider has no matching case clause."
  [{provider-kind :provider}]
  (case provider-kind
    :mock  (create-mock-http-client)
    :aleph (create-aleph-http-client)))
(defn DELETE
  "Issue a DELETE for `url` through `http-client`, returning whatever
  the client's -DELETE implementation returns. `opts` is the request
  options map and defaults to nil."
  ([http-client url opts]
   (-DELETE http-client url opts))
  ([http-client url]
   (DELETE http-client url nil)))
(defn PUT
  "Issue a PUT to `url` through `http-client` with the request options
  map `opts`, returning whatever the client's -PUT implementation
  returns."
  [http-client url opts]
  (-PUT http-client
        url
        opts))
(defn POST
  "Issue a POST to `url` through `http-client` with the request
  options map `opts`, returning whatever the client's -POST
  implementation returns."
  [http-client url opts]
  (-POST http-client
         url
         opts))
(defn GET
  "Issue a GET for `url` through `http-client`, returning whatever the
  client's -GET implementation returns. `opts` is the request options
  map and defaults to nil."
  ([http-client url opts]
   (-GET http-client url opts))
  ([http-client url]
   (GET http-client url nil)))
(comment
  ;; a simple example of an HTTP client which will return
  ;; a variant for both success and error responses, with
  ;; different failures being represented by different
  ;; variant tags
  ;;
  ;; the example is written from a caller's point of view, with this
  ;; namespace aliased as `http`. GET needs a client as its first
  ;; argument, so one is created first
  (let [http-client (http/create-http-client {:provider :aleph})]
    (-> (http/GET http-client "http://www.foo.com/api/stuff")
        (http/response->JSON
         (fn [response] [:ok response]))
        (http/catch->JSON
         (fn [{status :status
               :as response}]
           (case status
             401 [:unauthenticated response]
             404 [:unknown response]
             [:error response]))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment