Created
April 10, 2019 02:17
-
-
Save zhaoawd/a11f96976bcbf1fe2c0c98a72a2bc486 to your computer and use it in GitHub Desktop.
[TimeCountTrigger] 针对时间窗做处理,在超出窗体的时间或者个数极限的时候触发Fire&Purge #flink
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.functions.ReduceFunction | |
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor} | |
import org.apache.flink.api.common.typeutils.base.LongSerializer | |
import org.apache.flink.streaming.api.TimeCharacteristic | |
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext | |
import org.apache.flink.streaming.api.windowing.triggers._ | |
import org.apache.flink.streaming.api.windowing.windows.TimeWindow | |
import org.apache.hadoop.hbase.util.Bytes | |
import org.slf4j.{Logger, LoggerFactory} | |
/** | |
* 针对时间窗做处理,在超出窗体的时间或者个数极限的时候触发Fire&Purge | |
*/ | |
class TimeCountTrigger[W <: TimeWindow]( | |
maxCount: Long, | |
timeCharacteristic: TimeCharacteristic = TimeCharacteristic.ProcessingTime | |
package com.cvnavi.streaming.trigger | |
import org.apache.flink.api.common.functions.ReduceFunction | |
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor} | |
import org.apache.flink.apackage com.cvnavi.streaming.trigger | |
import org.apache.flink.api.common.functions.ReduceFunction | |
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor} | |
import org.apache.flink.api.common.typeutils.base.LongSerializer | |
import org.apache.flink.streaming.api.TimeCharacteristic | |
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext | |
import org.apache.flink.streaming.api.windowing.triggers._ | |
import org.apache.flink.streaming.api.windowing.windows.TimeWindow | |
import org.apache.hadoop.hbase.util.Bytes | |
import org.slf4j.{Logger, LoggerFactory} | |
/**
 * Trigger for time windows that fires and purges the window as soon as either the
 * per-window element count reaches `maxCount` or a time limit of the window is hit.
 *
 * This trigger registers no timers of its own, so `onProcessingTime` / `onEventTime`
 * only react to timers registered elsewhere.
 * NOTE(review): presumably they are driven by the window operator's own timers — confirm.
 *
 * @param maxCount           element count at which the window fires and is purged
 * @param timeCharacteristic selects the honoured time callback: `onProcessingTime` always
 *                           continues for `EventTime`, `onEventTime` always continues for
 *                           `ProcessingTime`
 * @tparam W concrete window type
 */
class TimeCountTrigger[W <: TimeWindow](
  maxCount: Long,
  timeCharacteristic: TimeCharacteristic = TimeCharacteristic.ProcessingTime
) extends Trigger[Object, W] {

  protected val LOGGER: Logger = LoggerFactory.getLogger(getClass)

  /** Descriptor of the element counter: a running sum of `java.lang.Long` values. */
  private val countState: ReducingStateDescriptor[java.lang.Long] =
    new ReducingStateDescriptor[java.lang.Long]("count", new Sum(), LongSerializer.INSTANCE)

  /**
   * Resets the counter and reports `FIRE_AND_PURGE`.
   *
   * Per the original author's note, returning `TriggerResult.FIRE_AND_PURGE` on its own
   * does not reset the count, hence the explicit `clear` call.
   *
   * @param window window being fired
   * @param ctx    trigger context giving access to the counter state
   * @return always `TriggerResult.FIRE_AND_PURGE`
   */
  private def fireAndPurge(window: W, ctx: TriggerContext): TriggerResult = {
    clear(window, ctx)
    TriggerResult.FIRE_AND_PURGE
  }

  /**
   * Counts the element, then fires and purges when the count has reached `maxCount` or the
   * element timestamp is at or past the window end; otherwise continues.
   */
  override def onElement(element: Object, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
    val count: ReducingState[java.lang.Long] = ctx.getPartitionedState(countState)
    // Always add before reading, so `count.get` is never read from an empty state.
    count.add(1L)
    if (count.get >= maxCount || timestamp >= window.getEnd) {
      // Guarded so the hex dump and the toString calls are skipped when debug is off.
      if (LOGGER.isDebugEnabled) {
        LOGGER.debug(
          "Triggered on element while count:{}/{} time:{}/{}." + Bytes.toHex(Bytes.toBytes(hashCode())),
          count.get.toString,
          maxCount.toString,
          timestamp.toString,
          window.getEnd.toString
        )
      }
      fireAndPurge(window, ctx)
    } else {
      TriggerResult.CONTINUE
    }
  }

  /**
   * Processing-time callback. Continues when the trigger is configured for event time or
   * when `time` is at or past the window end; otherwise fires and purges.
   * NOTE(review): firing on `time < window.getEnd` looks inverted — confirm it is intended.
   */
  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
    if (timeCharacteristic == TimeCharacteristic.EventTime || time >= window.getEnd) {
      TriggerResult.CONTINUE
    } else {
      LOGGER.debug("Triggered on processing time.")
      fireAndPurge(window, ctx)
    }
  }

  /**
   * Event-time callback. Continues when the trigger is configured for processing time or
   * when `time` is at or past the window end; otherwise fires and purges.
   * NOTE(review): firing on `time < window.getEnd` looks inverted — confirm it is intended.
   */
  override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
    if (timeCharacteristic == TimeCharacteristic.ProcessingTime || time >= window.getEnd) {
      TriggerResult.CONTINUE
    } else {
      LOGGER.debug("Triggered on event time.")
      fireAndPurge(window, ctx)
    }
  }

  /**
   * Drops the counter state of the given window.
   *
   * Nothing is written back after clearing: `onElement` always adds before it reads, so an
   * empty state is safe, and re-seeding the counter with 0 would keep the state entry
   * alive after the window has been cleaned up.
   */
  override def clear(window: W, ctx: TriggerContext): Unit = {
    ctx.getPartitionedState(countState).clear()
  }

  /** Reduce function that sums two counter values. */
  class Sum extends ReduceFunction[java.lang.Long] {
    def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = value1 + value2
  }
}
pi.common.typeutils.base.LongSerializer | |
import org.apache.flink.streaming.api.TimeCharacteristic | |
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext | |
import org.apache.flink.streaming.api.windowing.triggers._ | |
import org.apache.flink.streaming.api.windowing.windows.TimeWindow | |
import org.apache.hadoop.hbase.util.Bytes | |
import org.slf4j.{Logger, LoggerFactory} | |
/**
 * Trigger for time windows that fires and purges the window as soon as either the
 * per-window element count reaches `maxCount` or a time limit of the window is hit.
 *
 * This trigger registers no timers of its own, so `onProcessingTime` / `onEventTime`
 * only react to timers registered elsewhere.
 * NOTE(review): presumably they are driven by the window operator's own timers — confirm.
 *
 * @param maxCount           element count at which the window fires and is purged
 * @param timeCharacteristic selects the honoured time callback: `onProcessingTime` always
 *                           continues for `EventTime`, `onEventTime` always continues for
 *                           `ProcessingTime`
 * @tparam W concrete window type
 */
class TimeCountTrigger[W <: TimeWindow](
  maxCount: Long,
  timeCharacteristic: TimeCharacteristic = TimeCharacteristic.ProcessingTime
) extends Trigger[Object, W] {

  protected val LOGGER: Logger = LoggerFactory.getLogger(getClass)

  /** Descriptor of the element counter: a running sum of `java.lang.Long` values. */
  private val countState: ReducingStateDescriptor[java.lang.Long] =
    new ReducingStateDescriptor[java.lang.Long]("count", new Sum(), LongSerializer.INSTANCE)

  /**
   * Resets the counter and reports `FIRE_AND_PURGE`.
   *
   * Per the original author's note, returning `TriggerResult.FIRE_AND_PURGE` on its own
   * does not reset the count, hence the explicit `clear` call.
   *
   * @param window window being fired
   * @param ctx    trigger context giving access to the counter state
   * @return always `TriggerResult.FIRE_AND_PURGE`
   */
  private def fireAndPurge(window: W, ctx: TriggerContext): TriggerResult = {
    clear(window, ctx)
    TriggerResult.FIRE_AND_PURGE
  }

  /**
   * Counts the element, then fires and purges when the count has reached `maxCount` or the
   * element timestamp is at or past the window end; otherwise continues.
   */
  override def onElement(element: Object, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
    val count: ReducingState[java.lang.Long] = ctx.getPartitionedState(countState)
    // Always add before reading, so `count.get` is never read from an empty state.
    count.add(1L)
    if (count.get >= maxCount || timestamp >= window.getEnd) {
      // Guarded so the hex dump and the toString calls are skipped when debug is off.
      if (LOGGER.isDebugEnabled) {
        LOGGER.debug(
          "Triggered on element while count:{}/{} time:{}/{}." + Bytes.toHex(Bytes.toBytes(hashCode())),
          count.get.toString,
          maxCount.toString,
          timestamp.toString,
          window.getEnd.toString
        )
      }
      fireAndPurge(window, ctx)
    } else {
      TriggerResult.CONTINUE
    }
  }

  /**
   * Processing-time callback. Continues when the trigger is configured for event time or
   * when `time` is at or past the window end; otherwise fires and purges.
   * NOTE(review): firing on `time < window.getEnd` looks inverted — confirm it is intended.
   */
  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
    if (timeCharacteristic == TimeCharacteristic.EventTime || time >= window.getEnd) {
      TriggerResult.CONTINUE
    } else {
      LOGGER.debug("Triggered on processing time.")
      fireAndPurge(window, ctx)
    }
  }

  /**
   * Event-time callback. Continues when the trigger is configured for processing time or
   * when `time` is at or past the window end; otherwise fires and purges.
   * NOTE(review): firing on `time < window.getEnd` looks inverted — confirm it is intended.
   */
  override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
    if (timeCharacteristic == TimeCharacteristic.ProcessingTime || time >= window.getEnd) {
      TriggerResult.CONTINUE
    } else {
      LOGGER.debug("Triggered on event time.")
      fireAndPurge(window, ctx)
    }
  }

  /**
   * Drops the counter state of the given window.
   *
   * Nothing is written back after clearing: `onElement` always adds before it reads, so an
   * empty state is safe, and re-seeding the counter with 0 would keep the state entry
   * alive after the window has been cleaned up.
   */
  override def clear(window: W, ctx: TriggerContext): Unit = {
    ctx.getPartitionedState(countState).clear()
  }

  /** Reduce function that sums two counter values. */
  class Sum extends ReduceFunction[java.lang.Long] {
    def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = value1 + value2
  }
}
) extends Trigger[Object, W] { | |
/** Logger named after the runtime class of this trigger. */
protected val LOGGER: Logger = LoggerFactory.getLogger(this.getClass)

/** Descriptor of the counting state: a running sum of `java.lang.Long` values named "count". */
private val countState: ReducingStateDescriptor[java.lang.Long] = {
  val summing = new Sum()
  new ReducingStateDescriptor[java.lang.Long]("count", summing, LongSerializer.INSTANCE)
}
/**
 * Resets the counter via `clear` and then reports `FIRE_AND_PURGE`.
 *
 * Per the original author's note this works around a Flink pitfall: returning
 * `TriggerResult.FIRE_AND_PURGE` by itself does not reset the count.
 *
 * @param window window information
 * @param ctx    trigger context
 * @return always `TriggerResult.FIRE_AND_PURGE`
 */
private def fireAndPurge(window: W, ctx: TriggerContext): TriggerResult = {
  // Reset the counter first; the result constant is returned afterwards.
  this.clear(window, ctx)
  val outcome: TriggerResult = TriggerResult.FIRE_AND_PURGE
  outcome
}
/**
 * Counts the incoming element and decides whether the window must fire.
 *
 * Fires and purges when the running count has reached `maxCount` or the element timestamp
 * is at or past the window end; otherwise the window keeps collecting.
 */
override def onElement(element: Object, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
  val counter: ReducingState[java.lang.Long] = ctx.getPartitionedState(countState)
  counter.add(1L)
  val limitReached = counter.get >= maxCount || timestamp >= window.getEnd
  if (!limitReached) {
    TriggerResult.CONTINUE
  } else {
    // The hex form of this trigger's hash code is appended to tell instances apart in logs.
    val instanceTag = Bytes.toHex(Bytes.toBytes(hashCode()))
    LOGGER.debug(
      "Triggered on element while count:{}/{} time:{}/{}." + instanceTag,
      counter.get.toString,
      maxCount.toString,
      timestamp.toString,
      window.getEnd.toString
    )
    fireAndPurge(window, ctx)
  }
}
/**
 * Processing-time callback.
 *
 * Continues when the trigger is configured for event time or when `time` is at or past
 * the window end; otherwise fires and purges the window.
 * NOTE(review): firing on `time < window.getEnd` looks inverted — confirm it is intended.
 *
 * @param time   timestamp of the timer that fired
 * @param window window the timer belongs to
 * @param ctx    trigger context
 * @return `CONTINUE` or `FIRE_AND_PURGE` as described above
 */
override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
  if (timeCharacteristic == TimeCharacteristic.EventTime || time >= window.getEnd) {
    TriggerResult.CONTINUE
  } else {
    LOGGER.debug("Triggered on processing time.")
    // Stray non-code text that was fused onto this call has been removed.
    fireAndPurge(window, ctx)
  }
}
/**
 * Event-time callback.
 *
 * Does nothing when the trigger is configured for processing time. Otherwise fires and
 * purges while `time` is still before the window end, and continues once it is at or
 * past the end.
 */
override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
  val ignored = timeCharacteristic == TimeCharacteristic.ProcessingTime
  if (!ignored && time < window.getEnd) {
    LOGGER.debug("Triggered on event time.")
    fireAndPurge(window, ctx)
  } else {
    TriggerResult.CONTINUE
  }
}
/**
 * Drops the counter state of the given window.
 *
 * Nothing is written back after clearing: `onElement` always adds to the counter before
 * reading it, so an empty state is safe, and re-seeding the counter with 0 would keep the
 * state entry alive after the window has been cleaned up.
 *
 * @param window window whose counter is released
 * @param ctx    trigger context giving access to the counter state
 */
override def clear(window: W, ctx: TriggerContext): Unit = {
  ctx.getPartitionedState(countState).clear()
}
/**
 * Reduce function that adds two counter values.
 */
class Sum extends ReduceFunction[java.lang.Long] {
  def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = {
    val total: Long = value1.longValue + value2.longValue
    java.lang.Long.valueOf(total)
  }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment