Skip to content

Instantly share code, notes, and snippets.

@mushkevych
Last active December 10, 2015 23:09
Show Gist options
  • Save mushkevych/4507707 to your computer and use it in GitHub Desktop.
Save mushkevych/4507707 to your computer and use it in GitHub Desktop.
SingleColumnValueFilter example
/**
 * Opens a scanner over the given table, optionally restricted to a single manufacturer.
 *
 * <p>When {@code manufacturer} is {@code null} or blank, no column filter is added and the
 * scan runs with an empty {@code MUST_PASS_ALL} filter list.
 *
 * @param hTable       table to scan
 * @param manufacturer manufacturer name to match exactly (byte-wise) against the
 *                     {@code Constants.FAMILY_STAT:Constants.MANUFACTURER} column; may be {@code null}
 * @return the scanner returned by {@code hTable.getScanner}; the caller is responsible for closing it
 * @throws IOException if the scanner cannot be opened
 */
public ResultScanner getProductScanner(HTableInterface hTable,
                                       String manufacturer) throws IOException {
    FilterList flMaster = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    if (manufacturer != null && !manufacturer.trim().isEmpty()) {
        SingleColumnValueFilter filter = new SingleColumnValueFilter(
                Bytes.toBytes(Constants.FAMILY_STAT),
                Bytes.toBytes(Constants.MANUFACTURER),
                CompareFilter.CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes(manufacturer)));
        // By default SingleColumnValueFilter lets rows through when the column is absent;
        // a row without a manufacturer must not match a manufacturer query.
        filter.setFilterIfMissing(true);
        flMaster.addFilter(filter);
    }

    Scan scan = new Scan();
    scan.setFilter(flMaster);
    return hTable.getScanner(scan);
}
/**
 * Entity holding per-product statistics grouped under one row key.
 *
 * <p>The row key is composed of a time period (int) followed by a category name, as declared
 * by the {@code @HRowKey} annotation. Product data is kept in a nested map keyed by product id.
 */
public class Grouping {
    @HRowKey(components = {
            @HFieldComponent(name = Constants.TIMEPERIOD, length = Bytes.SIZEOF_INT, type = Integer.class),
            @HFieldComponent(name = Constants.CATEGORY, length = Constants.LENGTH_CATEGORY_NAME, type = String.class)
    })
    public byte[] key;

    /**
     * format of the storage:
     * {product_id : {
     * price_highest: int
     * price_lowest: int
     * manufacturer: String
     * }}
     */
    @HMapFamily(family = Constants.FAMILY_PRODUCT, keyType = String.class, valueType = Map.class)
    @HNestedMap(keyType = String.class, valueType = byte[].class)
    public Map<String, Map<String, byte[]>> product = new HashMap<String, Map<String, byte[]>>();

    public Grouping() {
    }

    /**
     * Looks up the raw bytes stored for a product attribute.
     *
     * @return the stored bytes, or {@code null} if the product or the attribute is absent
     */
    private byte[] getRawEntry(String prodId, String key) {
        Map<String, byte[]> entry = product.get(prodId);
        if (entry == null) {
            return null;
        }
        return entry.get(key);
    }

    /**
     * Reads a product attribute as a String.
     *
     * @param prodId product id (outer map key)
     * @param key    attribute name (inner map key)
     * @return the decoded value, or {@code null} if nothing is stored
     */
    protected String getStringEntry(String prodId, String key) {
        byte[] raw = getRawEntry(prodId, key);
        if (raw == null) {
            return null;
        }
        return Bytes.toString(raw);
    }

    /**
     * Reads a product attribute as an Integer.
     *
     * @param prodId product id (outer map key)
     * @param key    attribute name (inner map key)
     * @return the decoded value, or {@code null} if nothing is stored
     */
    protected Integer getIntegerEntry(String prodId, String key) {
        byte[] raw = getRawEntry(prodId, key);
        if (raw == null) {
            // Guard explicitly: decoding a null array would throw instead of reporting "absent".
            return null;
        }
        return Bytes.toInt(raw);
    }

    /**
     * Stores a raw attribute value for a product, creating the product's inner map on first use.
     *
     * @param prodId product id (outer map key)
     * @param key    attribute name (inner map key)
     * @param value  encoded attribute value
     */
    protected void setEntry(String prodId, String key, byte[] value) {
        Map<String, byte[]> entry = product.get(prodId);
        if (entry == null) {
            entry = new HashMap<String, byte[]>();
            product.put(prodId, entry);
        }
        entry.put(key, value);
    }

    /** @return highest price recorded for the product, or {@code null} if none is stored */
    public Integer getPriceHighest(String prodId) {
        return getIntegerEntry(prodId, Constants.PRICE_HIGHEST);
    }

    /** Records the highest price for the product. */
    public void setPriceHighest(String prodId, int price) {
        setEntry(prodId, Constants.PRICE_HIGHEST, Bytes.toBytes(price));
    }

    /** @return lowest price recorded for the product, or {@code null} if none is stored */
    public Integer getPriceLowest(String prodId) {
        return getIntegerEntry(prodId, Constants.PRICE_LOWEST);
    }

    /** Records the lowest price for the product. */
    public void setPriceLowest(String prodId, int price) {
        setEntry(prodId, Constants.PRICE_LOWEST, Bytes.toBytes(price));
    }

    /** @return manufacturer recorded for the product, or {@code null} if none is stored */
    public String getManufacturer(String prodId) {
        return getStringEntry(prodId, Constants.MANUFACTURER);
    }

    /** Records the manufacturer for the product. */
    public void setManufacturer(String prodId, String manufacturer) {
        setEntry(prodId, Constants.MANUFACTURER, Bytes.toBytes(manufacturer));
    }
}
<!-- Table "grouping": a single column family, "product", keeping one version per cell.
     TTL is 604800 (presumably seconds, i.e. 7 days; confirm against the HBase version in use). -->
<TableSchema name="grouping">
<ColumnSchema name="product" BLOCKCACHE="true" COMPRESSION="snappy" VERSIONS="1" IN_MEMORY="true" TTL="604800"/>
</TableSchema>
// TcPrimaryKey stands for Timeperiod+Category primary key
// Used by map() to generate the output key from the time period and the product's category.
private TcPrimaryKey pkTc = new TcPrimaryKey();
// Used by map() to parse each incoming Result into a Product entity.
private EntityService<Product> esProduct = new EntityService<Product>(Product.class);
/**
 * Re-keys a product row by time period and category.
 *
 * <p>The incoming row is parsed only to read its category; the original {@code Result}
 * is emitted unchanged under the newly generated key.
 *
 * @param key     row key of the incoming row (not used)
 * @param value   the incoming row
 * @param context output sink for the re-keyed row
 */
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
    Product parsed = esProduct.parseResult(value);
    context.write(pkTc.generateKey(timePeriod, parsed.category), value);
}
/**
 * Entity describing a single product row.
 *
 * The row key and every property are bound through the {@code @HRowKey} and
 * {@code @HProperty} annotations; all properties are declared in the
 * {@code Constants.FAMILY_STAT} family.
 */
public class Product {
// Row key: one String component (Constants.ID) of length Constants.LENGTH_STRING_DEFAULT.
@HRowKey (components = {
@HFieldComponent(name = Constants.ID, length = Constants.LENGTH_STRING_DEFAULT, type = String.class)
})
public byte[] key;
// Product category.
@HProperty(family = Constants.FAMILY_STAT, identifier = Constants.CATEGORY)
public String category;
// Lowest price recorded for the product.
@HProperty(family = Constants.FAMILY_STAT, identifier = Constants.PRICE_LOWEST)
public int priceLowest;
// Highest price recorded for the product.
@HProperty(family = Constants.FAMILY_STAT, identifier = Constants.PRICE_HIGHEST)
public int priceHighest;
// Manufacturer name.
@HProperty(family = Constants.FAMILY_STAT, identifier = Constants.MANUFACTURER)
public String manufacturer;
// Explicit no-argument constructor (presumably required by the entity mapper; verify).
public Product() {
}
}
<!-- Table "product": a single column family, "stat", keeping one version per cell; no TTL is set. -->
<TableSchema name="product">
<ColumnSchema name="stat" BLOCKCACHE="true" COMPRESSION="snappy" VERSIONS="1" IN_MEMORY="true"/>
</TableSchema>
// Used by reduce() to parse each incoming Result into a Product entity.
private EntityService<Product> esProduct = new EntityService<Product>(Product.class);
// Used by reduce() to turn the aggregated Grouping entity into a Put via insert().
private EntityService<Grouping> esGrouping = new EntityService<Grouping>(Grouping.class);
/**
 * Collapses all product rows that share one key into a single {@code Grouping} document
 * and emits it as a {@code Put}.
 *
 * @param key     key shared by every value in this group; used as the Grouping row key
 * @param values  product rows belonging to the group
 * @param context output sink for the resulting {@code Put}
 */
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Result> values, Context context) throws IOException, InterruptedException {
    Grouping targetDocument = new Grouping();
    // The Grouping row key is declared as timeperiod + category, so the category needs
    // no separate field on the target document.
    targetDocument.key = key.get();

    for (Result singleResult : values) {
        Product sourceDocument = esProduct.parseResult(singleResult);
        // Key the nested map by the product's own row key. Deriving the id from the reduce
        // key would give every product in the group the same id and overwrite earlier ones.
        String prodId = Bytes.toString(singleResult.getRow());
        targetDocument.setPriceHighest(prodId, sourceDocument.priceHighest);
        targetDocument.setPriceLowest(prodId, sourceDocument.priceLowest);
        targetDocument.setManufacturer(prodId, sourceDocument.manufacturer);
    }

    try {
        Put put = esGrouping.insert(targetDocument);
        put.setWriteToWAL(false);
        context.write(key, put);
    } catch (OutOfMemoryError e) {
        // NOTE(review): handling is elided in the source; as written the group is silently
        // dropped. Record or report the failure here.
        // ...
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment