-
-
Save sorenmacbeth/1956644 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns forma.hadoop.pail | |
(:use cascalog.api | |
[cascalog.io :only (with-fs-tmp)] | |
[forma.reproject :only (hv->tilestring)]) | |
(:import [forma.schema DataChunk FireTuple FormaValue | |
LocationProperty LocationPropertyValue | |
ModisPixelLocation DataValue] | |
[backtype.cascading.tap PailTap PailTap$PailTapOptions] | |
[backtype.hadoop.pail Pail])) | |
;; ## Pail Data Structures | |
(gen-class :name forma.hadoop.pail.DataChunkPailStructure | |
:extends forma.tap.ThriftPailStructure | |
:prefix "pail-") | |
(defn pail-getType [this] DataChunk) | |
(defn pail-createThriftObject [this] (DataChunk.)) | |
(gen-class :name forma.hadoop.pail.SplitDataChunkPailStructure | |
:extends forma.hadoop.pail.DataChunkPailStructure | |
:prefix "split-") | |
(defn split-getTarget [this ^DataChunk d] | |
(let [location (-> d .getLocationProperty .getProperty .getFieldValue) | |
tilestring (hv->tilestring (.getTileH location) (.getTileV location)) | |
res (format "%s-%s" | |
(.getResolution location) | |
(.getTemporalRes d))] | |
[(.getDataset d) res tilestring])) | |
(defn split-isValidTarget [this dirs] | |
(boolean (#{3 4} (count dirs)))) | |
;; ## Pail Taps | |
(defn- pail-tap | |
[path colls structure] | |
(let [seqs (into-array java.util.List colls) | |
spec (PailTap/makeSpec nil structure) | |
opts (PailTap$PailTapOptions. spec "datachunk" seqs nil)] | |
(PailTap. path opts))) | |
(defn data-chunk-tap [path & colls] | |
(pail-tap path colls (forma.hadoop.pail.DataChunkPailStructure.))) | |
(defn split-chunk-tap [path & colls] | |
(pail-tap path colls (forma.hadoop.pail.SplitDataChunkPailStructure.))) | |
(defn ?pail-* | |
"Executes the supplied query into the pail located at the supplied | |
path, consolidating when finished." | |
[tap pail-path query] | |
(let [pail (Pail. pail-path)] | |
(with-fs-tmp [_ tmp] | |
(?- (tap tmp) query) | |
(.absorb pail (Pail. tmp))))) | |
(defmacro ?pail- | |
"Executes the supplied query into the pail located at the supplied | |
path, consolidating when finished." | |
[[tap path] query] | |
(list `?pail-* tap path query)) | |
(defn to-pail | |
"Executes the supplied `query` into the pail at `pail-path`. This | |
pail must make use of the `SplitDataChunkPailStructure`." | |
[pail-path query] | |
(?pail- (split-chunk-tap pail-path) | |
query)) | |
(defmain consolidate [pail-path] | |
(.consolidate (Pail. pail-path))) | |
(defmain absorb [from-pail to-pail] | |
(.absorb (Pail. to-pail) (Pail. from-pail))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package forma.tap; | |
import backtype.hadoop.pail.PailStructure; | |
import java.util.Collections; | |
import java.util.List; | |
import org.apache.thrift.TBase; | |
import org.apache.thrift.TDeserializer; | |
import org.apache.thrift.TException; | |
import org.apache.thrift.TSerializer; | |
public abstract class ThriftPailStructure<T extends Comparable> implements PailStructure<T> { | |
public T deserialize(byte[] record) { | |
T ret = createThriftObject(); | |
try { | |
getDeserializer().deserialize((TBase)ret, record); | |
} catch (TException e) { | |
throw new RuntimeException(e); | |
} | |
return ret; | |
} | |
public byte[] serialize(T obj) { | |
try { | |
return getSerializer().serialize((TBase)obj); | |
} catch (TException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
protected abstract T createThriftObject(); | |
private transient TSerializer ser; | |
private TSerializer getSerializer() { | |
if(ser==null) ser = new TSerializer(); | |
return ser; | |
} | |
private transient TDeserializer des; | |
private TDeserializer getDeserializer() { | |
if(des==null) des = new TDeserializer(); | |
return des; | |
} | |
public boolean isValidTarget(String... dirs) { | |
return true; | |
} | |
public List<String> getTarget(T object) { | |
return Collections.EMPTY_LIST; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment