Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sorenmacbeth/1956644 to your computer and use it in GitHub Desktop.
Save sorenmacbeth/1956644 to your computer and use it in GitHub Desktop.
(ns forma.hadoop.pail
(:use cascalog.api
[cascalog.io :only (with-fs-tmp)]
[forma.reproject :only (hv->tilestring)])
(:import [forma.schema DataChunk FireTuple FormaValue
LocationProperty LocationPropertyValue
ModisPixelLocation DataValue]
[backtype.cascading.tap PailTap PailTap$PailTapOptions]
[backtype.hadoop.pail Pail]))
;; ## Pail Data Structures
;; Generates the class forma.hadoop.pail.DataChunkPailStructure as a
;; subclass of forma.tap.ThriftPailStructure. The class's methods are
;; backed by the functions in this namespace whose names start with
;; the "pail-" prefix (e.g. `pail-getType`).
(gen-class :name forma.hadoop.pail.DataChunkPailStructure
:extends forma.tap.ThriftPailStructure
:prefix "pail-")
(defn pail-getType
  "Implementation of `getType` for DataChunkPailStructure (bound via
  the \"pail-\" gen-class prefix). Always returns the `DataChunk`
  class; the instance argument is not used."
  [this]
  DataChunk)
(defn pail-createThriftObject
  "Implementation of `createThriftObject` for DataChunkPailStructure
  (bound via the \"pail-\" gen-class prefix). Returns a new
  `DataChunk` built with its no-argument constructor."
  [this]
  (new DataChunk))
;; Generates the class forma.hadoop.pail.SplitDataChunkPailStructure as
;; a subclass of forma.hadoop.pail.DataChunkPailStructure. Methods it
;; overrides are backed by the functions in this namespace whose names
;; start with the "split-" prefix (`split-getTarget`,
;; `split-isValidTarget`).
(gen-class :name forma.hadoop.pail.SplitDataChunkPailStructure
:extends forma.hadoop.pail.DataChunkPailStructure
:prefix "split-")
(defn split-getTarget
  "Implementation of `getTarget` for SplitDataChunkPailStructure.

  Returns a three-element vector for the chunk `d`:

    [dataset \"<resolution>-<temporal-res>\" tilestring]

  where the resolution and tile coordinates are read from the value
  held by the chunk's location property, and the tilestring is
  produced by `hv->tilestring` from the tile's H and V values."
  [this ^DataChunk d]
  (let [loc     (.. d getLocationProperty getProperty getFieldValue)
        tile    (hv->tilestring (.getTileH loc) (.getTileV loc))
        res-dir (format "%s-%s" (.getResolution loc) (.getTemporalRes d))]
    (vector (.getDataset d) res-dir tile)))
(defn split-isValidTarget
  "Implementation of `isValidTarget` for SplitDataChunkPailStructure.
  Returns true when `dirs` holds exactly 3 or exactly 4 entries, and
  false for any other count."
  [this dirs]
  (let [depth (count dirs)]
    (or (= 3 depth)
        (= 4 depth))))
;; ## Pail Taps
(defn- pail-tap
  "Builds a `PailTap` rooted at `path` for the given pail `structure`.

  `colls` is a sequence of `java.util.List`s; it is converted to a Java
  array and handed to `PailTap$PailTapOptions` together with a spec
  created from `structure` and the field name \"datachunk\".
  NOTE(review): the lists presumably name the pail sub-directories to
  read -- confirm against the PailTapOptions constructor."
  [path colls structure]
  (let [attrs   (into-array java.util.List colls)
        options (PailTap$PailTapOptions. (PailTap/makeSpec nil structure)
                                         "datachunk"
                                         attrs
                                         nil)]
    (new PailTap path options)))
(defn data-chunk-tap
  "Returns a pail tap at `path` backed by a fresh
  `DataChunkPailStructure`. Any additional arguments are collected
  into `colls` and forwarded to `pail-tap` unchanged."
  [path & colls]
  (let [structure (new forma.hadoop.pail.DataChunkPailStructure)]
    (pail-tap path colls structure)))
(defn split-chunk-tap
  "Returns a pail tap at `path` backed by a fresh
  `SplitDataChunkPailStructure`. Any additional arguments are
  collected into `colls` and forwarded to `pail-tap` unchanged."
  [path & colls]
  (let [structure (new forma.hadoop.pail.SplitDataChunkPailStructure)]
    (pail-tap path colls structure)))
(defn ?pail-*
  "Runs `query` into a staging pail and then merges the result into
  the pail at `pail-path`.

  `tap` is a function of one argument: it is called with the staging
  path bound by `with-fs-tmp` and must return the sink tap for the
  query. Once the query has run, the staging pail is absorbed into
  the pail opened at `pail-path`. The destination pail is opened
  before the query is executed."
  [tap pail-path query]
  (let [destination (Pail. pail-path)]
    (with-fs-tmp [_ staging]
      (?- (tap staging) query)
      (.absorb destination (Pail. staging)))))
(defmacro ?pail-
  "Macro front-end for `?pail-*`. The first argument is written like a
  call, `(tap path)`, but is destructured rather than evaluated:

    (?pail- (tap path) query)  =>  (?pail-* tap path query)"
  [[tap path] query]
  `(?pail-* ~tap ~path ~query))
(defn to-pail
  "Runs `query` and stores its output in the pail at `pail-path`,
  using `split-chunk-tap` to build the sink. The destination pail
  must therefore use the `SplitDataChunkPailStructure`."
  [pail-path query]
  ;; Written as the direct expansion of (?pail- (split-chunk-tap pail-path) query).
  (?pail-* split-chunk-tap pail-path query))
;; Entry point defined with `defmain`: opens the pail at `pail-path`
;; and calls `.consolidate` on it.
(defmain consolidate [pail-path]
  (let [pail (Pail. pail-path)]
    (.consolidate pail)))
;; Entry point defined with `defmain`: opens the pail at `to-pail`,
;; then the pail at `from-pail`, and absorbs the latter into the former.
(defmain absorb [from-pail to-pail]
  (let [target (Pail. to-pail)
        source (Pail. from-pail)]
    (.absorb target source)))
package forma.tap;
import backtype.hadoop.pail.PailStructure;
import java.util.Collections;
import java.util.List;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
/**
 * Base {@link PailStructure} for records that are Thrift objects.
 *
 * <p>Records are converted to and from bytes with {@link TSerializer} and
 * {@link TDeserializer}. Subclasses supply the concrete record instance via
 * {@link #createThriftObject()}. By default every target is considered valid
 * and records are not partitioned into sub-directories; subclasses may
 * override {@link #isValidTarget(String...)} and {@link #getTarget(Comparable)}
 * to change that.
 *
 * @param <T> the record type; instances are cast to {@link TBase} when
 *            serializing and deserializing
 */
public abstract class ThriftPailStructure<T extends Comparable> implements PailStructure<T> {
    /** Lazily created serializer; transient, so it is rebuilt on first use after Java deserialization. */
    private transient TSerializer ser;

    /** Lazily created deserializer; transient, so it is rebuilt on first use after Java deserialization. */
    private transient TDeserializer des;

    /**
     * Reads a Thrift-encoded record into a fresh object obtained from
     * {@link #createThriftObject()}.
     *
     * @param record the serialized bytes
     * @return the populated record
     * @throws RuntimeException wrapping the {@link TException} if decoding fails
     */
    public T deserialize(byte[] record) {
        T ret = createThriftObject();
        try {
            getDeserializer().deserialize((TBase) ret, record);
        } catch (TException e) {
            throw new RuntimeException(e);
        }
        return ret;
    }

    /**
     * Encodes a record to bytes with Thrift.
     *
     * @param obj the record to encode
     * @return the serialized bytes
     * @throws RuntimeException wrapping the {@link TException} if encoding fails
     */
    public byte[] serialize(T obj) {
        try {
            return getSerializer().serialize((TBase) obj);
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the empty record instance that {@link #deserialize(byte[])} fills in.
     *
     * @return a new, empty record
     */
    protected abstract T createThriftObject();

    /** Returns the cached serializer, creating it on first use. */
    private TSerializer getSerializer() {
        if (ser == null) {
            ser = new TSerializer();
        }
        return ser;
    }

    /** Returns the cached deserializer, creating it on first use. */
    private TDeserializer getDeserializer() {
        if (des == null) {
            des = new TDeserializer();
        }
        return des;
    }

    /**
     * Accepts every directory path.
     *
     * @param dirs the path components to check
     * @return always {@code true}
     */
    public boolean isValidTarget(String... dirs) {
        return true;
    }

    /**
     * Returns no path components for any record.
     *
     * @param object the record being placed
     * @return an immutable empty list
     */
    public List<String> getTarget(T object) {
        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
        return Collections.emptyList();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment