-
-
Save mccraigmccraig/952ad298d47f93416eac23d7cfd0884d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Namespace declaration. The trailing "| |" column residue left by the page
;; extraction has been removed so the form reads as valid Clojure; every
;; require/import is unchanged.
(ns er-model.connectors.s3
  (:require
   [plumbing.core :refer :all]
   [taoensso.timbre :refer [trace debug info warn error]]
   [cats.core :refer [mlet return mapseq] :as c]
   [cats.context :refer [with-context]]
   [cats.labs.manifold :refer [deferred-context]]
   [amazonica.aws.s3 :as aws-s3]
   [er-model.util.mime :as mime]
   [manifold.deferred :as d])
  (:import
   [java.util.zip GZIPInputStream]))
(defprotocol IS3Object
  "Location of a single object in S3."
  (bucket [_] "Returns the bucket part of the object's location.")
  (path [_] "Returns the path part of the object's location (used as the S3 key)."))
;; Record implementation of IS3Object. The fields are starred so that they do
;; not shadow the `bucket` / `path` protocol functions inside the method bodies.
(defrecord S3Object [bucket* path*]
  IS3Object
  (bucket [_] bucket*)
  (path [_] path*))
(def s3-obj-pattern
  "Regex matching a complete s3://bucket/path url. Group 1 captures the bucket
  (everything up to the first `/` after the scheme), group 2 the non-empty
  remainder."
  #"^s3://([^/]+)/(.+)$")
(defn create-s3-object
  "Build an S3Object.

  - 1-arity: `url-or-s3-object` is returned untouched when it is already an
    S3Object; otherwise its string form must match `s3-obj-pattern`
    (s3://bucket/path) and the captured bucket and path are used.
  - 2-arity: `bucket*` and `path*` are used as given.

  Throws ex-info (with the offending value under `:url`) when the 1-arity
  argument is neither an S3Object nor a matching url."
  ([url-or-s3-object]
   (if (instance? S3Object url-or-s3-object)
     url-or-s3-object
     ;; `str` lets non-string url values (and nil) go through the same match
     (if-let [[_ bucket* path*] (re-matches s3-obj-pattern (str url-or-s3-object))]
       (map->S3Object {:bucket* bucket*
                       :path* path*})
       (throw
        (ex-info "url must be s3://bucket/path/components"
                 {:url url-or-s3-object})))))
  ([bucket* path*]
   (map->S3Object {:bucket* bucket*
                   :path* path*})))
(defprotocol IS3Client
  "Client for reading objects from S3."
  (get-object
    [_ ^S3Object s3-object]
    [_ bucket* obj-key*]
    "Get an object from S3, returning a Deferred<response>. The object is
    addressed either by a single `s3-object` argument or by `bucket*` and
    `obj-key*`."))
;; IS3Client implementation that calls amazonica with `credentials`.
(defrecord S3Client [credentials]
  IS3Client
  (get-object [this url-or-s3-object]
    (if (instance? S3Object url-or-s3-object)
      ;; the amazonica call is not async, so run it in a d/future to hand back
      ;; a Deferred
      (d/future
        (aws-s3/get-object
         credentials
         :bucket-name (bucket url-or-s3-object)
         :key (path url-or-s3-object)))
      ;; anything that is not an S3Object is coerced first, then retried
      (get-object this (create-s3-object url-or-s3-object))))
  (get-object [this bucket* obj-key*]
    (get-object this (create-s3-object bucket* obj-key*))))
(defn create-s3-client
  "Create an S3Client that uses `credentials` for its requests."
  [credentials]
  (map->S3Client {:credentials credentials}))
(defn s3-slurp
  "Async slurp of an S3 object as text, closing the response stream afterwards.

  - `s3-client` - the IS3Client used to fetch the object
  - `url-or-s3-object` - whatever `get-object` accepts: an S3Object or an
    s3://bucket/path url
  - `bucket`, `obj-or-key` - alternative addressing, combined through
    `create-s3-object`

  Content whose `:content-encoding` metadata is \"gzip\" is gunzipped while
  reading. The charset is taken from the parsed `:content-type` metadata and
  falls back to UTF-8.

  Returns a Deferred<String>."
  ([s3-client url-or-s3-object]
   (with-context deferred-context
     (mlet [{content-in :object-content
             {enc :content-encoding
              ct :content-type} :object-metadata} (get-object s3-client url-or-s3-object)]
       (return
        ;; Bind the raw stream first so it is closed even if wrapping it in a
        ;; GZIPInputStream or parsing the content type throws. When no gzip
        ;; wrapper is used the same stream is closed twice, which the
        ;; java.io.Closeable contract defines as a no-op.
        (with-open [raw content-in
                    in (if (= "gzip" enc)
                         (GZIPInputStream. raw)
                         raw)]
          (let [[_ {ct-charset :charset}] (mime/parse-content-type ct)]
            (slurp in :encoding (or ct-charset "UTF-8"))))))))
  ([s3-client bucket obj-or-key]
   ;; the client must be passed through: s3-slurp has no 1-arity
   (s3-slurp s3-client (create-s3-object bucket obj-or-key))))
(defn s3-spit
  "Async spit a file to an S3 object. Returns a Deferred<PutObjectResult>.

  - `s3-client` - client whose `:credentials` are passed to amazonica
  - `url-or-s3-object` - an S3Object or s3://bucket/path url; alternatively
    give `bucket` and `obj-or-key` separately
  - `f` (`value` in the 5-arity) - sent as the `:file` of the put request
  - `metadata` - as for amazonica.aws.s3/put-object; `:server-side-encryption`
    is always set to \"AES256\", overriding any caller-supplied value"
  ([s3-client url-or-s3-object f metadata]
   (with-context deferred-context
     (mlet [:let [s3-obj (create-s3-object url-or-s3-object)]
            ;; the amazonica call is not async, so run it in a d/future
            put-result (d/future
                         (aws-s3/put-object
                          (:credentials s3-client)
                          ;; go through the IS3Object accessors, as S3Client does
                          :bucket-name (bucket s3-obj)
                          :key (path s3-obj)
                          :file f
                          :metadata (merge
                                     metadata
                                     {:server-side-encryption "AES256"})))]
       (return put-result))))
  ([s3-client bucket obj-or-key value metadata]
   (s3-spit s3-client (create-s3-object bucket obj-or-key) value metadata)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment