|
/* |
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
* you may not use this file except in compliance with the License. |
|
* You may obtain a copy of the License at |
|
* |
|
* http://www.apache.org/licenses/LICENSE-2.0 |
|
* |
|
* Unless required by applicable law or agreed to in writing, software |
|
* distributed under the License is distributed on an "AS IS" BASIS, |
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
* See the License for the specific language governing permissions and |
|
* limitations under the License. |
|
*/ |
|
|
|
/* |
|
* An Accumulo Combiner
|
* |
|
* http://accumulo.apache.org/1.5/accumulo_user_manual.html#_combiners |
|
* |
|
* that maintains a histogram of numeric values. |
|
* |
|
* Histogram representation: |
|
* |
|
* low,b_0,...,b_{n-1},high;min,max,bins
*
* where b_i is the count of values in bin i and n = bins, so there are
* n equal-width bins between min and max. 'low' and 'high' are the
* counts of values below the min and values above the max, respectively.
|
* |
|
* Example: |
|
* |
|
* 0,1,2,20,30,1;0.0,100.0,4
|
* |
|
* means: |
|
* |
|
* 0 values in (-Infinity,0) |
|
* 1 value in [0,25) |
|
* 2 values in [25,50) |
|
* 20 values in [50,75) |
|
* 30 values in [75,100]

* 1 value in (100,+Infinity)
|
* |
|
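* Min, max, and bins are set through the iterator options "min", "max",
* and "bins" (defaults 0.0, 1.0, and 10). The last bin is closed, so a
* value equal to max is counted in it.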
|
* |
|
*/ |
|
|
|
/* Example: |
|
|
|
createtable foo |
|
setiter -t foo -p 10 -scan -minc -majc -n hist -class org.apache.accumulo.examples.simple.combiner.HistCombiner |
|
|
|
c |
|
0 |
|
100 |
|
10 |
|
insert a c x 11 |
|
insert a c x 33 |
|
insert a c x 66 |
|
insert a c x 66 |
|
insert a c x 66 |
|
insert a c x 66 |
|
insert a c x 1000 |
|
scan |
|
# a c:x [] 0,0,1,0,1,0,0,4,0,0,0,1;0.0,100.0,10 |
|
deletetable foo |
|
*/ |
|
|
|
|
|
package org.apache.accumulo.examples.simple.combiner; |
|
|
|
import java.io.IOException; |
|
import java.util.Iterator; |
|
import java.util.Map; |
|
|
|
import org.apache.accumulo.core.client.IteratorSetting; |
|
import org.apache.accumulo.core.data.Key; |
|
import org.apache.accumulo.core.data.Value; |
|
import org.apache.accumulo.core.iterators.Combiner; |
|
import org.apache.accumulo.core.iterators.IteratorEnvironment; |
|
import org.apache.accumulo.core.iterators.SortedKeyValueIterator; |
|
|
|
public class HistCombiner extends Combiner { |
|
|
|
public static final String MIN_OPTION = "min"; |
|
public static final String MAX_OPTION = "max"; |
|
public static final String BINS_OPTION = "bins"; |
|
|
|
protected double rangeMin = 0.0; |
|
protected double rangeMax = 1.0; |
|
protected int bins = 10; |
|
|
|
protected double transform (double x) { |
|
return x; |
|
} |
|
|
|
@Override |
|
public Value reduce(Key key, Iterator<Value> iter) { |
|
|
|
final long[] counts = new long[bins + 2]; |
|
final double width = (rangeMax - rangeMin) / bins; |
|
|
|
while (iter.hasNext()) { |
|
final String hist[] = iter.next().toString().split(";"); |
|
|
|
if (hist.length == 1) { |
|
final double x = transform(Double.parseDouble(hist[0])); |
|
if (x < rangeMin) { |
|
counts[0]++; |
|
} else if (rangeMax < x) { |
|
counts[bins + 1]++; |
|
} else { |
|
int bin = (int) ((x - rangeMin)/width); |
|
if (bin == bins) bin--; // Closed interval in this case. |
|
counts[bin + 1]++; |
|
} |
|
|
|
} else { |
|
final String params[] = hist[1].split(","); |
|
// ToDo: Try to be a little more tolerant. |
|
if (rangeMin != Double.parseDouble(params[0])) { |
|
throw new IllegalArgumentException("Different min: " + params[0]); |
|
} |
|
if (rangeMax != Double.parseDouble(params[1])) { |
|
throw new IllegalArgumentException("Different max: " + params[1]); |
|
} |
|
if (bins != Integer.parseInt(params[2])) { |
|
throw new IllegalArgumentException("Different bins: " + params[2]); |
|
} |
|
|
|
final String c[] = hist[0].split(","); |
|
if (c.length != bins + 2) { |
|
throw new IllegalArgumentException("Wrong number of bins: " + c.length); |
|
} |
|
for (int i = 0; i < bins + 2; i++) { |
|
counts[i] += Long.parseLong(c[i+1]); |
|
} |
|
} |
|
} |
|
|
|
final StringBuffer buf = new StringBuffer(); // ToDo: Estimate length. |
|
for (int i = 0; i < bins + 2; i++) { |
|
if (0 < i) buf.append(","); |
|
buf.append(counts[i]); |
|
} |
|
buf.append(";"); |
|
|
|
buf.append(rangeMin); buf.append(","); |
|
buf.append(rangeMax); buf.append(","); |
|
buf.append(bins); |
|
|
|
return new Value(buf.toString().getBytes()); |
|
} |
|
|
|
@Override |
|
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { |
|
super.init(source, options, env); |
|
|
|
if (options.containsKey(MIN_OPTION)) |
|
rangeMin = Double.parseDouble(options.get(MIN_OPTION)); |
|
|
|
if (options.containsKey(MAX_OPTION)) |
|
rangeMax = Double.parseDouble(options.get(MAX_OPTION)); |
|
|
|
if (options.containsKey(BINS_OPTION)) |
|
bins = Integer.parseInt(options.get(BINS_OPTION)); |
|
} |
|
|
|
@Override |
|
public IteratorOptions describeOptions() { |
|
IteratorOptions io = super.describeOptions(); |
|
io.setName("histCombiner"); |
|
io.setDescription("Combiner that maintains a histogram of numeric values"); |
|
io.addNamedOption(MIN_OPTION, "expected minimum of the numbers"); |
|
io.addNamedOption(MAX_OPTION, "expected maximum of the numbers"); |
|
io.addNamedOption(BINS_OPTION, "number of bins"); |
|
return io; |
|
} |
|
|
|
@Override |
|
public boolean validateOptions(Map<String,String> options) { |
|
if (!super.validateOptions(options)) |
|
return false; |
|
|
|
if (options.containsKey(MIN_OPTION) && !options.get(MIN_OPTION).matches("\\d+(\\.\\d+)?")) |
|
throw new IllegalArgumentException("invalid option " + MIN_OPTION + ":" + options.get(MIN_OPTION)); |
|
|
|
if (options.containsKey(MAX_OPTION) && !options.get(MAX_OPTION).matches("\\d+(\\.\\d+)?")) |
|
throw new IllegalArgumentException("invalid option " + MAX_OPTION + ":" + options.get(MAX_OPTION)); |
|
|
|
if (options.containsKey(BINS_OPTION) && !options.get(BINS_OPTION).matches("\\d+")) |
|
throw new IllegalArgumentException("invalid option " + BINS_OPTION + ":" + options.get(BINS_OPTION)); |
|
|
|
return true; |
|
} |
|
|
|
public static void setMin(IteratorSetting iterConfig, double min) { |
|
iterConfig.addOption(MIN_OPTION, min + ""); |
|
} |
|
|
|
public static void setMax(IteratorSetting iterConfig, double max) { |
|
iterConfig.addOption(MAX_OPTION, max + ""); |
|
} |
|
|
|
public static void setBins(IteratorSetting iterConfig, int bins) { |
|
iterConfig.addOption(BINS_OPTION, bins + ""); |
|
} |
|
} |
|
|
|
// Local Variables: |
|
// c-basic-offset: 2 |
|
// indent-tabs-mode: nil |
|
// End: |