Skip to content

Instantly share code, notes, and snippets.

@camathieu
Last active August 29, 2015 14:13
Show Gist options
  • Save camathieu/e88bcd95042e90df069c to your computer and use it in GitHub Desktop.
Save camathieu/e88bcd95042e90df069c to your computer and use it in GitHub Desktop.
FuzzyRowFilter
/*
* Copyright (C) 2014 The Async HBase Authors. All rights reserved.
* This file is part of Async HBase.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the StumbleUpon nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.hbase.async;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import org.hbase.async.generated.FilterPB;
import org.hbase.async.generated.HBasePB;
import org.jboss.netty.buffer.ChannelBuffer;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;
/**
* Filters data based on fuzzy row key. Performs fast-forwards during scanning.
* It takes pairs (row key, fuzzy info) to match row keys. Where fuzzy info is
* a byte array with 0 or 1 as its values:
* <ul>
* <li>
* 0 - means that this byte in provided row key is fixed, i.e. row key's byte at same position
* must match
* </li>
* <li>
* 1 - means that this byte in provided row key is NOT fixed, i.e. row key's byte at this
* position can be different from the one in provided row key
* </li>
* </ul>
*
*
* Example:
* Let's assume row key format is userId_actionId_year_month. Length of userId is fixed
* and is 4, length of actionId is 2 and year and month are 4 and 2 bytes long respectively.
*
* Let's assume that we need to fetch all users that performed certain action (encoded as "99")
* in Jan of any year. Then the pair (row key, fuzzy info) would be the following:
* row key = "????_99_????_01" (one can use any value instead of "?")
* fuzzy info = "\x01\x01\x01\x01\x00\x00\x00\x00\x01\x01\x01\x01\x00\x00\x00"
*
* I.e. fuzzy info tells the matching mask is "????_99_????_01", where at ? can be any value.
* @since 1.7
*/
public final class FuzzyRowFilter extends ScanFilter {

  /** Fully-qualified class name of the server-side HBase filter. */
  private static final byte[] NAME =
    Bytes.UTF8("org.apache.hadoop.hbase.filter.FuzzyRowFilter");

  /** Unmodifiable set of (row key, fuzzy info) pairs this filter applies. */
  private final Collection<FuzzyFilterPair> filter_pairs;

  /**
   * A row key template together with the mask saying which of its bytes
   * are fixed (0) and which may take any value (1).
   * <p>
   * The arrays are stored and returned by reference, not copied.
   */
  public static class FuzzyFilterPair {
    private final byte[] row_key;
    private final byte[] fuzzy_info;

    /**
     * Creates a pair.
     *
     * @param row_key the row key template
     * @param fuzzy_info the mask of 0 (fixed) / 1 (not fixed) bytes
     * @throws NullPointerException if either argument is {@code null}
     */
    public FuzzyFilterPair(final byte[] row_key, final byte[] fuzzy_info) {
      // Fail here rather than later inside serialize(), where
      // ByteString.copyFrom would be handed the null array.
      if (row_key == null) {
        throw new NullPointerException("row_key");
      }
      if (fuzzy_info == null) {
        throw new NullPointerException("fuzzy_info");
      }
      this.row_key = row_key;
      this.fuzzy_info = fuzzy_info;
    }

    /** @return the row key template given at construction (not a copy) */
    public byte[] getRowKey() {
      return row_key;
    }

    /** @return the fuzzy mask given at construction (not a copy) */
    public byte[] getFuzzyInfo() {
      return fuzzy_info;
    }
  }

  /**
   * Creates a fuzzy row filter applying a single pair.
   *
   * @param filter_pair the (row key, fuzzy info) pair to match rows against
   * @throws NullPointerException if {@code filter_pair} is {@code null}
   */
  public FuzzyRowFilter(final FuzzyFilterPair filter_pair) {
    if (filter_pair == null) {
      throw new NullPointerException("filter_pair");
    }
    this.filter_pairs = java.util.Collections.singleton(filter_pair);
  }

  /**
   * Creates a fuzzy row filter applying all the pairs in the collection.
   * <p>
   * The collection is copied, in its iteration order, so later changes to
   * it by the caller do not affect this filter.
   *
   * @param filter_pairs the (row key, fuzzy info) pairs to match rows against
   * @throws NullPointerException if {@code filter_pairs} or any of its
   * elements is {@code null}
   */
  public FuzzyRowFilter(final Collection<FuzzyFilterPair> filter_pairs) {
    if (filter_pairs == null) {
      throw new NullPointerException("filter_pairs");
    }
    final ArrayList<FuzzyFilterPair> snapshot =
      new ArrayList<FuzzyFilterPair>(filter_pairs.size());
    for (final FuzzyFilterPair filter_pair : filter_pairs) {
      if (filter_pair == null) {
        throw new NullPointerException("filter_pairs contains a null element");
      }
      snapshot.add(filter_pair);
    }
    this.filter_pairs = java.util.Collections.unmodifiableList(snapshot);
  }

  /** @return the name of the server-side filter class, as bytes */
  @Override
  byte[] name() {
    return NAME;
  }

  /**
   * Serializes this filter as a protobuf {@code FuzzyRowFilter} message,
   * with one {@code BytesBytesPair} (row key, fuzzy info) per pair.
   *
   * @return the encoded message
   */
  @Override
  byte[] serialize() {
    final FilterPB.FuzzyRowFilter.Builder builder =
      FilterPB.FuzzyRowFilter.newBuilder();
    for (final FuzzyFilterPair filter_pair : filter_pairs) {
      builder.addFuzzyKeysData(
        HBasePB.BytesBytesPair.newBuilder()
          .setFirst(ByteString.copyFrom(filter_pair.getRowKey()))
          .setSecond(ByteString.copyFrom(filter_pair.getFuzzyInfo())));
    }
    return builder.build().toByteArray();
  }

  /**
   * Legacy (non-protobuf) serialization. Not implemented: nothing is
   * written to {@code buf}.
   * <p>
   * NOTE(review): presumably a scanner on the legacy RPC path would send no
   * payload for this filter -- confirm whether this must be implemented or
   * should fail loudly instead.
   *
   * @param buf the buffer to serialize into (left untouched)
   */
  @Override
  void serializeOld(ChannelBuffer buf) {
    // Intentionally empty, see the method comment.
  }

  /**
   * Predicts the size of the legacy serialized form.
   * <p>
   * Counts one byte for the name length, the name itself and a 4-byte
   * count. The pairs' data is not included, which matches
   * {@link #serializeOld} writing nothing.
   *
   * @return the predicted size in bytes
   */
  @Override
  int predictSerializedSize() {
    // Use the real name length rather than a hard-coded 47, which does not
    // match this filter's class name.
    return 1 + NAME.length + 4;
  }
}
/**
 * Scans three rows through a {@link FuzzyRowFilter} whose mask fixes the
 * bytes " accept " and leaves the first byte and the last four bytes free,
 * then checks that only the two matching rows come back, in key order.
 */
@Test
public void fuzzyRowFilter() throws Exception {
  client.setFlushInterval(FAST_FLUSH);

  // Rows 1 and 3 fit the pattern, row 2 does not.
  final PutRequest accepted_this = new PutRequest(table, "1 accept this",
                                                  family, "q", "frfv1");
  final PutRequest rejected = new PutRequest(table, "2 filter this",
                                             family, "q", "frfv2");
  final PutRequest accepted_that = new PutRequest(table, "3 accept that",
                                                  family, "q", "frfv3");
  Deferred.group(client.put(accepted_this),
                 client.put(rejected),
                 client.put(accepted_that)).join();

  final Scanner scanner = client.newScanner(table);
  scanner.setFamily(family);
  scanner.setStartKey("1");
  scanner.setStopKey("4");

  // 0 = byte must match the template, 1 = byte may be anything.
  final byte[] row_template =
    {'X',' ','a','c','c','e','p','t',' ','t','h','i','s'};
  final byte[] fuzzy_mask =
    { 1 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 1 , 1 , 1 , 1 };
  final FuzzyRowFilter.FuzzyFilterPair pair =
    new FuzzyRowFilter.FuzzyFilterPair(row_template, fuzzy_mask);
  scanner.setFilter(new FuzzyRowFilter(pair));

  final ArrayList<ArrayList<KeyValue>> rows = scanner.nextRows().join();

  // One cell per surviving row, carrying the value written above.
  final String[] expected_values = { "frfv1", "frfv3" };
  assertSizeIs(expected_values.length, rows);
  for (int i = 0; i < expected_values.length; i++) {
    final ArrayList<KeyValue> cells = rows.get(i);
    assertSizeIs(1, cells);
    assertEq(expected_values[i], cells.get(0).value());
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment