-
-
Save shpikat/5ff71f845d8d6d981336 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* This file is licensed to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance with the | |
* License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
*/ | |
import com.google.common.base.Function; | |
import com.google.common.collect.Collections2; | |
import com.google.protobuf.Descriptors; | |
import com.google.protobuf.Message; | |
import com.google.protobuf.Service; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.hbase.Cell; | |
import org.apache.hadoop.hbase.CellUtil; | |
import org.apache.hadoop.hbase.HTableDescriptor; | |
import org.apache.hadoop.hbase.KeyValue; | |
import org.apache.hadoop.hbase.TableName; | |
import org.apache.hadoop.hbase.client.Append; | |
import org.apache.hadoop.hbase.client.Delete; | |
import org.apache.hadoop.hbase.client.Durability; | |
import org.apache.hadoop.hbase.client.Get; | |
import org.apache.hadoop.hbase.client.HTableInterface; | |
import org.apache.hadoop.hbase.client.Increment; | |
import org.apache.hadoop.hbase.client.Mutation; | |
import org.apache.hadoop.hbase.client.Put; | |
import org.apache.hadoop.hbase.client.Result; | |
import org.apache.hadoop.hbase.client.ResultScanner; | |
import org.apache.hadoop.hbase.client.Row; | |
import org.apache.hadoop.hbase.client.RowMutations; | |
import org.apache.hadoop.hbase.client.Scan; | |
import org.apache.hadoop.hbase.client.coprocessor.Batch; | |
import org.apache.hadoop.hbase.filter.CompareFilter; | |
import org.apache.hadoop.hbase.filter.Filter; | |
import org.apache.hadoop.hbase.filter.Filter.ReturnCode; | |
import org.apache.hadoop.hbase.io.TimeRange; | |
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; | |
import org.apache.hadoop.hbase.util.Bytes; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.Collections; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Map.Entry; | |
import java.util.NavigableMap; | |
import java.util.NavigableSet; | |
import java.util.NoSuchElementException; | |
import java.util.TreeMap; | |
import java.util.TreeSet; | |
/** | |
* Mock implementation of HTableInterface. Holds any supplied data in a | |
* multi-dimensional NavigableMap which acts as a in-memory database. Useful for | |
* testing classes that operate on data using an HTableInterface. | |
* <p/> | |
* Instances should be get using {@code MockHTable.create()}. So while a | |
* DAO with a saving operation like | |
* | |
* <pre> | |
* public class MyDAO { | |
* private HTableInterface table; | |
* | |
* public MyDAO(HTableInterface table) { | |
* this.table = table; | |
* } | |
* | |
* public void saveData(byte[] id, byte[] data) throws IOException{ | |
* Put put = new Put(id) | |
* put.add(family, qualifier, data); | |
* table.put(put); | |
* } | |
* } | |
* </pre> | |
* <p/> | |
* is used in production like | |
* | |
* <pre> | |
* MyDAO(new HTable(conf, tableName)).saveData(id, data); | |
* </pre> | |
* <p/> | |
* can be tested like | |
* | |
* <pre> | |
* @Test | |
* public void testSave() { | |
* MockHTable table = MockHTable.create(); | |
* MyDAO(table).saveData(id, data); | |
* Get get = new Get(id); | |
* Result result = table.get(get); | |
* assertArrayEquals(data, result.getValue(family, qualifier)); | |
* } | |
* </pre> | |
* <p/> | |
* MockHTable instances can also be initialized with pre-loaded data using one | |
* of the String[][] or Map<String, Map<String, String>> data formats. While | |
* String[][] parameter lets directly loading data from source code, Map can be | |
* generated from a YAML document, using a parser. | |
* | |
* <pre> | |
* // String[][] | |
* MockHTable table = MockHTable.with(new String[][] { | |
* { "<rowid>", "<column>", "<value>" }, | |
* { "id", "family:qualifier1", "data1" }, | |
* { "id", "family:qualifier2", "data2" } | |
* }); | |
* // YAML | |
* String database = "id:\n family:qualifier1: data1\n family:qualifier2: data2\n"; | |
* MockHTable table = MockHTable.with((Map<String, Map<String, String>) new Yaml().load(database)); | |
* </pre> | |
* <p/> | |
* If value is not supposed to be a String, but an int, double or anything, | |
* {@code MockHTable.toEString()} can be used to turn it into a String. | |
* <p/> | |
* In order to simplify assertions for tests that should put anything into | |
* database, MockHTable.read() works with two parameters (id and column) and | |
* returns anything written to that row/column. So, previous test can be reduced to | |
* | |
* <pre> | |
* @Test | |
* public void testSave() { | |
* MockHTable table = MockHTable.create(); | |
* MyDAO(table).saveData(id, data); | |
* assertArrayEquals(data, table.read(id, "family:qualifier")); | |
* } | |
* </pre> | |
* <p/> | |
* | |
* @author erdem | |
*/ | |
public class MockHTable implements HTableInterface { | |
/** | |
* This is all the data for a MockHTable instance | |
*/ | |
private final NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data = new TreeMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>>(Bytes.BYTES_COMPARATOR); | |
private final String tableName; | |
protected MockHTable(String tableName) { | |
this.tableName = tableName; | |
} | |
    /**
     * Converts one row's data into a list of {@link KeyValue}s with no
     * timestamp constraint.
     *
     * @param row         row key of the produced KeyValues
     * @param rowdata     family -> qualifier -> timestamp -> value map to decode
     * @param maxVersions maximum number of versions to return per column
     * @return list of KeyValues, newest versions first within each column
     */
    private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
        return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
    }
/** | |
* Helper method to convert some data into a list of KeyValue's with timestamp | |
* constraint | |
* | |
* @param row row value of the KeyValue's | |
* @param rowdata data to decode | |
* @param timestampStart start of the timestamp constraint | |
* @param timestampEnd end of the timestamp constraint | |
* @param maxVersions number of versions to return | |
* @return List of KeyValue's | |
*/ | |
private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) { | |
List<KeyValue> ret = new ArrayList<>(); | |
for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> familyEntry : rowdata.entrySet()) { | |
byte[] family = familyEntry.getKey(); | |
for (Entry<byte[], NavigableMap<Long, byte[]>> column : familyEntry.getValue().entrySet()) { | |
byte[] qualifier = column.getKey(); | |
NavigableMap<Long, byte[]> values = column.getValue(); | |
int versionsAdded = 0; | |
for (Entry<Long, byte[]> tsToVal : values.descendingMap().entrySet()) { | |
if (versionsAdded++ == maxVersions) { | |
break; | |
} | |
Long timestamp = tsToVal.getKey(); | |
if (timestamp < timestampStart) { | |
continue; | |
} | |
if (timestamp > timestampEnd) { | |
continue; | |
} | |
byte[] value = tsToVal.getValue(); | |
ret.add(new KeyValue(row, family, qualifier, timestamp, value)); | |
} | |
} | |
} | |
return ret; | |
} | |
    /**
     * Default way of constructing a MockHTable.
     *
     * @param tableName name reported by the resulting table
     * @return a new, empty MockHTable
     */
    public static MockHTable create(String tableName) {
        return new MockHTable(tableName);
    }
/** | |
* Create a MockHTable with some pre-loaded data. Parameter should be a map of | |
* column-to-data mappings of rows. It can be created with a YAML like | |
* <p/> | |
* <pre> | |
* rowid: | |
* family1:qualifier1: value1 | |
* family2:qualifier2: value2 | |
* </pre> | |
* | |
* @param tableName table name | |
* @param dump pre-loaded data | |
* @return a new MockHTable loaded with given data | |
*/ | |
public static MockHTable with(String tableName, Map<String, Map<String, String>> dump) { | |
MockHTable ret = new MockHTable(tableName); | |
for (Entry<String, Map<String, String>> row : dump.entrySet()) { | |
for (Entry<String, String> column : row.getValue().entrySet()) { | |
put(ret, row.getKey(), column.getKey(), column.getValue()); | |
} | |
} | |
return ret; | |
} | |
/** | |
* Helper method of pre-loaders, adds parameters to data. | |
* | |
* @param ret data to load into | |
* @param row rowid | |
* @param column family:qualifier encoded value | |
* @param val value | |
*/ | |
private static void put(MockHTable ret, String row, String column, | |
String val) { | |
String[] fq = split(column); | |
byte[] family = Bytes.toBytesBinary(fq[0]); | |
byte[] qualifier = Bytes.toBytesBinary(fq[1]); | |
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> families = ret.forceFind(ret.data, Bytes.toBytesBinary(row), new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR)); | |
NavigableMap<byte[], NavigableMap<Long, byte[]>> qualifiers = ret.forceFind(families, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR)); | |
NavigableMap<Long, byte[]> values = ret.forceFind(qualifiers, qualifier, new TreeMap<Long, byte[]>()); | |
values.put(System.currentTimeMillis(), Bytes.toBytesBinary(val)); | |
} | |
/** | |
* Create a MockHTable with some pre-loaded data. Parameter should be an array | |
* of string arrays which define every column value individually. | |
* <p/> | |
* <pre> | |
* new String[][] { | |
* { "<rowid>", "<column>", "<value>" }, | |
* { "id", "family:qualifier1", "data1" }, | |
* { "id", "family:qualifier2", "data2" } | |
* }); | |
* </pre> | |
* | |
* @param tableName table name | |
* @param dump | |
* @return | |
*/ | |
public static MockHTable with(String tableName, String[][] dump) { | |
MockHTable ret = new MockHTable(tableName); | |
for (String[] row : dump) { | |
put(ret, row[0], row[1], row[2]); | |
} | |
return ret; | |
} | |
/** | |
* Column identification helper | |
* | |
* @param column column name in the format family:qualifier | |
* @return {@code{"family", "qualifier"}} | |
*/ | |
private static String[] split(String column) { | |
return new String[]{ | |
column.substring(0, column.indexOf(':')), | |
column.substring(column.indexOf(':') + 1)}; | |
} | |
    /**
     * Returns the table name this mock was created with.
     * (The previous javadoc claimed this returns null; the code has returned
     * the actual name bytes since the tableName field was introduced.)
     *
     * @return the table name as bytes
     */
    @Override
    public byte[] getTableName() {
        return Bytes.toBytes(tableName);
    }

    /** @return the table name wrapped in a {@link TableName} */
    @Override
    public TableName getName() {
        return TableName.valueOf(tableName);
    }

    /**
     * No configuration needed to work so this returns null.
     *
     * @return null
     */
    @Override
    public Configuration getConfiguration() {
        return null;
    }

    /**
     * No table descriptor needed so this returns null.
     *
     * @return null
     */
    @Override
    public HTableDescriptor getTableDescriptor() {
        return null;
    }
@Override | |
public boolean exists(Get get) throws IOException { | |
return !get(get).isEmpty(); | |
} | |
@Override | |
public Boolean[] exists(List<Get> gets) { | |
return Collections2.transform(gets, new Function<Get, Boolean>() { | |
@Override | |
public Boolean apply(Get get) { | |
try { | |
return exists(get); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
}).toArray(new Boolean[gets.size()]); | |
} | |
@Override | |
public synchronized Result get(Get get) throws IOException { | |
byte[] row = get.getRow(); | |
if (!data.containsKey(row)) { | |
return new Result(); | |
} | |
List<KeyValue> kvs; | |
if (get.hasFamilies()) { | |
kvs = new ArrayList<>(); | |
for (Entry<byte[], NavigableSet<byte[]>> familyEntry : get.getFamilyMap().entrySet()) { | |
byte[] family = familyEntry.getKey(); | |
if (data.get(row).get(family) == null) { | |
continue; | |
} | |
NavigableSet<byte[]> qualifiers = familyEntry.getValue(); | |
if (qualifiers == null || qualifiers.isEmpty()) { | |
qualifiers = new TreeSet<>(data.get(row).get(family).navigableKeySet()); | |
} | |
for (byte[] qualifier : qualifiers) { | |
if (qualifier == null) { | |
qualifier = new byte[0]; | |
} | |
if (!data.get(row).containsKey(family) || | |
!data.get(row).get(family).containsKey(qualifier) || | |
data.get(row).get(family).get(qualifier).isEmpty()) { | |
continue; | |
} | |
Entry<Long, byte[]> timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry(); | |
kvs.add(new KeyValue(row, family, qualifier, timestampAndValue.getKey(), timestampAndValue.getValue())); | |
} | |
} | |
} else { | |
kvs = toKeyValue(row, data.get(row), get.getMaxVersions()); | |
} | |
TimeRange timeRange = get.getTimeRange(); | |
if (timeRange != null) { | |
for (Iterator<KeyValue> iterator = kvs.iterator(); iterator.hasNext(); ) { | |
KeyValue kv = iterator.next(); | |
if (timeRange.compare(kv.getTimestamp()) != 0) { | |
iterator.remove(); | |
} | |
} | |
} | |
Filter filter = get.getFilter(); | |
List<KeyValue> nkvs = filter == null ? kvs : filter(kvs, filter); | |
Collections.sort(nkvs, KeyValue.COMPARATOR); | |
return new Result(nkvs); | |
} | |
    /**
     * Not implemented by this mock.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
        throw new UnsupportedOperationException("Update MockHTable class if needed");
    }
    /**
     * Scans the in-memory data, honoring start/stop row, family/qualifier
     * selection, time range, max versions and an optional filter.
     * <p/>
     * NOTE(review): the whole result set is materialized eagerly before the
     * scanner is returned; reversed scans and scanner batching are not handled.
     */
    @Override
    public synchronized ResultScanner getScanner(Scan scan) throws IOException {
        List<Result> ret = new ArrayList<>();
        byte[] st = scan.getStartRow();
        byte[] sp = scan.getStopRow();
        Filter filter = scan.getFilter();
        for (Entry<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> rowData : data.entrySet()) {
            byte[] row = rowData.getKey();
            NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> families = rowData.getValue();
            // if row is equal to startRow emit it. When startRow (inclusive) and
            // stopRow (exclusive) is the same, it should not be excluded which would
            // happen w/o this control.
            if (st != null && st.length > 0 &&
                    Bytes.compareTo(st, row) != 0) {
                // if row is before startRow do not emit, pass to next row
                if (Bytes.compareTo(st, row) > 0) {
                    continue;
                }
                // if row is equal to stopRow or after it do not emit, stop iteration
                if (sp != null && sp.length > 0 &&
                        Bytes.compareTo(sp, row) <= 0) {
                    break;
                }
            }
            List<KeyValue> kvs;
            if (scan.hasFamilies()) {
                // Collect only the requested families/qualifiers.
                kvs = new ArrayList<>();
                for (Entry<byte[], NavigableSet<byte[]>> familyEntry : scan.getFamilyMap().entrySet()) {
                    byte[] family = familyEntry.getKey();
                    NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = families.get(family);
                    if (familyData == null) {
                        continue;
                    }
                    NavigableSet<byte[]> qualifiers = familyEntry.getValue();
                    if (qualifiers == null || qualifiers.isEmpty()) {
                        // No explicit qualifiers means every qualifier of this family.
                        qualifiers = familyData.navigableKeySet();
                    }
                    for (byte[] qualifier : qualifiers) {
                        NavigableMap<Long, byte[]> values = familyData.get(qualifier);
                        if (values == null) {
                            continue;
                        }
                        int nVersionsAdded = 0;
                        // Newest first; stop after maxVersions cells were emitted.
                        for (Long timestamp : values.descendingKeySet()) {
                            if (timestamp < scan.getTimeRange().getMin()) {
                                continue;
                            }
                            if (timestamp > scan.getTimeRange().getMax()) {
                                continue;
                            }
                            byte[] value = values.get(timestamp);
                            kvs.add(new KeyValue(row, family, qualifier, timestamp, value));
                            if (++nVersionsAdded == scan.getMaxVersions()) {
                                break;
                            }
                        }
                    }
                }
            } else {
                kvs = toKeyValue(row, families, scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions());
            }
            if (filter != null) {
                kvs = filter(kvs, filter);
            }
            if (!kvs.isEmpty()) {
                Collections.sort(kvs, KeyValue.COMPARATOR);
                ret.add(new Result(kvs));
            }
            // Check for early out optimization
            if (filter != null && filter.filterAllRemaining()) {
                break;
            }
        }
        // Wrap the fully materialized results in a simple iterator-backed scanner.
        return new ResultScanner() {
            private Iterator<Result> iterator = ret.iterator();

            @Override
            public Iterator<Result> iterator() {
                return iterator;
            }

            @Override
            public Result[] next(int nbRows) throws IOException {
                ArrayList<Result> resultSets = new ArrayList<>(nbRows);
                for (int i = 0; i < nbRows; i++) {
                    Result next = next();
                    if (next != null) {
                        resultSets.add(next);
                    } else {
                        break;
                    }
                }
                return resultSets.toArray(new Result[resultSets.size()]);
            }

            @Override
            public Result next() throws IOException {
                try {
                    return iterator().next();
                } catch (NoSuchElementException ignored) {
                    // End of results is reported as null, per the ResultScanner contract.
                    return null;
                }
            }

            @Override
            public void close() {
                // Nothing to release: results live entirely in memory.
            }
        };
    }
    /**
     * Follows the logical flow through the filter methods for a single row.
     *
     * @param kvs    list of a row's KeyValues
     * @param filter HBase filter
     * @return list of KeyValues that were not filtered
     */
    private List<KeyValue> filter(List<KeyValue> kvs, Filter filter) throws IOException {
        filter.reset();
        List<KeyValue> nkvs = new ArrayList<>(kvs.size());
        /* Filter flow for a single row. Adapted from "HBase: The Definitive Guide" (p. 163) by Lars George, 2011. See Figure 4-2 on p. 163. */
        boolean filteredOnRowKey = false;
        Cell cellFamilyToSkip = null;
        for (KeyValue kv : kvs) {
            // A previous NEXT_ROW verdict skips the remaining cells of that family.
            if (cellFamilyToSkip != null) {
                if (CellUtil.matchingFamily(cellFamilyToSkip, kv)) {
                    continue;
                } else {
                    cellFamilyToSkip = null;
                }
            }
            if (filter.filterAllRemaining()) {
                break;
            }
            if (filter.filterRowKey(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())) {
                filteredOnRowKey = true;
                break;
            }
            ReturnCode filterResult = filter.filterKeyValue(kv);
            if (filterResult == ReturnCode.INCLUDE || filterResult == ReturnCode.INCLUDE_AND_NEXT_COL) {
                nkvs.add(kv);
            } else if (filterResult == ReturnCode.NEXT_ROW) {
                // There might be other families, so we must rewind up to next one and apply the filter again
                cellFamilyToSkip = kv;
            }
            // ignoring next key hint which is a optimization to reduce file system IO
        }
        // Give row-level filters a chance to inspect/mutate the collected cells.
        if (filter.hasFilterRow() && !filteredOnRowKey) {
            filter.filterRow(nkvs);
        }
        // A row-level veto (or a row-key rejection above) discards everything.
        if (filter.filterRow() || filteredOnRowKey) {
            nkvs.clear();
        }
        return nkvs;
    }
@Override | |
public ResultScanner getScanner(byte[] family) throws IOException { | |
Scan scan = new Scan(); | |
scan.addFamily(family); | |
return getScanner(scan); | |
} | |
@Override | |
public ResultScanner getScanner(byte[] family, byte[] qualifier) | |
throws IOException { | |
Scan scan = new Scan(); | |
scan.addColumn(family, qualifier); | |
return getScanner(scan); | |
} | |
@Override | |
public synchronized void put(Put put) throws IOException { | |
byte[] row = put.getRow(); | |
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR)); | |
for (Entry<byte[], List<KeyValue>> familyEntry : put.getFamilyMap().entrySet()) { | |
NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, familyEntry.getKey(), new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR)); | |
for (KeyValue kv : familyEntry.getValue()) { | |
kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis())); | |
byte[] qualifier = kv.getQualifier(); | |
NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>()); | |
qualifierData.put(kv.getTimestamp(), kv.getValue()); | |
} | |
} | |
} | |
/** | |
* Helper method to find a key in a map. If key is not found, newObject is | |
* added to map and returned | |
* | |
* @param map map to extract value from | |
* @param key key to look for | |
* @param newObject set key to this if not found | |
* @return found value or newObject if not found | |
*/ | |
private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject) { | |
V data = map.get(key); | |
if (data == null) { | |
data = newObject; | |
map.put(key, data); | |
} | |
return data; | |
} | |
    /**
     * Applies each Put in order; see {@link #put(Put)}.
     *
     * @param puts mutations to apply
     */
    @Override
    public void put(List<Put> puts) throws IOException {
        for (Put put : puts) {
            put(put);
        }
    }
/** | |
* Checks if the value with given details exists in database, or is | |
* non-existent in the case of value being null | |
* | |
* @param row row | |
* @param family family | |
* @param qualifier qualifier | |
* @param value value | |
* @return true if value is not null and exists in db, or value is null and | |
* not exists in db, false otherwise | |
*/ | |
private boolean check(byte[] row, byte[] family, byte[] qualifier, byte[] value) { | |
if (value == null || value.length == 0) { | |
return !data.containsKey(row) || | |
!data.get(row).containsKey(family) || | |
!data.get(row).get(family).containsKey(qualifier) || | |
data.get(row).get(family).get(qualifier).lastEntry().getValue() == null || | |
data.get(row).get(family).get(qualifier).lastEntry().getValue().length == 0; | |
} else { | |
return data.containsKey(row) && | |
data.get(row).containsKey(family) && | |
data.get(row).get(family).containsKey(qualifier) && | |
!data.get(row).get(family).get(qualifier).isEmpty() && | |
Arrays.equals(data.get(row).get(family).get(qualifier).lastEntry().getValue(), value); | |
} | |
} | |
@Override | |
public synchronized boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, | |
byte[] value, Put put) throws IOException { | |
if (check(row, family, qualifier, value)) { | |
put(put); | |
return true; | |
} | |
return false; | |
} | |
    /**
     * Deletes data from a row. An empty family map deletes the whole row; a
     * family listed without KeyValues deletes that whole family; otherwise the
     * listed qualifiers are removed.
     * <p/>
     * NOTE(review): KeyValue timestamps and delete types are ignored — every
     * version of a listed qualifier is removed. Confirm this simplification is
     * acceptable for tests relying on this mock.
     */
    @Override
    public synchronized void delete(Delete delete) throws IOException {
        byte[] row = delete.getRow();
        if (data.get(row) == null) {
            return;
        }
        Map<byte[], List<KeyValue>> families = delete.getFamilyMap();
        if (families.isEmpty()) {
            // No families specified: drop the entire row.
            data.remove(row);
            return;
        }
        for (Entry<byte[], List<KeyValue>> familyEntry : families.entrySet()) {
            byte[] family = familyEntry.getKey();
            if (data.get(row).get(family) == null) {
                continue;
            }
            List<KeyValue> kvs = familyEntry.getValue();
            if (kvs.isEmpty()) {
                // Family specified without qualifiers: drop the whole family.
                data.get(row).remove(family);
                continue;
            }
            for (KeyValue kv : kvs) {
                data.get(row).get(kv.getFamily()).remove(kv.getQualifier());
            }
            // Prune emptied families so exists()/get() stay consistent.
            if (data.get(row).get(family).isEmpty()) {
                data.get(row).remove(family);
            }
        }
        // Prune the row itself once its last family is gone.
        if (data.get(row).isEmpty()) {
            data.remove(row);
        }
    }
    /**
     * Applies each Delete in order; see {@link #delete(Delete)}.
     *
     * @param deletes mutations to apply
     */
    @Override
    public void delete(List<Delete> deletes) throws IOException {
        for (Delete delete : deletes) {
            delete(delete);
        }
    }
@Override | |
public synchronized boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, | |
byte[] value, Delete delete) throws IOException { | |
if (check(row, family, qualifier, value)) { | |
delete(delete); | |
return true; | |
} | |
return false; | |
} | |
@Override | |
public void mutateRow(RowMutations rm) throws IOException { | |
// currently only support Put and Delete | |
for (Mutation mutation : rm.getMutations()) { | |
if (mutation instanceof Put) { | |
put((Put) mutation); | |
} else if (mutation instanceof Delete) { | |
delete((Delete) mutation); | |
} | |
} | |
} | |
@Override | |
public synchronized boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException { | |
if (check(row, family, qualifier, value)) { | |
mutateRow(mutation); | |
return true; | |
} | |
return false; | |
} | |
    /**
     * Not implemented by this mock.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Result append(Append append) throws IOException {
        throw new UnsupportedOperationException("Update MockHTable class if needed");
    }

    /** Increments with the writeToWAL flag set (meaningless for this in-memory mock). */
    @Override
    public long incrementColumnValue(byte[] row, byte[] family,
            byte[] qualifier, long amount) throws IOException {
        return incrementColumnValue(row, family, qualifier, amount, true);
    }

    /** Maps Durability to the boolean writeToWAL flavor; only SKIP_WAL maps to false. */
    @Override
    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException {
        return incrementColumnValue(row, family, qualifier, amount, durability != Durability.SKIP_WAL);
    }
@Override | |
public synchronized long incrementColumnValue(byte[] row, byte[] family, | |
byte[] qualifier, long amount, boolean writeToWAL) throws IOException { | |
if (check(row, family, qualifier, null)) { | |
Put put = new Put(row); | |
put.add(family, qualifier, Bytes.toBytes(amount)); | |
put(put); | |
return amount; | |
} | |
long newValue = Bytes.toLong(data.get(row).get(family).get(qualifier).lastEntry().getValue()) + amount; | |
data.get(row).get(family).get(qualifier).put(System.currentTimeMillis(), | |
Bytes.toBytes(newValue)); | |
return newValue; | |
} | |
    /** The mock always behaves as if auto-flush were enabled. */
    @Override
    public boolean isAutoFlush() {
        return true;
    }

    /** No-op: there is no write buffer to configure. */
    @Override
    public void setAutoFlush(boolean autoFlush) {
    }

    /** No-op: there is no write buffer to configure. */
    @Override
    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
    }

    /** No-op: there is no write buffer to configure. */
    @Override
    public void setAutoFlushTo(boolean autoFlush) {
    }

    /** No-op: writes are applied synchronously, so there is nothing to flush. */
    @Override
    public void flushCommits() throws IOException {
    }

    /** No-op: no external resources are held. */
    @Override
    public void close() throws IOException {
    }
    /**
     * Not implemented by this mock.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public CoprocessorRpcChannel coprocessorService(byte[] row) {
        throw new UnsupportedOperationException("Update MockHTable class if needed");
    }

    /**
     * Not implemented by this mock.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws Throwable {
        throw new UnsupportedOperationException("Update MockHTable class if needed");
    }

    /**
     * Not implemented by this mock.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) throws Throwable {
        throw new UnsupportedOperationException("Update MockHTable class if needed");
    }

    /** @return 0 — the mock has no write buffer */
    @Override
    public long getWriteBufferSize() {
        return 0;
    }

    /** No-op: the mock has no write buffer. */
    @Override
    public void setWriteBufferSize(long writeBufferSize) throws IOException {
    }

    /**
     * Not implemented by this mock.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws Throwable {
        throw new UnsupportedOperationException("Update MockHTable class if needed");
    }

    /**
     * Not implemented by this mock.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws Throwable {
        throw new UnsupportedOperationException("Update MockHTable class if needed");
    }
@Override | |
public Object[] batch(List<? extends Row> actions) | |
throws IOException, | |
InterruptedException { | |
Result[] results = new Result[actions.size()]; // same size. | |
for (int i = 0; i < actions.size(); i++) { | |
Row r = actions.get(i); | |
if (r instanceof Delete) { | |
delete((Delete) r); | |
results[i] = new Result(); | |
} else if (r instanceof Put) { | |
put((Put) r); | |
results[i] = new Result(); | |
} else if (r instanceof Get) { | |
results[i] = get((Get) r); | |
} else if (r instanceof Increment) { | |
results[i] = increment((Increment) r); | |
} else if (r instanceof Append) { | |
results[i] = append((Append) r); | |
} | |
} | |
return results; | |
} | |
    /**
     * Not implemented by this mock.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException {
        throw new UnsupportedOperationException("Actions: " + actions);
    }

    /**
     * Not implemented by this mock.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException, InterruptedException {
        throw new UnsupportedOperationException("Actions: " + actions);
    }
@Override | |
public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException { | |
// results are not populated | |
batch(actions); | |
} | |
@Override | |
public Result[] get(List<Get> gets) throws IOException { | |
List<Result> results = new ArrayList<>(gets.size()); | |
for (Get g : gets) { | |
results.add(get(g)); | |
} | |
return results.toArray(new Result[results.size()]); | |
} | |
@Override | |
public synchronized Result increment(Increment increment) throws IOException { | |
List<KeyValue> kvs = new ArrayList<>(); | |
for (Entry<byte[], List<Cell>> cells : increment.getFamilyCellMap().entrySet()) { | |
byte[] family = cells.getKey(); | |
for (Cell cell : cells.getValue()) { | |
byte[] qualifier = CellUtil.cloneQualifier(cell); | |
long newValue = incrementColumnValue(increment.getRow(), family, qualifier, Bytes.toLong(CellUtil.cloneValue(cell))); | |
kvs.add(new KeyValue(increment.getRow(), family, qualifier, Bytes.toBytes(newValue))); | |
} | |
} | |
Collections.sort(kvs, KeyValue.COMPARATOR); | |
return new Result(kvs); | |
} | |
/** | |
* Read a value saved in the object. Useful for making assertions in tests. | |
* | |
* @param rowid rowid of the data to read | |
* @param column family:qualifier of the data to read | |
* @return value or null if row or column of the row does not exist | |
*/ | |
public synchronized byte[] read(String rowid, String column) { | |
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> row = data.get(Bytes.toBytesBinary(rowid)); | |
if (row == null) { | |
return null; | |
} | |
String[] fq = split(column); | |
byte[] family = Bytes.toBytesBinary(fq[0]); | |
byte[] qualifier = Bytes.toBytesBinary(fq[1]); | |
if (!row.containsKey(family)) { | |
return null; | |
} | |
if (!row.get(family).containsKey(qualifier)) { | |
return null; | |
} | |
return row.get(family).get(qualifier).lastEntry().getValue(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment