-
-
Save mccraigmccraig/7e44ff82d9fb66f6809614f1ed65cd04 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns er-model.connectors.http | |
(:require | |
[aleph.http :as http] | |
[aleph.netty :as netty] | |
[byte-streams :as bs] | |
[cats.core :as c :refer [return]] | |
[cemerick.url :as url] | |
[cheshire.core :as json] | |
[clojure.java.io :as io] | |
[clojure.string :as str] | |
[er-model.util.mime :as mime] | |
[manifold.deferred :as d] | |
[prpr.promise :refer [ddo]] | |
[prpr.stream :as s]) | |
(:import | |
io.netty.buffer.ByteBuf | |
[java.io ByteArrayInputStream ByteArrayOutputStream InputStreamReader] | |
java.util.zip.GZIPInputStream)) | |
;; the runtime Class of a primitive byte-array, used for instance? checks
(def ^:private byte-array-type (Class/forName "[B"))

(defn decode-bytes
  "Turn `bytes` into an InputStream of decoded content.

  `bytes` is either a byte-array (which gets wrapped in a
  ByteArrayInputStream) or something that is already an InputStream
  (used as-is). When `gzip?` is truthy the stream is additionally
  wrapped in a GZIPInputStream so that reads yield the decompressed
  bytes."
  [bytes gzip?]
  (cond-> (if (instance? byte-array-type bytes)
            (ByteArrayInputStream. bytes)
            bytes)
    gzip? (GZIPInputStream.)))
(defn netty-byte-buffer->bytes
  "Convert a single body chunk `nbb` to a byte-array.

  When :raw-stream? is used we are responsible for decrementing
  netty's ByteBuf reference-counts
  https://github.com/ztellman/aleph/blob/master/src/aleph/http.clj#L113

  - if `nbb` is a netty ByteBuf its content is copied to a byte-array
    and the buffer is released
  - anything else is handed to byte-streams for conversion"
  [nbb]
  (if (instance? ByteBuf nbb)
    ;; in order to get ByteBufs :raw-stream? must be given to both
    ;; the request and the connection-pool - the default connection
    ;; pool does not return ByteBufs, so a custom connection-pool
    ;; must be used
    (try
      (netty/buf->array nbb)
      (finally
        ;; release in a finally so the reference-count is decremented
        ;; even if copying the buffer content throws - otherwise the
        ;; buffer would leak
        (.release ^ByteBuf nbb)))
    ;; if :raw-stream? is passed to a request, but not the
    ;; connection-pool then the connections have already decoded the
    ;; ByteBufs to byte-arrays, which convert trivially
    (bs/to-byte-array nbb)))
(defn slurp-body-s-bytes
  "Drain `body-s`, a stream<byte[]> or stream<ByteBuf>, accumulating all
  of its chunks in memory, and return a deferred of an InputStream over
  the accumulated content. When `gzip?` is truthy the InputStream
  gunzips the content as it is read."
  [body-s gzip?]
  (let [append-chunk (fn [sink chunk]
                       ;; write the whole chunk and keep threading the
                       ;; same mutable sink through the reduction
                       (.write sink chunk 0 (count chunk))
                       sink)
        sink-d (s/reduce append-chunk
                         (ByteArrayOutputStream.)
                         (s/map netty-byte-buffer->bytes body-s))]
    (d/chain
     sink-d
     (fn [sink] (.toByteArray sink))
     (fn [content] (decode-bytes content gzip?)))))
(defn unwrap-response
  "Normalise a raw response map (:status, :headers, :body).

  The body - either a byte-array or a stream of chunks - is collected
  and, when the content-encoding header is exactly \"gzip\", wrapped so
  that it is gunzipped on read. The content-type header is parsed with
  mime/parse-content-type.

  Returns a promise of a map with :status, :headers, :content-type,
  :content-charset and :body (an InputStream of the body content)."
  [{:keys [status headers body]}]
  (ddo [:let [[content-type
               {charset :charset}] (mime/parse-content-type
                                    (get headers "content-type"))
              gzip? (= (get headers "content-encoding") "gzip")
              byte-array-body? (instance? byte-array-type body)]
        body-in (if byte-array-body?
                  (return (decode-bytes body gzip?))
                  (slurp-body-s-bytes body gzip?))]
    (return
     {:status status
      :headers headers
      :content-type content-type
      :content-charset charset
      :body body-in})))
(defn decode-string
  "Read everything from the InputStream `bytes-in` and decode it to a
  String using `charset`, falling back to UTF-8 when `charset` is nil."
  [bytes-in charset]
  (-> bytes-in
      (InputStreamReader. (or charset "UTF-8"))
      slurp))
(defn unwrapped-response->text
  "Given an unwrapped response (whose :body is an InputStream), return
  the same response with :body replaced by the body decoded as a
  String, using the response's :content-charset."
  [{r-in :body
    r-ch :content-charset
    :as response}]
  (->> (decode-string r-in r-ch)
       (assoc response :body)))
;; the content-type accepted by the default JSON content-type validator
(def ^:private content-type-json "application/json")

(defn unwrapped-response->JSON
  "Given an unwrapped response, decode the body as text and then parse
  that text as JSON (with keyword keys), returning the response with the
  parsed data as its :body.

  `ct-validator` is an optional predicate of the parsed content-type;
  by default the content-type must equal \"application/json\". When the
  validator rejects the content-type an ex-info is thrown whose data is
  the text-decoded response."
  ([response] (unwrapped-response->JSON response nil))
  ([response ct-validator]
   (let [text-response (unwrapped-response->text response)
         acceptable-ct? (or ct-validator
                            (fn [ct] (= content-type-json ct)))]
     (when-not (acceptable-ct? (:content-type text-response))
       (throw
        (ex-info "response is not JSON!" text-response)))
     (update text-response :body json/parse-string true))))
(defn response->parsed*
  "Unwrap a (realised) response and apply `response-parser` to the
  unwrapped response; a nil parser means the unwrapped response is
  returned unchanged. Returns a promise of the parser's result."
  [response response-parser]
  (let [parse (or response-parser identity)]
    (ddo [unwrapped (unwrap-response response)]
      (return (parse unwrapped)))))
(defn response->text*
  "Unwrap a (realised) response, decode its body as text and apply the
  optional `text-parser` (identity when nil) to the text response."
  [response text-parser]
  (let [parse-text (or text-parser identity)]
    (response->parsed*
     response
     (fn [unwrapped]
       (parse-text (unwrapped-response->text unwrapped))))))
(defn response->JSON*
  "Unwrap a (realised) response, parse its body as JSON and apply the
  optional `JSON-parser` (identity when nil) to the JSON response."
  [response JSON-parser]
  (let [parse-json (or JSON-parser identity)]
    (response->parsed*
     response
     (fn [unwrapped]
       (parse-json (unwrapped-response->JSON unwrapped))))))
(defn response->parsed
  "Given a promise of a response, unwrap the response once it arrives
  and apply the optional `response-parser` to it
  - returns a promise of the parsed response"
  ([response-d]
   (response->parsed response-d nil))
  ([response-d response-parser]
   (ddo [resp response-d
         :let [parsed (response->parsed* resp response-parser)]]
     (return parsed))))
(defn response->text
  "Given a promise of a response, decode the body of the response as
  text once it arrives and apply the optional `text-parser` to it
  - returns a promise of the parsed response"
  ([response-d]
   (response->text response-d nil))
  ([response-d text-parser]
   (ddo [resp response-d
         :let [parsed (response->text* resp text-parser)]]
     (return parsed))))
(defn response->JSON
  "Given a promise of a response, parse the body of the response as
  JSON once it arrives and apply the optional `JSON-parser` to it
  - returns a promise of the parsed response"
  ([response-d]
   (response->JSON response-d nil))
  ([response-d JSON-parser]
   (ddo [resp response-d
         :let [parsed (response->JSON* resp JSON-parser)]]
     (return parsed))))
(defn response->file
  "Save the body of a response as a file.

  `response-d` is a promise of a response and `filename` names the file
  to (over)write with the unwrapped body content. The optional
  `file-parser` is applied to the resulting response map.

  Returns a promise of the unwrapped response with :body replaced by a
  java.io.File for `filename`. The file is also kept under the nested
  [:response :body] path, which is where earlier versions of this
  function (only) put it."
  ([response-d filename] (response->file response-d filename nil))
  ([response-d filename file-parser]
   (ddo [response response-d
         {r-body :body
          :as unwrapped} (unwrap-response response)
         :let [file (io/file filename)]]
     ;; with-open guarantees the output stream is closed even when the
     ;; transfer fails part-way through
     (with-open [out (io/output-stream file)]
       (bs/transfer r-body out {:append? false}))
     (return
      ((or file-parser identity)
       (-> unwrapped
           ;; the body stream has been drained into the file, so the
           ;; file takes its place, mirroring how the text and JSON
           ;; variants replace :body
           (assoc :body file)
           ;; legacy location, retained for backward compatibility
           (assoc-in [:response :body] file)))))))
(defn catch->parsed
  "Catch an HTTP error on the promise `p` and optionally parse the
  response.

  When the caught exception carries ex-data, that data is treated as
  the error response: it is unwrapped and passed to the optional
  `response-parser`. An exception without ex-data is not an HTTP error
  response, so it is rethrown and the resulting promise stays failed."
  ([p] (catch->parsed p nil))
  ([p response-parser]
   (d/catch p
     (fn [e]
       (if-let [exd (ex-data e)]
         (response->parsed* exd response-parser)
         ;; returning e here would turn the failure into a *successful*
         ;; promise whose value is the exception object, hiding the
         ;; error from downstream handlers
         (throw e))))))
(defn catch->text
  "Catch an HTTP error on the promise `p`, interpret the error response
  as text and optionally parse it with `text-parser`. When no parser is
  given the text response is returned unchanged."
  ([p] (catch->text p nil))
  ([p text-parser]
   ;; default to identity: (comp nil f) would throw a
   ;; NullPointerException as soon as an error was caught
   (catch->parsed p (comp (or text-parser identity)
                          unwrapped-response->text))))
(defn catch->JSON
  "Catch an HTTP error on the promise `p`, interpret the error response
  as JSON and optionally parse the JSON with `JSON-parser`. When no
  parser is given the JSON response is returned unchanged."
  ([p] (catch->JSON p nil))
  ([p JSON-parser]
   ;; default to identity: (comp nil f) would throw a
   ;; NullPointerException as soon as an error was caught
   (catch->parsed p (comp (or JSON-parser identity)
                          unwrapped-response->JSON))))
(defn url-encode-params
  "Build a `k1=v1&k2=v2` style string from the map `params`. Keys are
  rendered with `name` (and are not themselves encoded); values are
  url-encoded."
  [params]
  (str/join
   "&"
   (for [[k v] params]
     (str (name k) "=" (url/url-encode v)))))
(defprotocol IHTTPClient
  "Low-level HTTP client operations, one per supported HTTP method.
  Each takes the target `url` and a map of request `opts`. Prefer the
  DELETE / PUT / POST / GET wrapper functions to calling these
  directly."
  (-DELETE [_ url opts]
    "issue a DELETE request for url")
  (-PUT [_ url opts]
    "issue a PUT request for url")
  (-POST [_ url opts]
    "issue a POST request for url")
  (-GET [_ url opts]
    "issue a GET request for url"))
(defn- raw-stream-opts
  "request opts with :raw-stream? forced on, as every request made by
  the aleph client uses it"
  [opts]
  (assoc opts :raw-stream? true))

;; IHTTPClient implementation which delegates to aleph.http, always
;; requesting raw-stream bodies
(defrecord AlephHTTPClient []
  IHTTPClient
  (-DELETE [_ url opts]
    (http/delete url (raw-stream-opts opts)))
  (-PUT [_ url opts]
    (http/put url (raw-stream-opts opts)))
  (-POST [_ url opts]
    (http/post url (raw-stream-opts opts)))
  (-GET [_ url opts]
    (http/get url (raw-stream-opts opts))))
(defn create-aleph-http-client
  "construct an IHTTPClient backed by aleph"
  []
  (AlephHTTPClient.))
(defn store-method-request
  "Record a request made through the mock client.

  `method-requests-map-atom` is an atom holding a map of
  method -> vector of [url opts] pairs; the new [url opts] pair is
  appended to the vector for `method` (created when absent).
  Returns the updated map."
  [method-requests-map-atom method url opts]
  ;; swap! is required here: calling update directly on the atom throws
  ;; a ClassCastException (an atom is not associative) and would never
  ;; change the stored requests anyway
  (swap! method-requests-map-atom
         update
         method
         (fnil conj [])
         [url opts]))
(defn next-mock-response
  "Take the next canned response for `method` from
  `method-responses-map-atom`, an atom holding a map of
  method -> sequence of responses. The returned response is removed
  from the sequence. Throws an ex-info, with the current responses map
  as its data, when there is no (truthy) response left for `method`."
  [method-responses-map-atom method]
  (let [response (first (method @method-responses-map-atom))]
    (when-not response
      (throw (ex-info (str "no response for: " method)
                      @method-responses-map-atom)))
    (swap! method-responses-map-atom update method next)
    response))
(defn- mock-exchange
  "record the request against `method`, then answer with the next
  canned response for `method`"
  [requests-atom responses-atom method url opts]
  (store-method-request requests-atom method url opts)
  (next-mock-response responses-atom method))

;; IHTTPClient implementation for tests: requests are recorded in
;; method-requests-map-atom and responses are served from
;; method-responses-map-atom, both keyed by :DELETE / :PUT / :POST / :GET
(defrecord MockHTTPClient [method-requests-map-atom
                           method-responses-map-atom]
  IHTTPClient
  (-DELETE [_ url opts]
    (mock-exchange method-requests-map-atom
                   method-responses-map-atom
                   :DELETE url opts))
  (-PUT [_ url opts]
    (mock-exchange method-requests-map-atom
                   method-responses-map-atom
                   :PUT url opts))
  (-POST [_ url opts]
    (mock-exchange method-requests-map-atom
                   method-responses-map-atom
                   :POST url opts))
  (-GET [_ url opts]
    (mock-exchange method-requests-map-atom
                   method-responses-map-atom
                   :GET url opts)))
(defn create-mock-http-client
  "Construct a MockHTTPClient.

  The client starts with an empty map of recorded requests. The optional
  `method-responses` is a map of method keyword (:DELETE, :PUT, :POST,
  :GET) -> sequence of canned responses to serve for that method; it
  defaults to an empty map. Both maps are held in fresh atoms, reachable
  through the record's :method-requests-map-atom and
  :method-responses-map-atom fields."
  ([] (create-mock-http-client {}))
  ([method-responses]
   ;; the record has two fields, so the positional constructor must be
   ;; given both atoms - calling it with no arguments throws an
   ;; ArityException
   (->MockHTTPClient (atom {}) (atom method-responses))))
(defn create-http-client
  "Construct the HTTP client selected by the :provider key of the given
  config map: :aleph or :mock. Any other provider fails in `case` with
  no matching clause."
  [{:keys [provider]}]
  (case provider
    :mock (create-mock-http-client)
    :aleph (create-aleph-http-client)))
(defn DELETE
  "asynchronously DELETE the resource at `url` using `http-client`,
  with optional request `opts` (nil when omitted)"
  ([http-client url]
   (-DELETE http-client url nil))
  ([http-client url opts]
   (-DELETE http-client url opts)))
(defn PUT
  "asynchronously PUT to the resource at `url` using `http-client`,
  with the request described by `opts`"
  [http-client url opts]
  (-> http-client
      (-PUT url opts)))
(defn POST
  "asynchronously POST to the (possibly gzipped) resource at `url`
  using `http-client`, with the request described by `opts`"
  [http-client url opts]
  (-> http-client
      (-POST url opts)))
(defn GET
  "asynchronously GET the (possibly gzipped) resource at `url` using
  `http-client`, with optional request `opts` (nil when omitted)"
  ([http-client url]
   (-GET http-client url nil))
  ([http-client url opts]
   (-GET http-client url opts)))
(comment
  ;; a simple example of an HTTP client which will return
  ;; a variant for both success and error responses, with
  ;; different failures being represented by different
  ;; variant tags
  ;;
  ;; written from a caller's point of view: `http` here is an alias
  ;; for this namespace (er-model.connectors.http). GET takes the
  ;; http-client as its first argument
  (let [client (http/create-http-client {:provider :aleph})]
    (-> (http/GET client "http://www.foo.com/api/stuff")
        (http/response->JSON
         (fn [response] [:ok response]))
        (http/catch->JSON
         (fn [{status :status
               :as response}]
           (case status
             401 [:unauthenticated response]
             404 [:unknown response]
             [:error response]))))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment