Skip to content

Instantly share code, notes, and snippets.

@nathanmarz
nathanmarz / late-parameterization-specter.clj
Last active September 11, 2015 22:58
Late bound parameterization with Specter
;; Sample nested structure: key "a" maps to a vector of maps, key "b" maps to 11.
(def DATA {"a" [{"b" 1} {"b" 2}] "b" 11})
;; Selectors that require parameters can be precompiled *without* the parameters.
;; The parameters are supplied later.
;; `path` composes keypath, ALL and keypath with no arguments bound yet.
(def path (comp-paths keypath ALL keypath))
;; These two selections are equivalent: calling (path "a" "b") supplies the
;; arguments that the two keypath selectors were left waiting for.
;; NOTE(review): presumably both return [1 2] for DATA — confirm against Specter.
(select (path "a" "b") DATA)
(select [(keypath "a") ALL (keypath "b")] DATA)
(defn constructor-num-args
  "Returns the set of distinct parameter counts found among the constructors
  reported by `.getConstructors` for `klass`."
  [klass]
  (->> (.getConstructors klass)
       (map (fn [ctor] (count (.getParameterTypes ctor))))
       (into #{})))
(defn- mk-fn-inst [klass numargs]
(let [args (vec (for [i (range numargs)] (symbol (str "arg" i))))]
;; Mapcat-style operation that runs extract-tuple-using-crazy-regex on its
;; single argument inside c/with-timeout, and wraps the result in a vector.
;; NOTE(review): the parameter is named `str`, which shadows clojure.core/str
;; inside the body.
(defmapcatop intensive-op [str]
(c/with-timeout [10000] ; 10 second timeout (so 10000 is presumably milliseconds — confirm)
[(extract-tuple-using-crazy-regex str)]
))
// Source tap: a sequence file with fields "key" and "value" at /tmp/key-value-pairs.
Tap source = new Hfs(new SequenceFile(new Fields("key", "value")), "/tmp/key-value-pairs");
// Domain spec built from a JavaBerkDB instance and 32 (presumably the shard count — confirm).
DomainSpec spec = new DomainSpec(new JavaBerkDB(), 32);
// Sink tap targeting /data/output/my-edb-domain with the spec above.
ElephantDBTap sink = new ElephantDBTap("/data/output/my-edb-domain", spec);
Pipe p = new Pipe("pipe");
// Wrap the pipe in ElephantTailAssembly, which is given the sink as well.
p = new ElephantTailAssembly(p, sink);
// Connect source -> pipe -> sink and run the resulting flow via complete().
new FlowConnector().connect(source, sink, p).complete();
---
# Fully qualified class name given as the local persistence setting.
local_persistence: elephantdb.persistence.JavaBerkDB
# Number of shards.
num_shards: 32
;; Configuration map: a replication value, a list of hosts, a port, and a map
;; from domain name to a path string for that domain.
{ :replication 1
:hosts ["edb1.mycompany.com" "edb2.mycompany.com" "edb3.mycompany.com"]
:port 3578
;; domain name -> path
:domains {"tweet-counts" "/data/output/tweet-counts-edb"
"influenced-by" "/data/output/influenced-by-edb"
"influencer-of" "/data/output/influencer-of-edb"
}
}
;; Binds `handler` for host edb1.mycompany.com on port 3578, then calls
;; .getString on it with the domain name "tweet-counts" and the key
;; "http://backtype.com".
(with-elephant-connection "edb1.mycompany.com" 3578 handler
(.getString handler "tweet-counts" "http://backtype.com"))
;; Runs a query with ?-: the first argument is an elephant-tap on
;; /data/output/my-edb-domain (32 shards, JavaBerkDB persistence factory);
;; the second is the sequence file at /tmp/key-value-pairs with its fields
;; named ?key and ?value.
;; NOTE(review): presumably the tap is the sink and the seqfile the source — confirm.
(?-
(elephant-tap "/data/output/my-edb-domain" {:num-shards 32 :persistence-factory (JavaBerkDB.)} {})
(name-vars (hfs-seqfile "/tmp/key-value-pairs") ["?key" "?value"]))
// Tap on /data/output/my-edb-domain, used here as the source.
ElephantDBTap source = new ElephantDBTap("/data/output/my-edb-domain");
Pipe p = new Pipe("pipe");
// Apply ProcessKeyValuePairs (not defined in this snippet) over the "key" and
// "value" fields, with Fields.RESULTS as the output selector.
p = new Each(p, new Fields("key", "value"), new ProcessKeyValuePairs(), Fields.RESULTS);
...
(defn process-pid
  "Returns the pid of the running JVM as a string. This is a workaround: the
  value is the part before \"@\" in the runtime MXBean name. Throws a
  RuntimeException when that name does not split into exactly two pieces."
  []
  (let [jvm-name (.getName (ManagementFactory/getRuntimeMXBean))
        pieces   (.split jvm-name "@")]
    (if (= 2 (count pieces))
      (first pieces)
      (throw (RuntimeException. (str "Got unexpected process name: " jvm-name))))))