Create a gist now

Instantly share code, notes, and snippets.

A Clojure-driven, seq-based Accumulo Combiner
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* An Accumulo Combiner
*
* http://accumulo.apache.org/1.5/accumulo_user_manual.html#_combiners
*
* that delegates the reduce to user-specified Clojure code.
*
* Derived in part from
*
* https://github.com/charlessimpson/clojure-accumulo-iterators/blob/master/src/main/java/clojure_accumulo/iterators/ClojureCombiner.java
*
* The primary difference is that this code delegates the iteration
* to Clojure code.
*
* The user-specified Clojure code (passed in the iterator option "f")
* must evaluate to a function of two arguments, a Key and an
* Iterator<Value>, and that function must return a Java byte array.
*
* See the EXAMPLE_* constants in the code below.
*
* Building:
*
* Add
*
* <dependency>
* <groupId>org.clojure</groupId>
* <artifactId>clojure</artifactId>
* <version>1.4.0</version>
* </dependency>
*
* to 'examples/simple/pom.xml'.
*
*/
package org.apache.accumulo.examples.simple.combiner;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.io.StringReader;
import clojure.lang.IFn;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyValue;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.Combiner;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import clojure.lang.Compiler;
import clojure.lang.RT;
import clojure.lang.Var;
import org.apache.accumulo.start.classloader.AccumuloClassLoader;
/**
 * An Accumulo {@link Combiner} whose reduce step is a user-supplied Clojure function.
 *
 * <p>The Clojure source is read from the iterator option {@value #F_OPTION}. It must evaluate to
 * an {@link IFn} of two arguments, a {@link Key} and an {@code Iterator<Value>}, that returns a
 * {@code byte[]}. That array becomes the combined {@link Value}. When the option is absent and
 * {@link #useExample} is {@code true}, the source in {@link #EXAMPLE} is used instead.
 */
public final class ClojureSeqCombiner extends Combiner {
  /** When true, {@link #EXAMPLE} is used if the {@value #F_OPTION} option is not supplied. */
  protected static final boolean useExample = true;
  /** Name of the iterator option that carries the Clojure source of the reduce function. */
  protected static final String F_OPTION = "f";
  /** The compiled reduce function; assigned by {@link #init}. */
  protected IFn f;
  // ToDo: Also provide Clojure-level parameter object.

  /** Example: concatenate values with a comma separator, iterating with {@code loop}/{@code recur}. */
  protected static String EXAMPLE_1 =
      "(fn [k vals] " +
      " (loop [vs (iterator-seq vals) " +
      " acc (transient [])] " +
      " (if (empty? vs) " +
      " (.getBytes (reduce str (interpose \",\" (persistent! acc)))) " +
      " (recur (rest vs) " +
      " (conj! acc (new String (.get (first vs)))))))) ";

  /** Example: concatenate values with a comma separator, iterating with {@code map}. */
  protected static String EXAMPLE_2 =
      "(fn [k vals] " +
      " (.getBytes (reduce str " +
      " (interpose \",\" " +
      " (map #(new String (.get %)) " +
      " (iterator-seq vals)))))) ";

  /**
   * Example: build a histogram string {@code "x1,n1;x2,n2;..."} of the values.
   *
   * <p>A value containing {@code ","} is treated as an already-combined histogram and merged.
   * Any other value counts as one occurrence of itself. Labels beyond {@code limit} (16) are
   * counted under {@code "*"}.
   */
  protected static String EXAMPLE_3 =
      // ToDo: Allow options to be passed to the Clojure function.
      "(fn [k vals] \n" +
      " (let [pair-sep \",\" pairs-sep \";\" limit 16 other \"*\"] \n" +
      " (loop [vs (iterator-seq vals) \n" +
      " acc {}] ;; ToDo: Use transients. \n" +
      " (if (empty? vs) \n" +
      " (.getBytes (reduce str \n" +
      " (interpose pairs-sep \n" +
      " (map (fn [[x n]] \n" +
      " (str x pair-sep n)) \n" +
      " acc)))) \n" +
      " (recur (rest vs) \n" +
      " (let [v (new String (.get (first vs)))] \n" +
      " (if (.contains v pair-sep) \n" +
      " (into {} \n" +
      " (let [merged (merge-with + \n" +
      " acc \n" +
      " (into {} \n" +
      " (map (fn [xn] \n" +
      " (let [[x n] (.split xn pair-sep 2)] \n" +
      " [x (Long/parseLong n)])) \n" +
      " (.split (new String v) pairs-sep)))) \n" +
      " sorted (sort-by (fn [[k c]] [(- c) k])\n" +
      " (dissoc merged other)) \n" +
      " top (take limit sorted) \n" +
      " bottom (drop limit sorted)] \n" +
      " (conj top [other \n" +
      " (+ (merged other 0) \n" +
      " (reduce + (map second bottom)))]))) \n" +
      " (if (or (acc v) \n" +
      " (< (count acc) limit)) \n" +
      " (assoc acc v (inc (acc v 0))) \n" +
      " (assoc acc other (inc (acc other 0))))))))))) \n";

  /** Which example to use by default when {@value #F_OPTION} is not supplied. */
  protected static String EXAMPLE = EXAMPLE_3;

  /**
   * Combines all values for {@code key} by invoking the compiled Clojure function.
   *
   * @param key the key whose values are being combined
   * @param valueIterator the values to combine
   * @return a {@link Value} wrapping the {@code byte[]} returned by {@link #f}
   * @throws ClassCastException if the Clojure function does not return a {@code byte[]}
   */
  @Override
  public Value reduce(Key key, Iterator<Value> valueIterator) {
    return new Value((byte[]) f.invoke(key, valueIterator));
  }

  /**
   * Compiles and evaluates Clojure source, with Accumulo's class loader as the thread context
   * class loader.
   *
   * <p>The calling thread's previous context class loader is restored before this method returns,
   * whether evaluation succeeds or fails.
   *
   * @param clj Clojure source to evaluate
   * @return the result of evaluating {@code clj}
   * @throws IllegalArgumentException wrapping any failure to obtain the class loader or to
   *     compile/evaluate {@code clj}
   */
  public static Object eval(String clj) {
    final Thread current = Thread.currentThread();
    final ClassLoader previous = current.getContextClassLoader();
    try {
      // Use Accumulo's class loader so we get Hadoop, Accumulo, and whatever
      // is in lib/ext.
      current.setContextClassLoader(AccumuloClassLoader.getClassLoader());
      // Push only once the class loader is installed, so the pop below
      // always has a matching push.
      Var.pushThreadBindings(RT.map(RT.USE_CONTEXT_CLASSLOADER, RT.T));
      try {
        // Actually compile the given code.
        return Compiler.load(new StringReader(clj));
      } finally {
        Var.popThreadBindings();
      }
    } catch (Exception e) {
      throw new IllegalArgumentException(e);
    } finally {
      // Do not leak Accumulo's class loader into the calling thread.
      current.setContextClassLoader(previous);
    }
  }

  /**
   * Initializes the combiner and compiles the reduce function.
   *
   * <p>The function source comes from the {@value #F_OPTION} option. If that option is missing
   * (or {@code options} is {@code null}), {@link #EXAMPLE} is used when {@link #useExample} is
   * set; otherwise initialization fails.
   *
   * @throws IllegalArgumentException if no function source is available, or if it does not
   *     evaluate to an {@link IFn}
   */
  @Override
  public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
    super.init(source, options, env);
    // A null options map is treated like a missing option, instead of
    // dereferencing null when the example fallback is enabled.
    String fString = (options == null) ? null : options.get(F_OPTION);
    if (fString == null) {
      if (!useExample) {
        throw new IllegalArgumentException(F_OPTION + " must be set for ClojureSeqCombiner");
      }
      fString = EXAMPLE;
    }
    final Object obj = eval(fString);
    if (!(obj instanceof IFn)) {
      throw new IllegalArgumentException(F_OPTION + " must compile to something that implements IFn");
    }
    f = (IFn) obj;
  }

  /**
   * Describes this iterator and its {@value #F_OPTION} option for interactive configuration.
   *
   * @return the superclass options, extended with this combiner's name, option, and description
   */
  @Override
  public IteratorOptions describeOptions() {
    final IteratorOptions io = super.describeOptions();
    io.setName("clojureSeqCombiner");
    io.addNamedOption(F_OPTION, "String containing combiner reduce function");
    io.setDescription("ClojureSeqCombiner allows a Clojure function to be passed in and invoked as the Combiner's reduce method");
    return io;
  }
}
// Local Variables:
// c-basic-offset: 2
// indent-tabs-mode: nil
// End:
(defn examine
  "Prints a summary of a histogram string of the form \"x1,n1;x2,n2;...\":
  the number of entries, the sum of all counts, and then each [x n] pair
  in ascending order of count."
  [hist-string]
  (let [parse-entry (fn [entry]
                      (let [[label cnt] (vec (.split entry ","))]
                        [label (Long/parseLong cnt)]))
        entries (map parse-entry (.split hist-string ";"))
        total (apply + (map second entries))]
    (println :entries (count entries))
    (println :total total)
    (doseq [entry (sort-by second entries)]
      (println entry))))
foo> setiter -t foo -p 10 -scan -minc -majc -n clj -class org.apache.accumulo.examples.simple.combiner.ClojureSeqCombiner
foo> insert b c x a
foo> insert b c x b
foo> insert b c x b
foo> insert b c x c
foo> insert b c x c
foo> insert b c x c
foo> insert b c x c
foo> insert b c x d
foo> scan
b c:x [] a,1;b,2;c,4;d,1
;; Combiner reduce function: folds the Values for one Key into a histogram
;; string "x1,n1;x2,n2;..." and returns it as a byte array.
;;   k    - the Key being combined (not used).
;;   vals - an Iterator over the Values for k.
;; NOTE(review): `new String` / `.getBytes` use the JVM default charset.
(fn [k vals]
  (let [pair-sep "," pairs-sep ";" limit 16 other "*"]
    (loop [vs (iterator-seq vals)
           acc {}] ;; ToDo: Use transients.
      (if (empty? vs)
        ;; Serialize acc as "x1,n1;x2,n2;..." and return the bytes.
        (.getBytes (reduce str
                           (interpose pairs-sep
                                      (map (fn [[x n]]
                                             (str x pair-sep n))
                                           acc))))
        (recur (rest vs)
               (let [v (new String (.get (first vs)))]
                 (if (.contains v pair-sep)
                   ;; v is an already-combined histogram: merge its counts
                   ;; into acc, keep the `limit` largest (ties broken by
                   ;; label) and add the remainder's total to `other`.
                   (into {}
                         (let [merged (merge-with +
                                                  acc
                                                  (into {}
                                                        (map (fn [xn]
                                                               (let [[x n] (.split xn pair-sep 2)]
                                                                 [x (Long/parseLong n)]))
                                                             (.split (new String v) pairs-sep))))
                               sorted (sort-by (fn [[k c]] [(- c) k])
                                               (dissoc merged other))
                               top (take limit sorted)
                               bottom (drop limit sorted)]
                           (conj top [other
                                      (+ (merged other 0)
                                         (reduce + (map second bottom)))])))
                   ;; v is a single raw value: count it under its own label
                   ;; if already tracked or while fewer than `limit` entries
                   ;; exist; otherwise count it under `other`.
                   (if (or (acc v)
                           (< (count acc) limit))
                     (assoc acc v (inc (acc v 0)))
                     (assoc acc other (inc (acc other 0)))))))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment