Flinkx 1.10.2 数据同步断点续传代码阅读
是否开启断点续传,适用于文件和Jdbc类型的数据源,kafka等流式的不支持
- BaseRichInputFormat在
open
时:- 如果isRestore=true,初始化formatState的状态数据(numOfSubTask、numberRead、numberWrite、metrics)
- BaseDataReader构造时,
- 如果
isRestore=true
,根据restoreColumnName获得MetaColumn,并设置restoreColumnIndex和restoreColumnType
- 如果
-
JdbcInputFormat.
buildQuerySql
时,- 如果
isRestore=true
,根据${restoreFilter}
构建恢复sql,或根据${restoreFilter}
构建增量查询sql
- 如果
-
JdbcInputFormat.
closeInternal
时,- 如果isRestore=true,如果task是非正常结束(state!=running),设置
commit=false
不提交事物
- 如果isRestore=true,如果task是非正常结束(state!=running),设置
- BaseRichOutputFormat.
writeSingleRecord()
时,- 如果
isRestore=false || isStreamButNoWriteCheckpoint
,numWriteCounter
和snapshotWriteCounter
的状态+1
- 如果
- BaseRichOutputFormat.
writeMultipleRecords()
时,- 如果isRestore=false,
numWriteCounter
的状态+rows.size
- 如果isRestore=false,
- BaseRichOutputFormat.
writeMultipleRecords()
失败时,- 如果isRestore=true,直接抛出异常
- 如果isRestore=false,遍历rows执行
writeSingleRecord()
-
BaseFileOutputFormat.
actionBeforeWriteData()
时- 如果
restoreConfig.isRestore() && formatState != null
,将执行cleanDirtyData()
清除脏数据文件
- 如果
-
BaseFileOutputFormat.
writeSingleRecordInternal()
时,如果isRestore=true,isStream=false
- 如果
currentRow[restoreColumnIndex]==lastRow[restoreColumnIndex]
,则readyCheckpoint=true
- 如果
-
BaseFileOutputFormat.
getFormatState()
获取当前通道的recover point
时,- 如果
isRestore=false || lastRow==null
时直接返回 - 如果
isRestore=true || !lastRow
时,如果isStream=true||readyCheckpoint=true
时,执行flushData()
持久化数据,并更新状态数据 flushData
时,如果isRestore=true
,执行moveTemporaryDataBlockFileToDirectory()
移动临时数据文件
- 如果
-
BaseFileOutputFormat.
nextBlock()
时,如果isRestore=true
,- currentBlockFileName前缀附加
.
号
- currentBlockFileName前缀附加
-
BaseFileOutputFormat.
closeInternal()
时,- 如果task是非正常结束(state!=running),如果
isRestore=false
,执行clearTemporaryDataFiles
清除临时数据文件; - 如果task是正常结束(state==running),执行
flushData()
,如果isRestore=false
,主动执行执行moveTemporaryDataBlockFileToDirectory()
移动临时数据
- 如果task是非正常结束(state!=running),如果
-
BaseFileOutputFormat.
tryCleanupOnError()
时,- 如果
isRestore=false
,执行clearTemporaryDataFiles
清除临时数据文件;
- 如果
- orc/parquetOutputFormat.
writeSingleRecordToFile()
时,- 如果如果isRestore=true,设置lastRow=row
- JdbcOutputFormat.
writeMultipleRecordsInternal()
时,- 如果如果isRestore=true,如果
lastRow!=null
,设置readyCheckpoint
,设置lastRow=row
- 如果如果isRestore=true,如果
是否开启适时传输:用于控制写入数据的时机
bool isStream= (isRestore=true && isStream=true)
BaseDataReader构造时,
- 如果
isStream=true
,直接返回,否则判断如果isRestore=true
则根据restoreColumnName获得MetaColumn,并设置restoreColumnIndex和restoreColumnType
-
BaseFileOutputFormat.
writeSingleRecordInternal()
时,如果isRestore=true,isStream=false
- 如果
currentRow[restoreColumnIndex]==lastRow[restoreColumnIndex]
,则readyCheckpoint=true
- 如果
-
BaseFileOutputFormat.
getFormatState()
获取当前通道的recover point
时,- 如果
isStream=true||readyCheckpoint
时,执行flushData()
持久化数据,并更新状态数据
- 如果
-
BaseFileOutputFormat.
afterCloseInternal
时- 如果
isStream=true
,直接关闭数据源; - 如果
isStream=false
,waitForAllTasksToFinish()
等待所有通道操作完成,moveAllTemporaryDataFileToDirectory
移动临时数据文件,关闭数据源,执行clearTemporaryDataFiles
清除临时数据文件;
- 如果