Created
April 10, 2019 02:16
-
-
Save zhaoawd/316e3a5d3d3eefde99879393292fae6f to your computer and use it in GitHub Desktop.
[SealedPut] 针对 put 封装 #flink #hbase
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.common.collect.Maps; | |
import org.apache.hadoop.hbase.Cell; | |
import org.apache.hadoop.hbase.CellUtil; | |
import org.apache.hadoop.hbase.client.Put; | |
import org.apache.hadoop.hbase.util.Bytes; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.SortedMap; | |
/** | |
* 针对Put的一个封装,可以方便的进行序列化防止在某些框架下出现奇怪的错误 | |
* | |
* @author yuanyifan | |
*/ | |
public class SealedPut { | |
private static final Logger LOGGER = LoggerFactory.getLogger(SealedPut.class); | |
public byte[] getRowKey() { | |
return rowKey; | |
} | |
/** | |
* Make `POJO` Happy | |
* | |
* @param rowKey | |
*/ | |
@Deprecated | |
public void setRowKey(byte[] rowKey) { | |
this.rowKey = rowKey; | |
} | |
/** | |
* Make `POJO` Happy | |
* | |
* @param resultData | |
*/ | |
@Deprecated | |
public void setResultData(SortedMap<String, SortedMap<String, byte[]>> resultData) { | |
this.resultData = resultData; | |
} | |
private byte[] rowKey; | |
public SortedMap<String, SortedMap<String, byte[]>> getResultData() { | |
return resultData; | |
} | |
/** | |
* 类型说明: | |
* <p> | |
* Map[ColumnFamily(HexString),Map[Qualifier(HexString),Value(Bytes)]] | |
*/ | |
private SortedMap<String, SortedMap<String, byte[]>> resultData = Maps.newTreeMap(); | |
/** | |
* 创建一个SealedPut | |
* | |
* @param row 行键信息 | |
*/ | |
public SealedPut(byte[] row) { | |
rowKey = row; | |
} | |
/** | |
* 直接通过PUT创建 | |
* | |
* @param hbasePut HBase的PUT | |
*/ | |
public SealedPut(Put hbasePut) { | |
this(hbasePut.getRow()); | |
for (Map.Entry<byte[], List<Cell>> cfData : hbasePut.getFamilyCellMap().entrySet()) { | |
final byte[] columnFamily = cfData.getKey(); | |
for (Cell cell : cfData.getValue()) { | |
put(columnFamily, CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell)); | |
} | |
} | |
} | |
/** | |
* 将本Put和其他Put加在一起 | |
* | |
* @param v | |
* @return | |
*/ | |
public SealedPut appendSealedPut(SealedPut v) { | |
return merge(this, v); | |
} | |
/** | |
* @return | |
*/ | |
public static SealedPut merge(SealedPut... vs) { | |
if (vs.length <= 1) { | |
throw new RuntimeException("No data need to merge."); | |
} | |
// Check is all the rowkey same | |
final SealedPut v0 = vs[0]; | |
for (SealedPut vi : vs) { | |
if (!Bytes.equals(vi.rowKey, v0.rowKey)) { | |
throw new RuntimeException( | |
String.format( | |
"Can't merge Puts caused by row key not equal (v0=%s, vi=%s).", | |
Bytes.toHex(v0.rowKey), | |
Bytes.toHex(vi.rowKey) | |
) | |
); | |
} | |
} | |
// Merge all put into one | |
final SealedPut newValue = new SealedPut(v0.rowKey); | |
for (SealedPut vi : vs) { | |
for (Map.Entry<String, SortedMap<String, byte[]>> value : vi.resultData.entrySet()) { | |
final byte[] cf = Bytes.fromHex(value.getKey()); | |
for (Map.Entry<String, byte[]> field : value.getValue().entrySet()) { | |
final byte[] qualifier = Bytes.fromHex(field.getKey()); | |
newValue.put(cf, qualifier, field.getValue()); | |
} | |
} | |
} | |
return newValue; | |
} | |
/** | |
* 增加一列 | |
* <p> | |
* 注意增加一列的时候必需按照行键顺序PUT | |
* | |
* @param family 列族信息 | |
* @param qualifier 列标识位 | |
* @param value 数据 | |
*/ | |
public void put(byte[] family, byte[] qualifier, byte[] value) { | |
final String familyHex = Bytes.toHex(family); | |
final String qualifierHex = Bytes.toHex(qualifier); | |
if (!resultData.containsKey(familyHex)) { | |
resultData.put(familyHex, Maps.<String, byte[]>newTreeMap()); | |
} | |
resultData.get(familyHex).put(qualifierHex, value); | |
LOGGER.trace(String.format( | |
"HBase row generated %s: %s -> %s -> %s", | |
Bytes.toHex(rowKey), | |
familyHex, | |
Bytes.toHex(qualifier), | |
Bytes.toHex(value) | |
)); | |
} | |
/** | |
* 转换为HBase可以纳得的Put | |
* | |
* @return | |
*/ | |
public Put toPut() { | |
return convertToPut(this); | |
} | |
/** | |
* 创建函数 | |
* | |
* @param legacyPut | |
* @return | |
*/ | |
public static SealedPut createSealedPut(Put legacyPut) { | |
return new SealedPut(legacyPut); | |
} | |
/** | |
* 将SealedPut转换成HBase使用的Put | |
* | |
* @param sealedPut | |
* @return | |
*/ | |
public static Put convertToPut(SealedPut sealedPut) { | |
Put put = new Put(sealedPut.rowKey); | |
for (Map.Entry<String, SortedMap<String, byte[]>> familyValue : sealedPut.resultData.entrySet()) { | |
for (Map.Entry<String, byte[]> columnValue : familyValue.getValue().entrySet()) { | |
put.addColumn(Bytes.fromHex(familyValue.getKey()), Bytes.fromHex(columnValue.getKey()), columnValue.getValue()); | |
} | |
} | |
return put; | |
} | |
@Override | |
public String toString() { | |
return getStringInHex(); | |
} | |
public String getStringInHex() { | |
final StringBuilder sb = new StringBuilder(String.format( | |
"RowKey:%s\n", Bytes.toHex(getRowKey()) | |
)); | |
for (Map.Entry<String, SortedMap<String, byte[]>> entry : getResultData().entrySet()) { | |
sb.append(" FamilyColumn:").append(entry.getKey()).append("\n"); | |
for (Map.Entry<String, byte[]> qv : entry.getValue().entrySet()) { | |
sb.append(" Column: ").append( | |
qv.getKey() | |
).append(" -> ").append( | |
Bytes.toHex(qv.getValue()) | |
).append("\n"); | |
} | |
} | |
return sb.toString(); | |
} | |
/** | |
* For debugging use only | |
* | |
* @return | |
*/ | |
@Deprecated | |
public String getStringInUTF8() { | |
final StringBuilder sb = new StringBuilder(String.format( | |
"RowKey:%s\n", Bytes.toHex(getRowKey()) | |
)); | |
for (Map.Entry<String, SortedMap<String, byte[]>> entry : getResultData().entrySet()) { | |
final String columnFamily = Bytes.toString(Bytes.fromHex(entry.getKey())); | |
sb.append(" FamilyColumn: ").append(columnFamily).append("\n"); | |
for (Map.Entry<String, byte[]> qv : entry.getValue().entrySet()) { | |
final String qualifier = Bytes.toString(Bytes.fromHex(qv.getKey())); | |
sb.append(" Column: ").append( | |
qualifier | |
).append(" -> ").append( | |
Bytes.toHex(qv.getValue()) | |
).append("\n"); | |
} | |
} | |
return sb.toString(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment