Code accompanying the notes on Flink persistence pitfalls
import com.google.common.collect.Lists;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
/**
 * Persists data into HBase.
 *
 * @author yuanyifan
 */
public class HBaseRichOutputFormat extends RichOutputFormat<PutCollection> {

    private static final Logger LOGGER = LoggerFactory.getLogger(HBaseRichOutputFormat.class);

    private Connection conn;

    @Override
    public void configure(Configuration parameters) {
        // Initialize the connection.
        try {
            conn = ConnectionFactory.createConnection(
                    // TODO load whatever configuration you need here
            );
            LOGGER.info("HBase connection established.");
        } catch (Exception ex) {
            LOGGER.error("Error while creating HBase connection from configure.", ex);
        }
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // Nothing to do on open: this format does not care about data partitioning.
        LOGGER.info("HBaseRichOutputFormat opened with taskNumber={}, numTasks={}", taskNumber, numTasks);
    }

    @Override
    public void writeRecord(PutCollection data) throws IOException {
        // TODO batch-write the data (omitted; a possible sketch follows this class)
    }

    @Override
    public void close() throws IOException {
        // Remember to close the connection when the job ends (in practice, a streaming job never does).
        if (!conn.isClosed()) {
            conn.close();
        }
        LOGGER.info("HBaseRichOutputFormat closed.");
    }
}
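The gist leaves writeRecord as a TODO. Below is a minimal sketch of what the batch write could look like, assuming one HBase batch per table and reusing the imports already declared in the class (Table, TableName, Put, ArrayList); it is an illustration placed inside HBaseRichOutputFormat, not the author's omitted implementation.

    @Override
    public void writeRecord(PutCollection data) throws IOException {
        // One batch per table: convert every SealedPut back into an HBase Put
        // and hand the whole list to Table.put() in a single call.
        for (Map.Entry<String, Map<String, SealedPut>> entry : data.getData().entrySet()) {
            final ArrayList<Put> puts = new ArrayList<>();
            for (SealedPut sealedPut : entry.getValue().values()) {
                puts.add(sealedPut.toPut());
            }
            try (Table table = conn.getTable(TableName.valueOf(entry.getKey()))) {
                table.put(puts);
            }
        }
    }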
import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
 * A collection of Puts, together with their table information.
 */
public class PutCollection {

    /**
     * Layout: Map[TableName, Map[RowKey(HexString), SealedPut]]
     */
    private HashMap<String, Map<String, SealedPut>> data = Maps.newHashMap();

    public HashMap<String, Map<String, SealedPut>> getData() {
        return data;
    }

    public void setData(HashMap<String, Map<String, SealedPut>> data) {
        this.data = data;
    }

    public void putAll(String tableName, Collection<SealedPut> puts) {
        if (!data.containsKey(tableName)) {
            data.put(tableName, Maps.newHashMap());
        }
        final Map<String, SealedPut> table = data.get(tableName);
        for (SealedPut put : puts) {
            final String hexRow = Bytes.toHex(put.getRowKey());
            if (!table.containsKey(hexRow)) {
                table.put(hexRow, put);
            } else {
                // A put for this row already exists: merge the two.
                table.put(
                        hexRow,
                        table.get(hexRow).appendSealedPut(put)
                );
            }
        }
    }

    public void put(String tableName, SealedPut put) {
        if (!data.containsKey(tableName)) {
            data.put(tableName, Maps.newHashMap());
        }
        final Map<String, SealedPut> table = data.get(tableName);
        final String hexRow = Bytes.toHex(put.getRowKey());
        if (!table.containsKey(hexRow)) {
            table.put(hexRow, put);
        } else {
            // A put for this row already exists: merge the two.
            table.put(
                    hexRow,
                    table.get(hexRow).appendSealedPut(put)
            );
        }
    }

    /**
     * Merge several PutCollections into one.
     *
     * @param cs collections to merge
     * @return the merged collection
     */
    public static PutCollection merge(PutCollection... cs) {
        PutCollection c0 = new PutCollection();
        for (PutCollection ci : cs) {
            for (Map.Entry<String, Map<String, SealedPut>> entry : ci.data.entrySet()) {
                c0.putAll(entry.getKey(), entry.getValue().values());
            }
        }
        return c0;
    }

    public static PutCollection create(String tableName, Collection<SealedPut> puts) {
        PutCollection result = new PutCollection();
        result.putAll(tableName, puts);
        return result;
    }

    public static PutCollection create(String tableName, SealedPut put) {
        PutCollection result = new PutCollection();
        result.put(tableName, put);
        return result;
    }
}
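A quick usage sketch (table name and values are hypothetical) showing how create and merge fold several puts for the same row of the same table into a single entry, which keeps the eventual HBase batch small:

import org.apache.hadoop.hbase.util.Bytes;

public class PutCollectionExample {
    public static void main(String[] args) {
        // Two SealedPuts targeting the same row of the same (placeholder) table.
        SealedPut p1 = new SealedPut(Bytes.toBytes("row-1"));
        p1.put(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("v1"));

        SealedPut p2 = new SealedPut(Bytes.toBytes("row-1"));
        p2.put(Bytes.toBytes("cf"), Bytes.toBytes("q2"), Bytes.toBytes("v2"));

        // merge() combines puts for the same row key into one SealedPut.
        PutCollection merged = PutCollection.merge(
                PutCollection.create("demo_table", p1),
                PutCollection.create("demo_table", p2)
        );

        merged.getData().forEach((table, rows) ->
                rows.values().forEach(p -> System.out.println(table + "\n" + p.getStringInHex())));
    }
}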
import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
/**
 * A wrapper around Put that can be serialized easily, to avoid strange errors in some frameworks.
 *
 * @author yuanyifan
 */
public class SealedPut {

    private static final Logger LOGGER = LoggerFactory.getLogger(SealedPut.class);

    private byte[] rowKey;

    /**
     * Layout:
     * <p>
     * Map[ColumnFamily(HexString), Map[Qualifier(HexString), Value(Bytes)]]
     */
    private SortedMap<String, SortedMap<String, byte[]>> resultData = Maps.newTreeMap();

    public byte[] getRowKey() {
        return rowKey;
    }

    /**
     * Make `POJO` happy.
     *
     * @param rowKey row key
     */
    @Deprecated
    public void setRowKey(byte[] rowKey) {
        this.rowKey = rowKey;
    }

    public SortedMap<String, SortedMap<String, byte[]>> getResultData() {
        return resultData;
    }

    /**
     * Make `POJO` happy.
     *
     * @param resultData column data
     */
    @Deprecated
    public void setResultData(SortedMap<String, SortedMap<String, byte[]>> resultData) {
        this.resultData = resultData;
    }

    /**
     * Create a SealedPut for a given row.
     *
     * @param row row key
     */
    public SealedPut(byte[] row) {
        rowKey = row;
    }

    /**
     * Create a SealedPut directly from an HBase Put.
     *
     * @param hbasePut the HBase Put
     */
    public SealedPut(Put hbasePut) {
        this(hbasePut.getRow());
        for (Map.Entry<byte[], List<Cell>> cfData : hbasePut.getFamilyCellMap().entrySet()) {
            final byte[] columnFamily = cfData.getKey();
            for (Cell cell : cfData.getValue()) {
                put(columnFamily, CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
            }
        }
    }

    /**
     * Combine this Put with another one.
     *
     * @param v the other SealedPut
     * @return the merged SealedPut
     */
    public SealedPut appendSealedPut(SealedPut v) {
        return merge(this, v);
    }

    /**
     * Merge several SealedPuts (which must share the same row key) into one.
     *
     * @return the merged SealedPut
     */
    public static SealedPut merge(SealedPut... vs) {
        if (vs.length <= 1) {
            throw new RuntimeException("No data to merge.");
        }
        // Check that all row keys are the same
        final SealedPut v0 = vs[0];
        for (SealedPut vi : vs) {
            if (!Bytes.equals(vi.rowKey, v0.rowKey)) {
                throw new RuntimeException(
                        String.format(
                                "Can't merge Puts because the row keys are not equal (v0=%s, vi=%s).",
                                Bytes.toHex(v0.rowKey),
                                Bytes.toHex(vi.rowKey)
                        )
                );
            }
        }
        // Merge all puts into one
        final SealedPut newValue = new SealedPut(v0.rowKey);
        for (SealedPut vi : vs) {
            for (Map.Entry<String, SortedMap<String, byte[]>> value : vi.resultData.entrySet()) {
                final byte[] cf = Bytes.fromHex(value.getKey());
                for (Map.Entry<String, byte[]> field : value.getValue().entrySet()) {
                    final byte[] qualifier = Bytes.fromHex(field.getKey());
                    newValue.put(cf, qualifier, field.getValue());
                }
            }
        }
        return newValue;
    }

    /**
     * Add one column.
     * <p>
     * Note: columns must be added in row-key order.
     *
     * @param family    column family
     * @param qualifier column qualifier
     * @param value     value
     */
    public void put(byte[] family, byte[] qualifier, byte[] value) {
        final String familyHex = Bytes.toHex(family);
        final String qualifierHex = Bytes.toHex(qualifier);
        if (!resultData.containsKey(familyHex)) {
            resultData.put(familyHex, Maps.<String, byte[]>newTreeMap());
        }
        resultData.get(familyHex).put(qualifierHex, value);
        LOGGER.trace(String.format(
                "HBase row generated %s: %s -> %s -> %s",
                Bytes.toHex(rowKey),
                familyHex,
                Bytes.toHex(qualifier),
                Bytes.toHex(value)
        ));
    }

    /**
     * Convert to a Put that HBase accepts.
     *
     * @return the HBase Put
     */
    public Put toPut() {
        return convertToPut(this);
    }

    /**
     * Factory method.
     *
     * @param legacyPut the HBase Put to wrap
     * @return the wrapped SealedPut
     */
    public static SealedPut createSealedPut(Put legacyPut) {
        return new SealedPut(legacyPut);
    }

    /**
     * Convert a SealedPut back into the Put used by HBase.
     *
     * @param sealedPut the SealedPut to convert
     * @return the HBase Put
     */
    public static Put convertToPut(SealedPut sealedPut) {
        Put put = new Put(sealedPut.rowKey);
        for (Map.Entry<String, SortedMap<String, byte[]>> familyValue : sealedPut.resultData.entrySet()) {
            for (Map.Entry<String, byte[]> columnValue : familyValue.getValue().entrySet()) {
                put.addColumn(Bytes.fromHex(familyValue.getKey()), Bytes.fromHex(columnValue.getKey()), columnValue.getValue());
            }
        }
        return put;
    }

    @Override
    public String toString() {
        return getStringInHex();
    }

    public String getStringInHex() {
        final StringBuilder sb = new StringBuilder(String.format(
                "RowKey:%s\n", Bytes.toHex(getRowKey())
        ));
        for (Map.Entry<String, SortedMap<String, byte[]>> entry : getResultData().entrySet()) {
            sb.append(" FamilyColumn:").append(entry.getKey()).append("\n");
            for (Map.Entry<String, byte[]> qv : entry.getValue().entrySet()) {
                sb.append(" Column: ").append(
                        qv.getKey()
                ).append(" -> ").append(
                        Bytes.toHex(qv.getValue())
                ).append("\n");
            }
        }
        return sb.toString();
    }

    /**
     * For debugging use only.
     *
     * @return a dump with family and qualifier names decoded as UTF-8
     */
    @Deprecated
    public String getStringInUTF8() {
        final StringBuilder sb = new StringBuilder(String.format(
                "RowKey:%s\n", Bytes.toHex(getRowKey())
        ));
        for (Map.Entry<String, SortedMap<String, byte[]>> entry : getResultData().entrySet()) {
            final String columnFamily = Bytes.toString(Bytes.fromHex(entry.getKey()));
            sb.append(" FamilyColumn: ").append(columnFamily).append("\n");
            for (Map.Entry<String, byte[]> qv : entry.getValue().entrySet()) {
                final String qualifier = Bytes.toString(Bytes.fromHex(qv.getKey()));
                sb.append(" Column: ").append(
                        qualifier
                ).append(" -> ").append(
                        Bytes.toHex(qv.getValue())
                ).append("\n");
            }
        }
        return sb.toString();
    }
}
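As the class comment says, SealedPut only exists so that Put contents survive serialization in frameworks that choke on the raw HBase Put. A hypothetical round trip, using only the methods defined above:

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class SealedPutExample {
    public static void main(String[] args) {
        // Build a regular HBase Put ...
        Put original = new Put(Bytes.toBytes("row-1"));
        original.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));

        // ... wrap it so it can be shipped between operators without serialization surprises ...
        SealedPut sealed = new SealedPut(original);

        // ... and unwrap it again right before writing to HBase.
        Put restored = sealed.toPut();

        System.out.println(sealed.getStringInHex());
        System.out.println(restored);
    }
}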
package com.cvnavi.streaming.trigger

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
import org.apache.flink.api.common.typeutils.base.LongSerializer
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers._
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.hadoop.hbase.util.Bytes
import org.slf4j.{Logger, LoggerFactory}

/**
  * Trigger for time windows: fires & purges once either the window's time limit
  * or the element-count limit is exceeded.
  */
class TimeCountTrigger[W <: TimeWindow](
  maxCount: Long,
  timeCharacteristic: TimeCharacteristic = TimeCharacteristic.ProcessingTime
) extends Trigger[Object, W] {

  protected val LOGGER: Logger = LoggerFactory.getLogger(getClass)

  /**
    * Counter state.
    */
  private val countState: ReducingStateDescriptor[java.lang.Long] = new ReducingStateDescriptor[java.lang.Long](
    "count", new Sum(), LongSerializer.INSTANCE
  )

  /**
    * This is definitely a pitfall in Flink: returning `TriggerResult.FIRE_AND_PURGE`
    * does NOT reset the counter, so we clear it ourselves before firing.
    *
    * @param window window information
    * @param ctx    trigger context
    * @return
    */
  private def fireAndPurge(window: W, ctx: TriggerContext): TriggerResult = {
    clear(window, ctx)
    TriggerResult.FIRE_AND_PURGE
  }

  override def onElement(element: Object, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
    val count: ReducingState[java.lang.Long] = ctx.getPartitionedState(countState)
    count.add(1L)
    if (count.get >= maxCount || timestamp >= window.getEnd) {
      LOGGER.debug(
        "Triggered on element while count:{}/{} time:{}/{}." + Bytes.toHex(Bytes.toBytes(hashCode())),
        count.get.toString,
        maxCount.toString,
        timestamp.toString,
        window.getEnd.toString
      )
      fireAndPurge(window, ctx)
    } else {
      TriggerResult.CONTINUE
    }
  }

  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
    if (timeCharacteristic == TimeCharacteristic.EventTime) {
      TriggerResult.CONTINUE
    } else {
      if (time >= window.getEnd) {
        TriggerResult.CONTINUE
      } else {
        LOGGER.debug("Triggered on processing time.")
        fireAndPurge(window, ctx)
      }
    }
  }

  override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
    if (timeCharacteristic == TimeCharacteristic.ProcessingTime) {
      TriggerResult.CONTINUE
    } else {
      if (time >= window.getEnd) {
        TriggerResult.CONTINUE
      } else {
        LOGGER.debug("Triggered on event time.")
        fireAndPurge(window, ctx)
      }
    }
  }

  override def clear(window: W, ctx: TriggerContext): Unit = {
    val count: ReducingState[java.lang.Long] = ctx.getPartitionedState(countState)
    count.clear()
    count.add(0L)
  }

  /**
    * Sums up the counts.
    */
  class Sum extends ReduceFunction[java.lang.Long] {
    def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = value1 + value2
  }
}
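For context, a minimal sketch of how these pieces might be wired together in a Flink job: collect PutCollections in a processing-time window, cut the window early with TimeCountTrigger once enough elements have arrived, reduce with PutCollection.merge, and write each batch with HBaseRichOutputFormat. The source, window size, and count limit below are placeholders, not values from the gist.

import com.cvnavi.streaming.trigger.TimeCountTrigger;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class HBaseSinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Assume some upstream operator already produces PutCollections.
        DataStream<PutCollection> puts = source(env);

        puts
                // Collect puts for at most 10 seconds ...
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                // ... or until 1000 elements have arrived, whichever comes first.
                .trigger(new TimeCountTrigger<TimeWindow>(1000L, TimeCharacteristic.ProcessingTime))
                // Fold everything in the window into one PutCollection ...
                .reduce(PutCollection::merge)
                // ... and batch-write it with the RichOutputFormat above.
                .writeUsingOutputFormat(new HBaseRichOutputFormat());

        env.execute("hbase-sink-job");
    }

    // Placeholder for whatever produces the PutCollection stream.
    private static DataStream<PutCollection> source(StreamExecutionEnvironment env) {
        throw new UnsupportedOperationException("plug in your own source here");
    }
}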