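(ns gist.globhfs
  (:use cascalog.api)
  (:require [cascalog.ops :as c]
            [cascalog.workflow :as w])
  (:import [cascading.tap GlobHfs]
           [cascading.tuple Fields]))
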
;; ### Bucket to Cluster
;;
;; To get tuples back out of our directory structure on S3, we employ
;; Cascading's GlobHfs tap, along with an interface tailored for
;; datasets stored in the MODIS sinusoidal projection. For details on
;; the globbing syntax, see the documentation for Hadoop's
;; `FileSystem.globStatus`, which GlobHfs uses to match paths.

(defn globstring
  "Takes a path ending in `/` and collections of datasets,
  resolutions, and tiles, and returns a globstring formatted for
  Cascading's GlobHfs. (`*` may be substituted in for any argument but
  the path.)

  Example Usage:

    (globstring \"s3://bucket/\" [\"ndvi\" \"evi\"] [\"1000-32\"] *)
    ;=> \"s3://bucket/{ndvi,evi}/{1000-32}/*/*/\"

    (globstring \"s3://bucket/\" * * [\"008006\" \"033011\"])
    ;=> \"s3://bucket/*/*/{008006,033011}/*/\""
  ([basepath datasets resolutions tiles]
     (globstring basepath datasets resolutions tiles *))
  ([basepath datasets resolutions tiles batches]
     (letfn [(wrap [coll]
               (format "{%s}/"
                       (apply str (interpose "," coll))))
             (bracketize [arg]
               (if (= * arg) "*/" (wrap arg)))]
       ;; Prepend basepath so the result is a full glob, as in the
       ;; docstring examples.
       (apply str basepath
              (map bracketize
                   [datasets resolutions tiles batches])))))
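
;; A minimal sketch, not part of the original gist, of how the brace
;; groups in a globstring fan out into separate directory patterns.
;; `expand-braces` is a hypothetical helper for illustration only: it
;; handles flat (non-nested) `{a,b}` groups and leaves `*` untouched.
;; The actual matching is done by Hadoop when the tap is opened.
(require '[clojure.string :as string])

(defn expand-braces
  "Returns a seq of patterns, one per combination of brace alternatives."
  [pattern]
  (if-let [[group body] (re-find #"\{([^{}]*)\}" pattern)]
    ;; Substitute each alternative for the first group, then recurse
    ;; to expand any groups that remain.
    (mapcat #(expand-braces (string/replace-first pattern group %))
            (string/split body #","))
    [pattern]))

;; (expand-braces "s3://bucket/{ndvi,evi}/{1000-32}/*/*/")
;; ;=> ("s3://bucket/ndvi/1000-32/*/*/" "s3://bucket/evi/1000-32/*/*/")
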

(defn globhfs-seqfile
  "Returns a GlobHfs tap that reads sequence files from every path
  matching `pattern`."
  [pattern]
  (GlobHfs. (w/sequence-file Fields/ALL) pattern))

(defn read-test
  "Takes in a path and a number of pieces, and performs a test
  operation on all tuples matching the glob."
  [path & pieces]
  (let [source (globhfs-seqfile (apply globstring path pieces))]
    (?<- (stdout)
         [?dataset ?tilestring ?date ?count]
         (source ?dataset ?s-res ?t-res ?tilestring ?date ?chunkid ?chunk)
         (c/count ?count))))
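
;; A rough sketch, in plain Clojure rather than Cascalog, of the
;; aggregation the query above expresses: tuples are grouped by the
;; output variables (dataset, tilestring, date) and the tuples in each
;; group are counted. `count-chunks` and the sample tuples below are
;; hypothetical; the field order is taken from the `source` predicate.
(defn count-chunks
  "Maps each [dataset tilestring date] key to its number of tuples."
  [tuples]
  (frequencies
   (for [[dataset _s-res _t-res tilestring date] tuples]
     [dataset tilestring date])))

;; (count-chunks
;;  [["ndvi" "1000" "32" "008006" "2000-02-01" 0 :chunk-a]
;;   ["ndvi" "1000" "32" "008006" "2000-02-01" 1 :chunk-b]
;;   ["evi"  "1000" "32" "008006" "2000-02-01" 0 :chunk-c]])
;; ;=> {["ndvi" "008006" "2000-02-01"] 2, ["evi" "008006" "2000-02-01"] 1}
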
;; Example usage, for my particular application:
;;
;; (globstring "s3://bucket/" ["ndvi" "evi"] ["1000-32"] *)
;; ;=> "s3://bucket/{ndvi,evi}/{1000-32}/*/*/"
;;
;; so read-test is called with:
;;
;; (read-test "s3://bucket/" ["ndvi" "evi"] ["1000-32"] *)