Skip to content

Instantly share code, notes, and snippets.

@zhaoawd
Created April 10, 2019 02:16
Show Gist options
  • Save zhaoawd/316e3a5d3d3eefde99879393292fae6f to your computer and use it in GitHub Desktop.
Save zhaoawd/316e3a5d3d3eefde99879393292fae6f to your computer and use it in GitHub Desktop.
[SealedPut] 针对 put 封装 #flink #hbase
import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
/**
 * A wrapper around an HBase {@link Put} that keeps the row key and the column data in plain
 * fields (a byte array and nested sorted maps), so that it can be serialized conveniently and
 * avoids strange errors under some frameworks.
 *
 * <p>Column families and qualifiers are stored as hex strings (see {@link Bytes#toHex(byte[])});
 * values are stored as raw bytes. Arrays passed in and returned are not copied.
 *
 * @author yuanyifan
 */
public class SealedPut {
    private static final Logger LOGGER = LoggerFactory.getLogger(SealedPut.class);

    /** Row key of the wrapped put. */
    private byte[] rowKey;

    /**
     * Column data, laid out as
     * Map[ColumnFamily (hex string), Map[Qualifier (hex string), Value (bytes)]].
     */
    private SortedMap<String, SortedMap<String, byte[]>> resultData = Maps.newTreeMap();

    /**
     * Creates an empty SealedPut for the given row.
     *
     * @param row the row key
     */
    public SealedPut(byte[] row) {
        rowKey = row;
    }

    /**
     * Creates a SealedPut from an HBase {@link Put}, copying the qualifier and value of every
     * cell in its family map.
     *
     * @param hbasePut the HBase put to wrap
     */
    public SealedPut(Put hbasePut) {
        this(hbasePut.getRow());
        for (Map.Entry<byte[], List<Cell>> cfData : hbasePut.getFamilyCellMap().entrySet()) {
            final byte[] columnFamily = cfData.getKey();
            for (Cell cell : cfData.getValue()) {
                put(columnFamily, CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
            }
        }
    }

    /**
     * Factory equivalent of {@link #SealedPut(Put)}.
     *
     * @param legacyPut the HBase put to wrap
     * @return a new SealedPut holding the row key and columns of {@code legacyPut}
     */
    public static SealedPut createSealedPut(Put legacyPut) {
        return new SealedPut(legacyPut);
    }

    /**
     * @return the row key (the internal array, not a copy)
     */
    public byte[] getRowKey() {
        return rowKey;
    }

    /**
     * Exists only so the class exposes a getter/setter pair for the row key ("make POJO happy").
     *
     * @param rowKey the new row key
     */
    @Deprecated
    public void setRowKey(byte[] rowKey) {
        this.rowKey = rowKey;
    }

    /**
     * @return the internal family -> qualifier -> value map (keys are hex strings); not a copy
     */
    public SortedMap<String, SortedMap<String, byte[]>> getResultData() {
        return resultData;
    }

    /**
     * Exists only so the class exposes a getter/setter pair for the column data
     * ("make POJO happy").
     *
     * @param resultData the new family -> qualifier -> value map, keyed by hex strings
     */
    @Deprecated
    public void setResultData(SortedMap<String, SortedMap<String, byte[]>> resultData) {
        this.resultData = resultData;
    }

    /**
     * Merges this put with another one that has the same row key.
     *
     * @param v the put to merge with this one
     * @return a new SealedPut containing the columns of both; see {@link #merge(SealedPut...)}
     */
    public SealedPut appendSealedPut(SealedPut v) {
        return merge(this, v);
    }

    /**
     * Merges several puts that share one row key into a new SealedPut. When the same
     * family/qualifier occurs in more than one input, the value from the later argument wins.
     *
     * @param vs the puts to merge; at least two are required
     * @return a new SealedPut carrying the shared row key and the union of all columns
     * @throws IllegalArgumentException if fewer than two puts are given or the row keys differ
     */
    public static SealedPut merge(SealedPut... vs) {
        if (vs.length <= 1) {
            throw new IllegalArgumentException("No data need to merge.");
        }
        // Every input must target the same row as the first one.
        final SealedPut v0 = vs[0];
        for (SealedPut vi : vs) {
            if (!Bytes.equals(vi.rowKey, v0.rowKey)) {
                throw new IllegalArgumentException(
                        String.format(
                                "Can't merge Puts caused by row key not equal (v0=%s, vi=%s).",
                                Bytes.toHex(v0.rowKey),
                                Bytes.toHex(vi.rowKey)
                        )
                );
            }
        }
        // Replay every column of every input into a fresh put, in argument order.
        final SealedPut newValue = new SealedPut(v0.rowKey);
        for (SealedPut vi : vs) {
            for (Map.Entry<String, SortedMap<String, byte[]>> value : vi.resultData.entrySet()) {
                final byte[] cf = Bytes.fromHex(value.getKey());
                for (Map.Entry<String, byte[]> field : value.getValue().entrySet()) {
                    final byte[] qualifier = Bytes.fromHex(field.getKey());
                    newValue.put(cf, qualifier, field.getValue());
                }
            }
        }
        return newValue;
    }

    /**
     * Adds (or overwrites) one column.
     *
     * <p>Note from the original author: columns must be PUT in row-key order. This method itself
     * does not check any ordering.
     *
     * @param family    the column family
     * @param qualifier the column qualifier
     * @param value     the cell value
     */
    public void put(byte[] family, byte[] qualifier, byte[] value) {
        final String familyHex = Bytes.toHex(family);
        final String qualifierHex = Bytes.toHex(qualifier);
        SortedMap<String, byte[]> columns = resultData.get(familyHex);
        if (columns == null) {
            columns = Maps.newTreeMap();
            resultData.put(familyHex, columns);
        }
        columns.put(qualifierHex, value);
        // Building the message hex-encodes the row key and the value, so only do it when the
        // line will actually be written.
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace(String.format(
                    "HBase row generated %s: %s -> %s -> %s",
                    Bytes.toHex(rowKey),
                    familyHex,
                    qualifierHex,
                    Bytes.toHex(value)
            ));
        }
    }

    /**
     * Converts this object into a {@link Put} that HBase accepts.
     *
     * @return a new Put with this row key and all stored columns
     */
    public Put toPut() {
        return convertToPut(this);
    }

    /**
     * Converts a SealedPut into the {@link Put} used by HBase.
     *
     * @param sealedPut the put to convert
     * @return a new Put with the row key of {@code sealedPut} and one column per stored entry
     */
    public static Put convertToPut(SealedPut sealedPut) {
        final Put put = new Put(sealedPut.rowKey);
        for (Map.Entry<String, SortedMap<String, byte[]>> familyValue : sealedPut.resultData.entrySet()) {
            final byte[] family = Bytes.fromHex(familyValue.getKey());
            for (Map.Entry<String, byte[]> columnValue : familyValue.getValue().entrySet()) {
                put.addColumn(family, Bytes.fromHex(columnValue.getKey()), columnValue.getValue());
            }
        }
        return put;
    }

    /**
     * @return the same text as {@link #getStringInHex()}
     */
    @Override
    public String toString() {
        return getStringInHex();
    }

    /**
     * Renders the row key, families, qualifiers and values as hex strings, one item per line.
     *
     * @return a multi-line hex dump of this put
     */
    public String getStringInHex() {
        final StringBuilder sb = new StringBuilder();
        sb.append("RowKey:").append(Bytes.toHex(getRowKey())).append("\n");
        for (Map.Entry<String, SortedMap<String, byte[]>> entry : getResultData().entrySet()) {
            sb.append(" FamilyColumn:").append(entry.getKey()).append("\n");
            for (Map.Entry<String, byte[]> qv : entry.getValue().entrySet()) {
                sb.append(" Column: ")
                        .append(qv.getKey())
                        .append(" -> ")
                        .append(Bytes.toHex(qv.getValue()))
                        .append("\n");
            }
        }
        return sb.toString();
    }

    /**
     * Like {@link #getStringInHex()}, but family and qualifier names are decoded from hex and
     * rendered with {@link Bytes#toString(byte[])}; the row key and values stay in hex.
     * For debugging use only.
     *
     * @return a multi-line dump of this put with readable family and qualifier names
     */
    @Deprecated
    public String getStringInUTF8() {
        final StringBuilder sb = new StringBuilder();
        sb.append("RowKey:").append(Bytes.toHex(getRowKey())).append("\n");
        for (Map.Entry<String, SortedMap<String, byte[]>> entry : getResultData().entrySet()) {
            final String columnFamily = Bytes.toString(Bytes.fromHex(entry.getKey()));
            sb.append(" FamilyColumn: ").append(columnFamily).append("\n");
            for (Map.Entry<String, byte[]> qv : entry.getValue().entrySet()) {
                final String qualifier = Bytes.toString(Bytes.fromHex(qv.getKey()));
                sb.append(" Column: ")
                        .append(qualifier)
                        .append(" -> ")
                        .append(Bytes.toHex(qv.getValue()))
                        .append("\n");
            }
        }
        return sb.toString();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment