Companion code for the Flink persistence pitfall notes (Flink持久化踩坑笔记)
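Four files from the notes: HBaseRichOutputFormat, a Flink RichOutputFormat that writes batches of puts into HBase; PutCollection, a table-keyed, row-deduplicated collection of puts; SealedPut, a serialization-safe wrapper around HBase's Put; and TimeCountTrigger, a Scala window trigger that fires on an element-count or time limit.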
import com.google.common.collect.Lists;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;

/**
 * Persists data into HBase.
 *
 * @author yuanyifan
 */
public class HBaseRichOutputFormat extends RichOutputFormat<PutCollection> {

    private static final Logger LOGGER = LoggerFactory.getLogger(HBaseRichOutputFormat.class);

    private Connection conn;

    @Override
    public void configure(Configuration parameters) {
        // Initialize the connection
        try {
            conn = ConnectionFactory.createConnection(
                    // TODO: load whatever configuration you need here
            );
            LOGGER.info("HBase connection established.");
        } catch (Exception ex) {
            LOGGER.error("Error while creating HBase connection from configure.", ex);
        }
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // Nothing to do here: this sink does not involve data partitioning,
        // so there is no per-task setup.
        LOGGER.info("HBaseRichOutputFormat opened with taskNumber={}, numTasks={}", taskNumber, numTasks);
    }

    @Override
    public void writeRecord(PutCollection data) throws IOException {
        // TODO: batch-write the data (elided in the original notes)
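        // A minimal sketch of what the elided batch write could look like,
        // assuming PutCollection#getData() maps table names to row-keyed
        // SealedPuts (see PutCollection below); not the author's original code.
        for (Map.Entry<String, Map<String, SealedPut>> tableEntry : data.getData().entrySet()) {
            final ArrayList<Put> puts = Lists.newArrayList();
            for (SealedPut sealedPut : tableEntry.getValue().values()) {
                puts.add(sealedPut.toPut());
            }
            try (Table table = conn.getTable(TableName.valueOf(tableEntry.getKey()))) {
                table.put(puts);
            }
        }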
    }

    @Override
    public void close() throws IOException {
        // Remember to close the connection when the job finishes
        // (in a long-running streaming job, it effectively never does).
        if (conn != null && !conn.isClosed()) {
            conn.close();
        }
        LOGGER.info("HBaseRichOutputFormat closed.");
    }
}
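To see the sink in context, here is a minimal sketch of wiring it into a job. The fromElements source, the job name and the table, family and row names are placeholders invented for illustration; the original notes do not show the surrounding job:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; a real job would produce PutCollection records
        // from a windowed aggregation over a Kafka stream or similar.
        SealedPut put = new SealedPut(Bytes.toBytes("row-1"));
        put.put(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        DataStream<PutCollection> puts = env.fromElements(
                PutCollection.create("demo_table", put)
        );

        // Each PutCollection record goes through the RichOutputFormat above.
        puts.writeUsingOutputFormat(new HBaseRichOutputFormat());
        env.execute("hbase-sink-demo");
    }
}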
import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * A collection of Puts, grouped by table name and deduplicated by row key.
 */
public class PutCollection {

    private HashMap<String, Map<String, SealedPut>> data = Maps.newHashMap();

    public HashMap<String, Map<String, SealedPut>> getData() {
        return data;
    }

    public void setData(HashMap<String, Map<String, SealedPut>> data) {
        this.data = data;
    }

    public void putAll(String tableName, Collection<SealedPut> puts) {
        if (!data.containsKey(tableName)) {
            data.put(tableName, Maps.newHashMap());
        }
        final Map<String, SealedPut> table = data.get(tableName);
        for (SealedPut put : puts) {
            final String hexRow = Bytes.toHex(put.getRowKey());
            if (!table.containsKey(hexRow)) {
                table.put(hexRow, put);
            } else {
                // Same table and row key: merge into the existing SealedPut
                table.put(hexRow, table.get(hexRow).appendSealedPut(put));
            }
        }
    }

    public void put(String tableName, SealedPut put) {
        putAll(tableName, Collections.singletonList(put));
    }

    /**
     * Merge several PutCollections into one.
     */
    public static PutCollection merge(PutCollection... cs) {
        PutCollection c0 = new PutCollection();
        for (PutCollection ci : cs) {
            for (Map.Entry<String, Map<String, SealedPut>> entry : ci.data.entrySet()) {
                c0.putAll(entry.getKey(), entry.getValue().values());
            }
        }
        return c0;
    }

    public static PutCollection create(String tableName, Collection<SealedPut> puts) {
        PutCollection result = new PutCollection();
        result.putAll(tableName, puts);
        return result;
    }

    public static PutCollection create(String tableName, SealedPut put) {
        PutCollection result = new PutCollection();
        result.put(tableName, put);
        return result;
    }
}
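A small sketch (table and column names invented here) showing the row-key merge behavior above: two puts for the same row in the same table collapse into a single merged SealedPut:

import org.apache.hadoop.hbase.util.Bytes;

public class PutCollectionDemo {
    public static void main(String[] args) {
        final byte[] row = Bytes.toBytes("row-1");

        final SealedPut a = new SealedPut(row);
        a.put(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("v1"));

        final SealedPut b = new SealedPut(row);
        b.put(Bytes.toBytes("cf"), Bytes.toBytes("q2"), Bytes.toBytes("v2"));

        // Same table, same row key: b is appended onto a, so the table
        // still holds exactly one SealedPut carrying both columns.
        final PutCollection merged = PutCollection.merge(
                PutCollection.create("demo_table", a),
                PutCollection.create("demo_table", b)
        );
        System.out.println(merged.getData().get("demo_table").size()); // prints 1
    }
}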
import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.SortedMap;

/**
 * A wrapper around Put that can be serialized easily, preventing the strange
 * errors Put itself can trigger under some frameworks.
 *
 * @author yuanyifan
 */
public class SealedPut {

    private static final Logger LOGGER = LoggerFactory.getLogger(SealedPut.class);

    private byte[] rowKey;

    /**
     * Layout: Map[ColumnFamily(HexString), Map[Qualifier(HexString), Value(Bytes)]]
     */
    private SortedMap<String, SortedMap<String, byte[]>> resultData = Maps.newTreeMap();

    public byte[] getRowKey() {
        return rowKey;
    }

    /**
     * Only here to keep POJO serializers happy; do not call directly.
     */
    @Deprecated
    public void setRowKey(byte[] rowKey) {
        this.rowKey = rowKey;
    }

    public SortedMap<String, SortedMap<String, byte[]>> getResultData() {
        return resultData;
    }

    /**
     * Only here to keep POJO serializers happy; do not call directly.
     */
    @Deprecated
    public void setResultData(SortedMap<String, SortedMap<String, byte[]>> resultData) {
        this.resultData = resultData;
    }

    /**
     * Create a SealedPut for the given row key.
     *
     * @param row row key
     */
    public SealedPut(byte[] row) {
        rowKey = row;
    }

    /**
     * Create directly from an HBase Put.
     *
     * @param hbasePut the HBase Put to seal
     */
    public SealedPut(Put hbasePut) {
        this(hbasePut.getRow());
        for (Map.Entry<byte[], List<Cell>> cfData : hbasePut.getFamilyCellMap().entrySet()) {
            final byte[] columnFamily = cfData.getKey();
            for (Cell cell : cfData.getValue()) {
                put(columnFamily, CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
            }
        }
    }

    /**
     * Append another SealedPut to this one.
     */
    public SealedPut appendSealedPut(SealedPut v) {
        return merge(this, v);
    }

    /**
     * Merge several SealedPuts, which must all share the same row key.
     */
    public static SealedPut merge(SealedPut... vs) {
        if (vs.length <= 1) {
            throw new RuntimeException("Nothing to merge.");
        }
        // Check that all row keys are equal
        final SealedPut v0 = vs[0];
        for (SealedPut vi : vs) {
            if (!Bytes.equals(vi.rowKey, v0.rowKey)) {
                throw new RuntimeException(
                        String.format(
                                "Can't merge Puts because the row keys differ (v0=%s, vi=%s).",
                                Bytes.toHex(v0.rowKey),
                                Bytes.toHex(vi.rowKey)
                        )
                );
            }
        }
        // Merge all puts into one
        final SealedPut newValue = new SealedPut(v0.rowKey);
        for (SealedPut vi : vs) {
            for (Map.Entry<String, SortedMap<String, byte[]>> value : vi.resultData.entrySet()) {
                final byte[] cf = Bytes.fromHex(value.getKey());
                for (Map.Entry<String, byte[]> field : value.getValue().entrySet()) {
                    final byte[] qualifier = Bytes.fromHex(field.getKey());
                    newValue.put(cf, qualifier, field.getValue());
                }
            }
        }
        return newValue;
    }

    /**
     * Add one column.
     * <p>
     * Note that columns must be added in row-key order.
     *
     * @param family    column family
     * @param qualifier column qualifier
     * @param value     cell value
     */
    public void put(byte[] family, byte[] qualifier, byte[] value) {
        final String familyHex = Bytes.toHex(family);
        final String qualifierHex = Bytes.toHex(qualifier);
        if (!resultData.containsKey(familyHex)) {
            resultData.put(familyHex, Maps.<String, byte[]>newTreeMap());
        }
        resultData.get(familyHex).put(qualifierHex, value);
        LOGGER.trace(String.format(
                "HBase row generated %s: %s -> %s -> %s",
                Bytes.toHex(rowKey),
                familyHex,
                Bytes.toHex(qualifier),
                Bytes.toHex(value)
        ));
    }

    /**
     * Convert to a Put that HBase can accept.
     */
    public Put toPut() {
        return convertToPut(this);
    }

    /**
     * Factory method.
     */
    public static SealedPut createSealedPut(Put legacyPut) {
        return new SealedPut(legacyPut);
    }

    /**
     * Convert a SealedPut back into the Put type HBase uses.
     */
    public static Put convertToPut(SealedPut sealedPut) {
        Put put = new Put(sealedPut.rowKey);
        for (Map.Entry<String, SortedMap<String, byte[]>> familyValue : sealedPut.resultData.entrySet()) {
            for (Map.Entry<String, byte[]> columnValue : familyValue.getValue().entrySet()) {
                put.addColumn(Bytes.fromHex(familyValue.getKey()), Bytes.fromHex(columnValue.getKey()), columnValue.getValue());
            }
        }
        return put;
    }

    @Override
    public String toString() {
        return getStringInHex();
    }

    public String getStringInHex() {
        final StringBuilder sb = new StringBuilder(String.format(
                "RowKey:%s\n", Bytes.toHex(getRowKey())
        ));
        for (Map.Entry<String, SortedMap<String, byte[]>> entry : getResultData().entrySet()) {
            sb.append("  FamilyColumn:").append(entry.getKey()).append("\n");
            for (Map.Entry<String, byte[]> qv : entry.getValue().entrySet()) {
                sb.append("    Column: ").append(
                        qv.getKey()
                ).append(" -> ").append(
                        Bytes.toHex(qv.getValue())
                ).append("\n");
            }
        }
        return sb.toString();
    }

    /**
     * For debugging use only.
     */
    @Deprecated
    public String getStringInUTF8() {
        final StringBuilder sb = new StringBuilder(String.format(
                "RowKey:%s\n", Bytes.toHex(getRowKey())
        ));
        for (Map.Entry<String, SortedMap<String, byte[]>> entry : getResultData().entrySet()) {
            final String columnFamily = Bytes.toString(Bytes.fromHex(entry.getKey()));
            sb.append("  FamilyColumn: ").append(columnFamily).append("\n");
            for (Map.Entry<String, byte[]> qv : entry.getValue().entrySet()) {
                final String qualifier = Bytes.toString(Bytes.fromHex(qv.getKey()));
                sb.append("    Column: ").append(
                        qualifier
                ).append(" -> ").append(
                        Bytes.toHex(qv.getValue())
                ).append("\n");
            }
        }
        return sb.toString();
    }
}
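The point of the wrapper in one round trip: seal a Put into plain hex-string and byte-array maps that any serializer can handle, then convert back just before writing. Names below are invented for illustration:

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class SealedPutDemo {
    public static void main(String[] args) {
        Put original = new Put(Bytes.toBytes("row-1"));
        original.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));

        // Seal: everything is copied into sorted maps of hex strings and bytes,
        // which serialize without surprises.
        SealedPut sealed = new SealedPut(original);
        System.out.println(sealed); // hex dump via getStringInHex()

        // Unseal right before the HBase write.
        Put restored = sealed.toPut();
        System.out.println(Bytes.toString(restored.getRow())); // row-1
    }
}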
package com.cvnavi.streaming.trigger

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
import org.apache.flink.api.common.typeutils.base.LongSerializer
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers._
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.hadoop.hbase.util.Bytes
import org.slf4j.{Logger, LoggerFactory}

/**
 * Trigger for time windows that fires and purges when either the window's
 * time bound or the element-count limit is exceeded.
 */
class TimeCountTrigger[W <: TimeWindow](
  maxCount: Long,
  timeCharacteristic: TimeCharacteristic = TimeCharacteristic.ProcessingTime
) extends Trigger[Object, W] {

  protected val LOGGER: Logger = LoggerFactory.getLogger(getClass)

  /**
   * Element-count state.
   */
  private val countState: ReducingStateDescriptor[java.lang.Long] = new ReducingStateDescriptor[java.lang.Long](
    "count", new Sum(), LongSerializer.INSTANCE
  )

  /**
   * This is absolutely a pitfall in Flink: returning `TriggerResult.FIRE_AND_PURGE`
   * does not clear the count state, so we clear it ourselves before firing.
   *
   * @param window window information
   * @param ctx    trigger context
   */
  private def fireAndPurge(window: W, ctx: TriggerContext): TriggerResult = {
    clear(window, ctx)
    TriggerResult.FIRE_AND_PURGE
  }

  override def onElement(element: Object, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
    val count: ReducingState[java.lang.Long] = ctx.getPartitionedState(countState)
    count.add(1L)
    if (count.get >= maxCount || timestamp >= window.getEnd) {
      LOGGER.debug(
        "Triggered on element while count:{}/{} time:{}/{}." + Bytes.toHex(Bytes.toBytes(hashCode())),
        count.get.toString,
        maxCount.toString,
        timestamp.toString,
        window.getEnd.toString
      )
      fireAndPurge(window, ctx)
    } else {
      TriggerResult.CONTINUE
    }
  }

  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
    if (timeCharacteristic == TimeCharacteristic.EventTime) {
      TriggerResult.CONTINUE
    } else {
      if (time >= window.getEnd) {
        TriggerResult.CONTINUE
      } else {
        LOGGER.debug("Triggered on processing time.")
        fireAndPurge(window, ctx)
      }
    }
  }

  override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
    if (timeCharacteristic == TimeCharacteristic.ProcessingTime) {
      TriggerResult.CONTINUE
    } else {
      if (time >= window.getEnd) {
        TriggerResult.CONTINUE
      } else {
        LOGGER.debug("Triggered on event time.")
        fireAndPurge(window, ctx)
      }
    }
  }

  override def clear(window: W, ctx: TriggerContext): Unit = {
    val count: ReducingState[java.lang.Long] = ctx.getPartitionedState(countState)
    count.clear()
    count.add(0L)
  }

  /**
   * Sums partial counts.
   */
  class Sum extends ReduceFunction[java.lang.Long] {
    def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = value1 + value2
  }
}
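For orientation, a sketch of attaching the trigger to a keyed processing-time window from the Java DataStream API; the socket source, key selector and limits are placeholders invented here, not part of the original notes:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import com.cvnavi.streaming.trigger.TimeCountTrigger;

public class TimeCountTriggerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999) // placeholder source
                .keyBy(line -> line)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                // Fire & purge on whichever comes first: 1000 elements
                // or the end of the one-minute window.
                .trigger(new TimeCountTrigger<TimeWindow>(1000L, TimeCharacteristic.ProcessingTime))
                .reduce((a, b) -> a + "," + b)
                .print();

        env.execute("time-count-trigger-demo");
    }
}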