Skip to content

Instantly share code, notes, and snippets.

@mccraigmccraig
Created May 23, 2017 17:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mccraigmccraig/952ad298d47f93416eac23d7cfd0884d to your computer and use it in GitHub Desktop.
Save mccraigmccraig/952ad298d47f93416eac23d7cfd0884d to your computer and use it in GitHub Desktop.
(ns er-model.connectors.s3
(:require
[plumbing.core :refer :all]
[taoensso.timbre :refer [trace debug info warn error]]
[cats.core :refer [mlet return mapseq] :as c]
[cats.context :refer [with-context]]
[cats.labs.manifold :refer [deferred-context]]
[amazonica.aws.s3 :as aws-s3]
[er-model.util.mime :as mime]
[manifold.deferred :as d])
(:import
[java.util.zip GZIPInputStream]))
(defprotocol IS3Object
(bucket [_])
(path [_]))
(defrecord S3Object [bucket* path*]
IS3Object
(bucket [_] bucket*)
(path [_] path*))
(def s3-obj-pattern
#"^s3://([^/]+)/(.+)$")
(defn create-s3-object
([url-or-s3-object]
(if (instance? S3Object url-or-s3-object)
url-or-s3-object
(if-let [[_ bucket* path*] (re-matches s3-obj-pattern (str url-or-s3-object))]
(map->S3Object {:bucket* bucket*
:path* path*})
(throw
(ex-info "url must be s3://bucket/path/components"
{:url url-or-s3-object})))))
([bucket* path*]
(map->S3Object {:bucket* bucket*
:path* path*})))
;; get an object from S3 returning a Deferred<response>
(defprotocol IS3Client
(get-object
[_ ^S3Object s3-object]
[_ bucket* obj-key*]))
(defrecord S3Client [credentials]
IS3Client
(get-object [this url-or-s3-object]
(if (instance? S3Object url-or-s3-object)
(d/future ;; clj-aws is not async
(aws-s3/get-object
credentials
:bucket-name (bucket url-or-s3-object)
:key (path url-or-s3-object)))
(get-object this (create-s3-object url-or-s3-object))))
(get-object [this bucket* obj-key*]
(get-object this (create-s3-object bucket* obj-key*))))
(defn create-s3-client
[credentials]
(map->S3Client {:credentials credentials}))
(defn s3-slurp
"async slurp from an S3 object, correctly disposing of resources afterwards.
return s a Deferred<String>"
([s3-client url-or-s3-object]
(with-context deferred-context
(mlet [{content-in :object-content
{enc :content-encoding
ct :content-type} :object-metadata
:as response} (get-object s3-client url-or-s3-object)
:let [_ (clojure.pprint/pprint response)
[content-type {ct-charset :charset}] (mime/parse-content-type ct)]]
(return
(with-open [in (if (= "gzip" enc)
(GZIPInputStream. content-in)
content-in)]
(slurp in :encoding (or ct-charset "UTF-8")))))))
([s3-client bucket obj-or-key]
(s3-slurp (create-s3-object bucket obj-or-key))))
(defn s3-spit
"async spit a file to an S3 object. Returs a Deferred<PutObjectResult>.
- `metadata` - as for aws.sdk.s3/put-object"
([s3-client url-or-s3-object f metadata]
(with-context deferred-context
(mlet [:let [s3-obj (create-s3-object url-or-s3-object)]
put-result (d/future
(aws-s3/put-object
(:credentials s3-client)
:bucket-name (:bucket* s3-obj)
:key (:path* s3-obj)
:file f
:metadata (merge
metadata
{:server-side-encryption "AES256"})))]
(return put-result))))
([s3-client bucket obj-or-key value metadata]
(s3-spit s3-client (create-s3-object bucket obj-or-key) value metadata)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment