Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package org.apache.hadoop.hbase.client;
/**
* This file is licensed to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.TreeMap;
public class MockTable implements Table {
private final String tableName;
private final List<String> columnFamilies = new ArrayList<>();
private final NavigableMap<byte[],
NavigableMap<byte[],
NavigableMap<byte[],
NavigableMap<Long, byte[]>
>
>
> data = new TreeMap<>(Bytes.BYTES_COMPARATOR);
private static List<Cell> toKeyValue(byte[] row,
NavigableMap<byte[],
NavigableMap<byte[],
NavigableMap<Long, byte[]>
>
> rowData,
int maxVersions) {
return toKeyValue(row, rowData, 0, Long.MAX_VALUE, maxVersions);
}
public MockTable(String tableName) {
this.tableName = tableName;
}
public MockTable(String tableName, String... columnFamilies) {
this.tableName = tableName;
this.columnFamilies.addAll(Arrays.asList(columnFamilies));
}
public void addColumnFamily(String columnFamily) {
this.columnFamilies.add(columnFamily);
}
@Override
public TableName getName() {
return TableName.valueOf(tableName.getBytes());
}
@Override
public Configuration getConfiguration() {
throw new RuntimeException(this.getClass() + " does NOT implement this method.");
}
@Override
public HTableDescriptor getTableDescriptor() throws IOException {
HTableDescriptor table = new HTableDescriptor(getName());
for (String columnFamily : columnFamilies) {
table.addFamily(new HColumnDescriptor(columnFamily));
}
return table;
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
// currently only support Put and Delete
for (Mutation mutation : rm.getMutations()) {
if (mutation instanceof Put) {
put((Put) mutation);
} else if (mutation instanceof Delete) {
delete((Delete) mutation);
}
}
}
@Override
public Result append(Append append) throws IOException {
throw new RuntimeException(this.getClass() + " does NOT implement this method.");
}
private static List<Cell> toKeyValue(byte[] row,
NavigableMap<byte[],
NavigableMap<byte[],
NavigableMap<Long, byte[]>
>
> rowdata,
long timestampStart, long timestampEnd, int maxVersions) {
List<Cell> ret = new ArrayList<Cell>();
for (byte[] family : rowdata.keySet())
for (byte[] qualifier : rowdata.get(family).keySet()) {
int versionsAdded = 0;
for (Map.Entry<Long, byte[]> tsToVal :
rowdata.get(family).get(qualifier).descendingMap().entrySet()) {
if (versionsAdded++ == maxVersions)
break;
Long timestamp = tsToVal.getKey();
if (timestamp < timestampStart)
continue;
if (timestamp > timestampEnd)
continue;
byte[] value = tsToVal.getValue();
ret.add(new KeyValue(row, family, qualifier, timestamp, value));
}
}
return ret;
}
/**
* {@inheritDoc}
*/
@Override
public boolean exists(Get get) throws IOException {
Result result = get(get);
return result != null && !result.isEmpty();
}
/**
* {@inheritDoc}
*/
@Override
public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
results = batch(actions);
}
/**
* {@inheritDoc}
*/
@Override
public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
Object[] results = new Object[actions.size()]; // same size.
for (int i = 0; i < actions.size(); i++) {
Row r = actions.get(i);
if (r instanceof Delete) {
delete((Delete) r);
results[i] = new Result();
}
if (r instanceof Put) {
put((Put) r);
results[i] = new Result();
}
if (r instanceof Get) {
Result result = get((Get) r);
results[i] = result;
}
if (r instanceof Increment) {
Result result = increment((Increment) r);
results[i] = result;
}
if (r instanceof Append) {
Result result = append((Append) r);
results[i] = result;
}
}
return results;
}
/**
* {@inheritDoc}
*/
@Override
public Result get(Get get) throws IOException {
if (!data.containsKey(get.getRow()))
return new Result();
byte[] row = get.getRow();
List<Cell> kvs = new ArrayList<Cell>();
if (!get.hasFamilies()) {
kvs = toKeyValue(row, data.get(row), get.getMaxVersions());
} else {
for (byte[] family : get.getFamilyMap().keySet()) {
if (data.get(row).get(family) == null)
continue;
NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family);
if (qualifiers == null || qualifiers.isEmpty())
qualifiers = data.get(row).get(family).navigableKeySet();
for (byte[] qualifier : qualifiers) {
if (qualifier == null)
qualifier = "".getBytes();
if (!data.get(row).containsKey(family) ||
!data.get(row).get(family).containsKey(qualifier) ||
data.get(row).get(family).get(qualifier).isEmpty())
continue;
Map.Entry<Long, byte[]> timestampAndValue =
data.get(row).get(family).get(qualifier).lastEntry();
kvs.add(new KeyValue(row, family, qualifier,
timestampAndValue.getKey(), timestampAndValue.getValue()));
}
}
}
Filter filter = get.getFilter();
if (filter != null) {
kvs = filter(filter, kvs);
}
return Result.create(kvs);
}
/**
* {@inheritDoc}
*/
@Override
public Result[] get(List<Get> gets) throws IOException {
List<Result> results = new ArrayList<Result>();
for (Get g : gets) {
results.add(get(g));
}
return results.toArray(new Result[results.size()]);
}
/**
* {@inheritDoc}
*/
@Override
public ResultScanner getScanner(Scan scan) throws IOException {
final List<Result> ret = new ArrayList<Result>();
byte[] st = scan.getStartRow();
byte[] sp = scan.getStopRow();
Filter filter = scan.getFilter();
for (byte[] row : data.keySet()) {
// if row is equal to startRow emit it. When startRow (inclusive) and
// stopRow (exclusive) is the same, it should not be excluded which would
// happen w/o this control.
if (st != null && st.length > 0 &&
Bytes.BYTES_COMPARATOR.compare(st, row) != 0) {
// if row is before startRow do not emit, pass to next row
if (st != null && st.length > 0 &&
Bytes.BYTES_COMPARATOR.compare(st, row) > 0)
continue;
// if row is equal to stopRow or after it do not emit, stop iteration
if (sp != null && sp.length > 0 &&
Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0)
break;
}
List<Cell> kvs;
if (!scan.hasFamilies()) {
kvs = toKeyValue(row, data.get(row),
scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions());
} else {
kvs = new ArrayList<Cell>();
for (byte[] family : scan.getFamilyMap().keySet()) {
if (data.get(row).get(family) == null)
continue;
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
if (qualifiers == null || qualifiers.isEmpty())
qualifiers = data.get(row).get(family).navigableKeySet();
for (byte[] qualifier : qualifiers) {
if (data.get(row).get(family).get(qualifier) == null)
continue;
for (Long timestamp : data.get(row).get(family).get(qualifier).descendingKeySet()) {
if (timestamp < scan.getTimeRange().getMin())
continue;
if (timestamp > scan.getTimeRange().getMax())
continue;
byte[] value = data.get(row).get(family).get(qualifier).get(timestamp);
kvs.add(new KeyValue(row, family, qualifier, timestamp, value));
if (kvs.size() == scan.getMaxVersions()) {
break;
}
}
}
}
}
if (filter != null) {
kvs = filter(filter, kvs);
// Check for early out optimization
if (filter.filterAllRemaining()) {
break;
}
}
if (!kvs.isEmpty()) {
ret.add(Result.create(kvs));
}
}
return new ResultScanner() {
private final Iterator<Result> iterator = ret.iterator();
public Iterator<Result> iterator() {
return iterator;
}
public Result[] next(int nbRows) throws IOException {
ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
for (int i = 0; i < nbRows; i++) {
Result next = next();
if (next != null) {
resultSets.add(next);
} else {
break;
}
}
return resultSets.toArray(new Result[resultSets.size()]);
}
public Result next() throws IOException {
try {
return iterator().next();
} catch (NoSuchElementException e) {
return null;
}
}
public void close() {
}
};
}
/**
* Follows the logical flow through the filter methods for a single row.
*
* @param filter HBase filter.
* @param kvs List of a row's KeyValues
* @return List of KeyValues that were not filtered.
*/
private List<Cell> filter(Filter filter, List<Cell> kvs) throws IOException {
filter.reset();
List<Cell> tmp = new ArrayList<Cell>(kvs.size());
tmp.addAll(kvs);
/*
* Note. Filter flow for a single row. Adapted from
* "HBase: The Definitive Guide" (p. 163) by Lars George, 2011.
* See Figure 4-2 on p. 163.
*/
boolean filteredOnRowKey = false;
List<Cell> nkvs = new ArrayList<Cell>(tmp.size());
for (Cell kv : tmp) {
if (filter.filterRowKey(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())) {
filteredOnRowKey = true;
break;
}
Filter.ReturnCode filterResult = filter.filterKeyValue(kv);
if (filterResult == Filter.ReturnCode.INCLUDE) {
nkvs.add(kv);
} else if (filterResult == Filter.ReturnCode.NEXT_ROW) {
break;
} else if (filterResult == Filter.ReturnCode.NEXT_COL || filterResult == Filter.ReturnCode.SKIP) {
continue;
}
/*
* Ignoring next key hint which is a optimization to reduce file
* system IO
*/
}
if (filter.hasFilterRow() && !filteredOnRowKey) {
filter.filterRow();
}
if (filter.filterRow() || filteredOnRowKey) {
nkvs.clear();
}
tmp = nkvs;
return tmp;
}
/**
* {@inheritDoc}
*/
@Override
public ResultScanner getScanner(byte[] family) throws IOException {
Scan scan = new Scan();
scan.addFamily(family);
return getScanner(scan);
}
/**
* {@inheritDoc}
*/
@Override
public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
Scan scan = new Scan();
scan.addColumn(family, qualifier);
return getScanner(scan);
}
private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject) {
V data = map.get(key);
if (data == null) {
data = newObject;
map.put(key, data);
}
return data;
}
/**
* {@inheritDoc}
*/
@Override
public void put(Put put) throws IOException {
byte[] row = put.getRow();
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
for (byte[] family : put.getFamilyCellMap().keySet()) {
if (!columnFamilies.contains(new String(family))) {
throw new RuntimeException("Not Exists columnFamily : " + new String(family));
}
NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR));
for (Cell cell : put.getFamilyCellMap().get(family)) {
KeyValue kv = (KeyValue) cell;
kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis()));
byte[] qualifier = kv.getQualifierArray();
NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>());
qualifierData.put(kv.getTimestamp(), kv.getValueArray());
}
}
}
/**
* {@inheritDoc}
*/
@Override
public void put(List<Put> puts) throws IOException {
for (Put put : puts) {
put(put);
}
}
private boolean check(byte[] row, byte[] family, byte[] qualifier, byte[] value) {
if (value == null || value.length == 0)
return !data.containsKey(row) ||
!data.get(row).containsKey(family) ||
!data.get(row).get(family).containsKey(qualifier);
else
return data.containsKey(row) &&
data.get(row).containsKey(family) &&
data.get(row).get(family).containsKey(qualifier) &&
!data.get(row).get(family).get(qualifier).isEmpty() &&
Arrays.equals(data.get(row).get(family).get(qualifier).lastEntry().getValue(), value);
}
/**
* {@inheritDoc}
*/
@Override
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
if (check(row, family, qualifier, value)) {
put(put);
return true;
}
return false;
}
/**
* {@inheritDoc}
*/
@Override
public void delete(Delete delete) throws IOException {
byte[] row = delete.getRow();
if (data.get(row) == null)
return;
if (delete.getFamilyCellMap().size() == 0) {
data.remove(row);
return;
}
for (byte[] family : delete.getFamilyCellMap().keySet()) {
if (data.get(row).get(family) == null)
continue;
if (delete.getFamilyCellMap().get(family).isEmpty()) {
data.get(row).remove(family);
continue;
}
for (Cell kv : delete.getFamilyCellMap().get(family)) {
data.get(row).get(kv.getFamilyArray()).remove(kv.getQualifierArray());
}
if (data.get(row).get(family).isEmpty()) {
data.get(row).remove(family);
}
}
if (data.get(row).isEmpty()) {
data.remove(row);
}
}
/**
* {@inheritDoc}
*/
@Override
public void delete(List<Delete> deletes) throws IOException {
for (Delete delete : deletes) {
delete(delete);
}
}
/**
* {@inheritDoc}
*/
@Override
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException {
if (check(row, family, qualifier, value)) {
delete(delete);
return true;
}
return false;
}
@Override
public Result increment(Increment increment) throws IOException {
List<Cell> kvs = new ArrayList<Cell>();
Map<byte[], NavigableMap<byte[], Long>> famToVal = increment.getFamilyMapOfLongs();
for (Map.Entry<byte[], NavigableMap<byte[], Long>> ef : famToVal.entrySet()) {
byte[] family = ef.getKey();
NavigableMap<byte[], Long> qToVal = ef.getValue();
for (Map.Entry<byte[], Long> eq : qToVal.entrySet()) {
long newValue = incrementColumnValue(increment.getRow(), family, eq.getKey(), eq.getValue());
kvs.add(new KeyValue(increment.getRow(), family, eq.getKey(), Bytes.toBytes(newValue)));
}
}
return Result.create(kvs);
}
@Override
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
return incrementColumnValue(row, family, qualifier, amount, Durability.USE_DEFAULT);
}
/**
* {@inheritDoc}
*/
@Override
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException {
if (check(row, family, qualifier, null)) {
Put put = new Put(row);
put.add(family, qualifier, Bytes.toBytes(amount));
put(put);
return amount;
}
long newValue = Bytes.toLong(data.get(row).get(family).get(qualifier).lastEntry().getValue()) + amount;
data.get(row).get(family).get(qualifier).put(System.currentTimeMillis(),
Bytes.toBytes(newValue));
return newValue;
}
/**
* {@inheritDoc}
*/
@Override
public boolean isAutoFlush() {
return true;
}
/**
* {@inheritDoc}
*/
@Override
public void flushCommits() throws IOException {
}
/**
* {@inheritDoc}
*/
@Override
public void close() throws IOException {
}
@Override
public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) {
}
@Override
public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype) {
return null;
}
@Override
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) {
throw new RuntimeException(this.getClass() + " does NOT implement this method.");
}
@Override
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) {
throw new RuntimeException(this.getClass() + " does NOT implement this method.");
}
@Override
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) {
throw new RuntimeException(this.getClass() + " does NOT implement this method.");
}
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) {
throw new RuntimeException(this.getClass() + " does NOT implement this method.");
}
@Override
public boolean[] existsAll(List<Get> gets) throws IOException {
return new boolean[0];
}
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
throw new RuntimeException(this.getClass() + " does NOT implement this method.");
}
@Override
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) {
throw new RuntimeException(this.getClass() + " does NOT implement this method.");
}
@Override
public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) {
throw new RuntimeException(this.getClass() + " does NOT implement this method.");
}
@Override
public void setAutoFlushTo(boolean autoFlush) {
}
@Override
public long getWriteBufferSize() {
return 0;
}
@Override
public void setWriteBufferSize(long writeBufferSize) throws IOException {
}
}
@nielsbasjes

This comment has been minimized.

Copy link
Owner Author

nielsbasjes commented Sep 17, 2014

This has updated on 2014-09-17 to work in the master branch of the hbase-client code base (i.e. a lot of API changes). Due to lack of tests I was only able to validate part of the workings.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.