Skip to content

Instantly share code, notes, and snippets.

@russelldb
Created June 4, 2011 20:02
Show Gist options
  • Save russelldb/1008293 to your computer and use it in GitHub Desktop.
Save russelldb/1008293 to your computer and use it in GitHub Desktop.
Erlang content type map reduce from pb client
diff --git a/src/main/java/com/basho/riak/pbc/mapreduce/MapReduceBuilder.java b/src/main/java/com/basho/riak/pbc/mapreduce/MapReduceBuilder.java
index e6994a8..78e73c6 100644
--- a/src/main/java/com/basho/riak/pbc/mapreduce/MapReduceBuilder.java
+++ b/src/main/java/com/basho/riak/pbc/mapreduce/MapReduceBuilder.java
@@ -13,7 +13,9 @@
*/
package com.basho.riak.pbc.mapreduce;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -32,6 +34,14 @@ import com.basho.riak.pbc.MapReduceResponseSource;
import com.basho.riak.pbc.RequestMeta;
import com.basho.riak.pbc.RiakClient;
import com.basho.riak.pbc.RiakObject;
+import com.ericsson.otp.erlang.OtpErlangAtom;
+import com.ericsson.otp.erlang.OtpErlangBinary;
+import com.ericsson.otp.erlang.OtpErlangBoolean;
+import com.ericsson.otp.erlang.OtpErlangList;
+import com.ericsson.otp.erlang.OtpErlangLong;
+import com.ericsson.otp.erlang.OtpErlangObject;
+import com.ericsson.otp.erlang.OtpErlangTuple;
+import com.ericsson.otp.erlang.OtpOutputStream;
import com.google.protobuf.ByteString;
/**
@@ -326,6 +336,133 @@ public class MapReduceBuilder {
return submit(new RequestMeta().contentType("application/json"));
}
+ public MapReduceResponseSource submitETF() throws IOException, JSONException {
+ IRequestMeta rm = new RequestMeta().contentType("application/x-erlang-binary");
+ byte[] etf = toETFBytes();
+ ByteString request = ByteString.copyFrom(etf);
+ return riak.mapReduce(request, rm);
+ }
+
+ /**
+ * @return
+ * @throws JSONException
+ * (why? cos getting at the JSON is the only way to get at the
+ * function contents)
+ */
+ private byte[] toETFBytes() throws JSONException {
+ // create inputs, query and timeout tuples
+ // add them to a list, that is the MapRed job,
+ // write it to an outputstream to encode it as bytes
+ OtpErlangTuple inputs = buildInputs();
+ OtpErlangTuple query = buildQuery();
+ // Set a timeout, -1 does not play so well with Riak
+ OtpErlangTuple timeout = new OtpErlangTuple(new OtpErlangObject[] { new OtpErlangAtom("timeout"),
+ new OtpErlangLong(getTimeout()==-1?60000:getTimeout()) });
+ OtpErlangList mapred = new OtpErlangList(new OtpErlangObject[] { inputs, query, timeout });
+ OtpOutputStream oos = new OtpOutputStream(mapred);
+
+ byte[] etf = oos.toByteArray();
+
+ // add the version header byte since OtpOutStream omits it
+ byte[] etfWithVersionHeader = new byte[etf.length + 1];
+ etfWithVersionHeader[0] = (byte) 131;
+ System.arraycopy(etf, 0, etfWithVersionHeader, 1, etf.length);
+
+ return etfWithVersionHeader;
+ }
+
+ /**
+ * @return
+ * @throws JSONException propgate
+ */
+ private OtpErlangTuple buildQuery() throws JSONException {
+ OtpErlangTuple[] ePhases = new OtpErlangTuple[phases.size()];
+ int cnt = 0;
+ for(MapReducePhase phase : phases) {
+ ePhases[cnt] = renderPhase(phase);
+ cnt++;
+ }
+
+ return new OtpErlangTuple(new OtpErlangObject[] { new OtpErlangAtom("query"), new OtpErlangList(ePhases) });
+ }
+
+ /**
+ * @param phase
+ * @return
+ * @throws JSONException
+ * propgate
+ */
+ private OtpErlangTuple renderPhase(MapReducePhase phase) throws JSONException {
+ OtpErlangAtom type = new OtpErlangAtom(phase.type.name().toLowerCase()); // yeah
+ // yeah
+ // yeah
+ // ladder
+ OtpErlangTuple fun = renderFun(phase.function);
+ OtpErlangAtom arg = new OtpErlangAtom("none"); // erm...you may want to
+ // add an arg, really
+ OtpErlangBoolean keep = new OtpErlangBoolean(phase.keep);
+
+ OtpErlangObject[] elems = new OtpErlangObject[] { type, fun, arg, keep };
+
+ return new OtpErlangTuple(elems);
+ }
+
+ /**
+ * @param function
+ * @return
+ * @throws JSONException
+ * propgate
+ */
+ private OtpErlangTuple renderFun(MapReduceFunction function) throws JSONException {
+ JSONObject json = function.toJson();
+ if (function instanceof ErlangFunction) {
+ OtpErlangAtom funType = new OtpErlangAtom("modfun");
+ OtpErlangAtom mod = new OtpErlangAtom(json.getString("module"));
+ OtpErlangAtom fun = new OtpErlangAtom(json.getString("function"));
+
+ return new OtpErlangTuple(new OtpErlangObject[] { funType, mod, fun });
+ } else {
+ if (json.has("source")) {
+ OtpErlangAtom funType = new OtpErlangAtom("jsanon");
+ OtpErlangBinary source = new OtpErlangBinary(json.getString("source").getBytes());
+
+ return new OtpErlangTuple(new OtpErlangObject[] { funType, source });
+ } else {
+ OtpErlangAtom funType = new OtpErlangAtom("jsfun");
+ OtpErlangBinary source = new OtpErlangBinary(json.getString("name").getBytes());
+
+ return new OtpErlangTuple(new OtpErlangObject[] { funType, source });
+ }
+
+ }
+ }
+
+ /**
+ * @return
+ */
+ private OtpErlangTuple buildInputs() {
+ if (bucket != null) {
+ return new OtpErlangTuple(new OtpErlangObject[] { new OtpErlangAtom("inputs"),
+ new OtpErlangBinary(bucket.getBytes()) });
+ } else {
+ List<OtpErlangTuple> tmpInputs = new ArrayList<OtpErlangTuple>();
+ for (String bucket : objects.keySet()) {
+ OtpErlangBinary bucketBin = new OtpErlangBinary(bucket.getBytes());
+
+ Set<String> keys = objects.get(bucket);
+
+ for (String key : keys) {
+ OtpErlangTuple bk = new OtpErlangTuple(new OtpErlangObject[] { bucketBin,
+ new OtpErlangBinary(key.getBytes()) });
+ tmpInputs.add(bk);
+ }
+ }
+
+ OtpErlangObject[] inputs = tmpInputs.toArray(new OtpErlangObject[tmpInputs.size()]);
+ return new OtpErlangTuple(new OtpErlangObject[] { new OtpErlangAtom("inputs"), new OtpErlangList(inputs) });
+ }
+ }
+
/**
* Builds the JSON representation of a map/reduce job
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment