Created
June 4, 2011 20:02
-
-
Save russelldb/1008293 to your computer and use it in GitHub Desktop.
Erlang content type map reduce from pb client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/main/java/com/basho/riak/pbc/mapreduce/MapReduceBuilder.java b/src/main/java/com/basho/riak/pbc/mapreduce/MapReduceBuilder.java | |
index e6994a8..78e73c6 100644 | |
--- a/src/main/java/com/basho/riak/pbc/mapreduce/MapReduceBuilder.java | |
+++ b/src/main/java/com/basho/riak/pbc/mapreduce/MapReduceBuilder.java | |
@@ -13,7 +13,9 @@ | |
*/ | |
package com.basho.riak.pbc.mapreduce; | |
+import java.io.ByteArrayOutputStream; | |
import java.io.IOException; | |
+import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.HashMap; | |
import java.util.LinkedHashMap; | |
@@ -32,6 +34,14 @@ import com.basho.riak.pbc.MapReduceResponseSource; | |
import com.basho.riak.pbc.RequestMeta; | |
import com.basho.riak.pbc.RiakClient; | |
import com.basho.riak.pbc.RiakObject; | |
+import com.ericsson.otp.erlang.OtpErlangAtom; | |
+import com.ericsson.otp.erlang.OtpErlangBinary; | |
+import com.ericsson.otp.erlang.OtpErlangBoolean; | |
+import com.ericsson.otp.erlang.OtpErlangList; | |
+import com.ericsson.otp.erlang.OtpErlangLong; | |
+import com.ericsson.otp.erlang.OtpErlangObject; | |
+import com.ericsson.otp.erlang.OtpErlangTuple; | |
+import com.ericsson.otp.erlang.OtpOutputStream; | |
import com.google.protobuf.ByteString; | |
/** | |
@@ -326,6 +336,133 @@ public class MapReduceBuilder { | |
return submit(new RequestMeta().contentType("application/json")); | |
} | |
+ /** | |
+  * Submits this job encoded as an Erlang external term format binary | |
+  * instead of JSON, using the {@code application/x-erlang-binary} content | |
+  * type. | |
+  * | |
+  * @return whatever the client's {@code mapReduce} call returns for this request | |
+  * @throws IOException | |
+  *             declared for the underlying client call | |
+  * @throws JSONException | |
+  *             if encoding the job fails while reading a function's JSON form | |
+  */ | |
+ public MapReduceResponseSource submitETF() throws IOException, JSONException { | |
+     final IRequestMeta erlangMeta = new RequestMeta().contentType("application/x-erlang-binary"); | |
+     final ByteString encodedJob = ByteString.copyFrom(toETFBytes()); | |
+     return riak.mapReduce(encodedJob, erlangMeta); | |
+ } | |
+ | |
+ /** | |
+  * Encodes this job as an Erlang term in external term format. | |
+  * <p> | |
+  * The term is a three element list holding the {@code inputs} tuple, the | |
+  * {@code query} tuple and a {@code timeout} tuple, in that order. | |
+  * | |
+  * @return the encoded list, prefixed with the version byte 131 | |
+  * @throws JSONException | |
+  *             propagated from {@link #buildQuery()}; the function contents | |
+  *             are only reachable through their JSON form | |
+  */ | |
+ private byte[] toETFBytes() throws JSONException { | |
+     final OtpErlangTuple inputsTerm = buildInputs(); | |
+     final OtpErlangTuple queryTerm = buildQuery(); | |
+ | |
+     // A timeout of -1 does not play well with Riak, so 60000 is sent in its place | |
+     final long timeoutValue; | |
+     if (getTimeout() == -1) { | |
+         timeoutValue = 60000; | |
+     } else { | |
+         timeoutValue = getTimeout(); | |
+     } | |
+     final OtpErlangTuple timeoutTerm = new OtpErlangTuple(new OtpErlangObject[] { | |
+         new OtpErlangAtom("timeout"), new OtpErlangLong(timeoutValue) }); | |
+ | |
+     final OtpErlangList job = new OtpErlangList(new OtpErlangObject[] { inputsTerm, queryTerm, timeoutTerm }); | |
+     final byte[] encoded = new OtpOutputStream(job).toByteArray(); | |
+ | |
+     // OtpOutputStream omits the leading version byte, so emit it ahead of the term | |
+     final ByteArrayOutputStream framed = new ByteArrayOutputStream(encoded.length + 1); | |
+     framed.write(131); | |
+     framed.write(encoded, 0, encoded.length); | |
+     return framed.toByteArray(); | |
+ } | |
+ | |
+ /** | |
+  * Builds the {@code query} tuple by rendering every phase in iteration | |
+  * order. | |
+  * | |
+  * @return a two element tuple: the atom {@code query} and the list of rendered phases | |
+  * @throws JSONException | |
+  *             propagated from {@link #renderPhase(MapReducePhase)} | |
+  */ | |
+ private OtpErlangTuple buildQuery() throws JSONException { | |
+     final OtpErlangTuple[] rendered = new OtpErlangTuple[phases.size()]; | |
+     int next = 0; | |
+     for (MapReducePhase current : phases) { | |
+         rendered[next++] = renderPhase(current); | |
+     } | |
+ | |
+     final OtpErlangAtom tag = new OtpErlangAtom("query"); | |
+     final OtpErlangList phaseList = new OtpErlangList(rendered); | |
+     return new OtpErlangTuple(new OtpErlangObject[] { tag, phaseList }); | |
+ } | |
+ | |
+ /** | |
+  * Renders one phase as the four element tuple | |
+  * {@code {Type, Fun, Arg, Keep}}. | |
+  * <p> | |
+  * {@code Type} is the lower-cased name of the phase type, {@code Fun} comes | |
+  * from {@link #renderFun(MapReduceFunction)}, {@code Arg} is always the | |
+  * atom {@code none} and {@code Keep} mirrors {@code phase.keep}. | |
+  * | |
+  * @param phase | |
+  *            the phase to render | |
+  * @return the tuple describing the phase | |
+  * @throws JSONException | |
+  *             propagated from {@link #renderFun(MapReduceFunction)} | |
+  */ | |
+ private OtpErlangTuple renderPhase(MapReducePhase phase) throws JSONException { | |
+     // Lower-case with a fixed locale. The default-locale overload maps 'I' | |
+     // to a dotless i under e.g. a Turkish locale, which would change the | |
+     // atom for any type name containing that letter. | |
+     OtpErlangAtom type = new OtpErlangAtom(phase.type.name().toLowerCase(java.util.Locale.ENGLISH)); | |
+     OtpErlangTuple fun = renderFun(phase.function); | |
+     // No phase argument is encoded yet; every phase is sent with 'none' | |
+     OtpErlangAtom arg = new OtpErlangAtom("none"); | |
+     OtpErlangBoolean keep = new OtpErlangBoolean(phase.keep); | |
+ | |
+     return new OtpErlangTuple(new OtpErlangObject[] { type, fun, arg, keep }); | |
+ } | |
+ | |
+ /** | |
+  * Renders a phase function as an Erlang tuple, reading its details from | |
+  * the function's JSON form. | |
+  * <ul> | |
+  * <li>An {@code ErlangFunction} becomes {@code {modfun, Module, Function}} | |
+  * with both names as atoms.</li> | |
+  * <li>Any other function with a {@code "source"} entry becomes | |
+  * {@code {jsanon, Source}}.</li> | |
+  * <li>Any other function becomes {@code {jsfun, Name}}.</li> | |
+  * </ul> | |
+  * {@code Source} and {@code Name} are sent as UTF-8 encoded binaries. | |
+  * | |
+  * @param function | |
+  *            the function to render | |
+  * @return the tuple describing the function | |
+  * @throws JSONException | |
+  *             propagated when an expected JSON entry cannot be read | |
+  */ | |
+ private OtpErlangTuple renderFun(MapReduceFunction function) throws JSONException { | |
+     JSONObject json = function.toJson(); | |
+ | |
+     if (function instanceof ErlangFunction) { | |
+         OtpErlangAtom mod = new OtpErlangAtom(json.getString("module")); | |
+         OtpErlangAtom fun = new OtpErlangAtom(json.getString("function")); | |
+         return new OtpErlangTuple(new OtpErlangObject[] { new OtpErlangAtom("modfun"), mod, fun }); | |
+     } | |
+ | |
+     boolean anonymous = json.has("source"); | |
+     OtpErlangAtom funType = new OtpErlangAtom(anonymous ? "jsanon" : "jsfun"); | |
+     String text = json.getString(anonymous ? "source" : "name"); | |
+ | |
+     // Encode explicitly as UTF-8: String.getBytes() would use the platform | |
+     // default charset and give different bytes on different machines. | |
+     OtpErlangBinary source = new OtpErlangBinary(ByteString.copyFromUtf8(text).toByteArray()); | |
+ | |
+     return new OtpErlangTuple(new OtpErlangObject[] { funType, source }); | |
+ } | |
+ | |
+ /** | |
+  * Builds the {@code inputs} tuple. | |
+  * <p> | |
+  * When {@code bucket} is set, the second element is that bucket name as a | |
+  * binary. Otherwise it is a list of {@code {Bucket, Key}} binary pairs, | |
+  * one for every key registered in {@code objects}. All names are encoded | |
+  * as UTF-8. | |
+  * | |
+  * @return a two element tuple: the atom {@code inputs} and the bucket | |
+  *         binary or the list of bucket/key pairs | |
+  */ | |
+ private OtpErlangTuple buildInputs() { | |
+     // Names are encoded explicitly as UTF-8 throughout: String.getBytes() | |
+     // would use the platform default charset and so vary between machines. | |
+     if (bucket != null) { | |
+         return new OtpErlangTuple(new OtpErlangObject[] { new OtpErlangAtom("inputs"), | |
+             new OtpErlangBinary(ByteString.copyFromUtf8(bucket).toByteArray()) }); | |
+     } | |
+ | |
+     List<OtpErlangTuple> pairs = new ArrayList<OtpErlangTuple>(); | |
+     // bucketName, not bucket, so the loop variable does not shadow the field | |
+     for (String bucketName : objects.keySet()) { | |
+         OtpErlangBinary bucketBin = new OtpErlangBinary(ByteString.copyFromUtf8(bucketName).toByteArray()); | |
+ | |
+         for (String key : objects.get(bucketName)) { | |
+             OtpErlangBinary keyBin = new OtpErlangBinary(ByteString.copyFromUtf8(key).toByteArray()); | |
+             pairs.add(new OtpErlangTuple(new OtpErlangObject[] { bucketBin, keyBin })); | |
+         } | |
+     } | |
+ | |
+     OtpErlangObject[] inputs = pairs.toArray(new OtpErlangObject[pairs.size()]); | |
+     return new OtpErlangTuple(new OtpErlangObject[] { new OtpErlangAtom("inputs"), new OtpErlangList(inputs) }); | |
+ } | |
+ | |
/** | |
* Builds the JSON representation of a map/reduce job | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment