
@geosmart
Last active August 18, 2021 02:00

Flinkx 1.10.2 Data Sync Resume-from-Checkpoint: Code Reading Notes

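isRestore logic

Whether resumable transfer (resuming from the last checkpoint) is enabled. It applies to file and JDBC data sources; streaming sources such as Kafka are not supported.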

Input plugins

BaseRichInputFormat

  • When BaseRichInputFormat opens:
    • if isRestore=true, it initializes the state data in formatState (numOfSubTask, numberRead, numberWrite, metrics)
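
A minimal sketch of this open-time initialization, assuming a simplified FormatState that carries only the fields listed above. The names InputRestoreSketch and initOnOpen are hypothetical, and reusing a non-null state restored from a checkpoint is an assumption, not something these notes confirm.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the restore-state initialization done when the input format opens.
// FormatState is a simplified stand-in that only carries the fields named in the notes.
class InputRestoreSketch {

    static class FormatState {
        final int numOfSubTask;
        long numberRead;
        long numberWrite;
        final Map<String, Long> metrics = new HashMap<>();

        FormatState(int numOfSubTask) {
            this.numOfSubTask = numOfSubTask;
        }
    }

    /**
     * Returns the state the input format should use after open().
     * Assumption: a state restored from a checkpoint is reused as-is.
     */
    static FormatState initOnOpen(boolean isRestore, int indexOfSubTask, FormatState restored) {
        if (!isRestore) {
            return null; // restore disabled: no per-channel state is tracked
        }
        if (restored != null) {
            return restored; // resuming: keep the counters saved in the checkpoint
        }
        return new FormatState(indexOfSubTask); // fresh run: counters start at zero
    }
}
```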

BaseDataReader

  • When BaseDataReader is constructed:
    • if isRestore=true, it looks up the MetaColumn matching restoreColumnName and sets restoreColumnIndex and restoreColumnType
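
The lookup can be sketched as a linear scan over the configured columns, assuming a simplified MetaColumn with only a name and a type. MetaColumn, RestoreColumn and resolve() below are hypothetical stand-ins rather than the FlinkX classes, and failing fast on a missing column is an assumption.

```java
import java.util.List;

// Hypothetical sketch: resolve restoreColumnName to restoreColumnIndex / restoreColumnType.
class RestoreColumnSketch {

    // Simplified stand-in for a reader column definition (name + type only).
    static class MetaColumn {
        final String name;
        final String type;

        MetaColumn(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    // The two values the reader keeps once the restore column is known.
    static class RestoreColumn {
        final int index;
        final String type;

        RestoreColumn(int index, String type) {
            this.index = index;
            this.type = type;
        }
    }

    /** Returns null when restore is off; throws if the configured column is missing (assumption). */
    static RestoreColumn resolve(boolean isRestore, List<MetaColumn> columns, String restoreColumnName) {
        if (!isRestore) {
            return null;
        }
        for (int i = 0; i < columns.size(); i++) {
            MetaColumn column = columns.get(i);
            if (column.name.equals(restoreColumnName)) {
                return new RestoreColumn(i, column.type);
            }
        }
        throw new IllegalArgumentException("restore column not found: " + restoreColumnName);
    }
}
```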

JdbcInputFormat

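  • In JdbcInputFormat.buildQuerySql():

    • if isRestore=true, it builds the restore SQL from the ${restoreFilter} placeholder, or builds the incremental query SQL
  • In JdbcInputFormat.closeInternal():

    • if isRestore=true and the task ended abnormally (state != running), it sets commit=false so the transaction is not committed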
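
A sketch of both JdbcInputFormat rules, assuming the query template contains the literal ${restoreFilter} placeholder from the notes. The filter shape (" and <column> > <lastValue>"), the method names and the unquoted value are illustrative assumptions; real code would format the value according to the column type and also handle the incremental-query case.

```java
// Hypothetical sketch of the JdbcInputFormat restore rules described above.
class JdbcRestoreSketch {

    static final String RESTORE_FILTER_PLACEHOLDER = "${restoreFilter}";

    /**
     * Replaces the restore placeholder. On a fresh run (or with restore off) it becomes empty;
     * when resuming, it becomes a filter that skips rows already read before the checkpoint.
     * Assumption: lastValue is a numeric literal; real code must format it by column type.
     */
    static String buildQuerySql(String template, boolean isRestore,
                                String restoreColumn, String lastValue) {
        String filter = "";
        if (isRestore && lastValue != null) {
            filter = " and " + restoreColumn + " > " + lastValue;
        }
        // String.replace matches the placeholder literally, so "$" and braces need no escaping.
        return template.replace(RESTORE_FILTER_PLACEHOLDER, filter);
    }

    /** closeInternal(): with restore on, a task whose state is not running must not commit. */
    static boolean shouldCommit(boolean isRestore, boolean stateIsRunning) {
        return !isRestore || stateIsRunning;
    }
}
```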

Output plugins

BaseRichOutputFormat

  • In BaseRichOutputFormat.writeSingleRecord():
    • if isRestore=false || isStreamButNoWriteCheckpoint, numWriteCounter and snapshotWriteCounter are each incremented by 1
  • In BaseRichOutputFormat.writeMultipleRecords():
    • if isRestore=false, numWriteCounter is incremented by rows.size()
  • When BaseRichOutputFormat.writeMultipleRecords() fails:
    • if isRestore=true, the exception is rethrown immediately
    • if isRestore=false, it iterates over rows and calls writeSingleRecord() for each one
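
The counting and fallback rules can be sketched as below. The Writer interface, the dirtyRecords counter and the String row type are hypothetical simplifications; only the flag and counter names come from the notes. One plausible reading of the rethrow in restore mode is that a row-by-row retry could write data the checkpointed state does not account for, so the task is left to fail and resume from the last checkpoint; the notes themselves do not give the reason.

```java
import java.util.List;

// Hypothetical sketch of the BaseRichOutputFormat counting and batch-fallback rules.
// Writer is an assumed interface standing in for the plugin-specific write methods.
class OutputCounterSketch {

    interface Writer {
        void writeBatch(List<String> rows) throws Exception;

        void writeOne(String row) throws Exception;
    }

    private final boolean isRestore;
    private final boolean isStreamButNoWriteCheckpoint;
    long numWriteCounter;
    long snapshotWriteCounter;
    long dirtyRecords; // simplified stand-in for dirty-data handling

    OutputCounterSketch(boolean isRestore, boolean isStreamButNoWriteCheckpoint) {
        this.isRestore = isRestore;
        this.isStreamButNoWriteCheckpoint = isStreamButNoWriteCheckpoint;
    }

    void writeSingleRecord(Writer writer, String row) {
        try {
            writer.writeOne(row);
            // Counted per row only when restore is off or the stream case has no write checkpoint.
            if (!isRestore || isStreamButNoWriteCheckpoint) {
                numWriteCounter++;
                snapshotWriteCounter++;
            }
        } catch (Exception e) {
            dirtyRecords++;
        }
    }

    void writeMultipleRecords(Writer writer, List<String> rows) throws Exception {
        try {
            writer.writeBatch(rows);
            if (!isRestore) {
                numWriteCounter += rows.size();
            }
        } catch (Exception e) {
            if (isRestore) {
                throw e; // restore mode: propagate instead of retrying row by row
            }
            for (String row : rows) {
                writeSingleRecord(writer, row); // fall back to one-row writes
            }
        }
    }
}
```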

BaseFileOutputFormat

  • BaseFileOutputFormat.actionBeforeWriteData():

    • if restoreConfig.isRestore() && formatState != null, it calls cleanDirtyData() to remove dirty data files
  • In BaseFileOutputFormat.writeSingleRecordInternal(), when isRestore=true and isStream=false:

    • if currentRow[restoreColumnIndex]==lastRow[restoreColumnIndex], readyCheckpoint=true
  • In BaseFileOutputFormat.getFormatState(), which returns the recovery point of the current channel:

    • if isRestore=false || lastRow==null, it returns immediately
    • otherwise (isRestore=true and lastRow!=null), if isStream=true || readyCheckpoint=true, it calls flushData() to persist the data and then updates the state data
    • inside flushData(), if isRestore=true, moveTemporaryDataBlockFileToDirectory() is called to move the temporary data files
  • In BaseFileOutputFormat.nextBlock(), if isRestore=true:

    • currentBlockFileName is prefixed with "."
  • In BaseFileOutputFormat.closeInternal():

    1. if the task ended abnormally (state != running) and isRestore=false, clearTemporaryDataFiles() removes the temporary data files;
    2. if the task ended normally (state == running), flushData() is called, and if isRestore=false, moveTemporaryDataBlockFileToDirectory() is called explicitly to move the temporary data
  • In BaseFileOutputFormat.tryCleanupOnError():

    • if isRestore=false, clearTemporaryDataFiles() removes the temporary data files.
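
The BaseFileOutputFormat branches above can be condensed into a small decision table. This is a sketch: it returns the names of the methods from the notes instead of calling them, "updateFormatState" is a hypothetical label for the state update, and endedNormally stands for the notes' state == running check. The "." prefix on block files likely keeps in-progress files hidden, since Hadoop's FileInputFormat skips file names starting with "." or "_"; that rationale is an inference, not something the notes state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical decision table for the BaseFileOutputFormat restore branches.
// Methods return the names of the actions the notes describe, in call order.
class FileOutputRestoreSketch {

    /** nextBlock(): in restore mode the block file name gets a "." prefix. */
    static String blockFileName(boolean isRestore, String name) {
        return isRestore ? "." + name : name;
    }

    /** getFormatState(): what happens when a checkpoint snapshot is taken. */
    static List<String> onSnapshot(boolean isRestore, boolean hasLastRow,
                                   boolean isStream, boolean readyCheckpoint) {
        List<String> actions = new ArrayList<>();
        if (!isRestore || !hasLastRow) {
            return actions; // returns immediately, nothing to persist
        }
        if (isStream || readyCheckpoint) {
            actions.add("flushData");
            // With isRestore=true, flushData() itself moves the temporary block file.
            actions.add("moveTemporaryDataBlockFileToDirectory");
            actions.add("updateFormatState"); // hypothetical label for the state update
        }
        return actions;
    }

    /** closeInternal(): cleanup on an abnormal end, flush (and move) on a normal end. */
    static List<String> onClose(boolean isRestore, boolean endedNormally) {
        List<String> actions = new ArrayList<>();
        if (!endedNormally) {
            if (!isRestore) {
                actions.add("clearTemporaryDataFiles");
            }
            return actions;
        }
        actions.add("flushData"); // in restore mode this also moves the temporary file
        if (!isRestore) {
            actions.add("moveTemporaryDataBlockFileToDirectory");
        }
        return actions;
    }
}
```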

BaseHdfsOutputFormat subclasses

  • In orc/parquetOutputFormat.writeSingleRecordToFile():
    • if isRestore=true, lastRow=row

JdbcOutputFormat

  • In JdbcOutputFormat.writeMultipleRecordsInternal():
    • if isRestore=true: when lastRow!=null, readyCheckpoint is updated; then lastRow=row

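isStream logic

Whether real-time transfer is enabled; it controls when data is written out.

Effective value: isStream = isRestore && isStream (the flag only takes effect when isRestore is also true)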

Input plugins

BaseDataReader

When BaseDataReader is constructed:

  • if isStream=true, it returns immediately; otherwise, if isRestore=true, it looks up the MetaColumn matching restoreColumnName and sets restoreColumnIndex and restoreColumnType

Output plugins

BaseFileOutputFormat

  • In BaseFileOutputFormat.writeSingleRecordInternal(), when isRestore=true and isStream=false:

    • if currentRow[restoreColumnIndex]==lastRow[restoreColumnIndex], readyCheckpoint=true
  • In BaseFileOutputFormat.getFormatState(), which returns the recovery point of the current channel:

    • if isStream=true || readyCheckpoint=true, it calls flushData() to persist the data and then updates the state data
  • BaseFileOutputFormat.afterCloseInternal():

    • if isStream=true, it closes the data source directly;
    • if isStream=false, waitForAllTasksToFinish() waits for all channels to finish, moveAllTemporaryDataFileToDirectory() moves the temporary data files, the data source is closed, and clearTemporaryDataFiles() removes the temporary data files.
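
The two afterCloseInternal() paths can be sketched as an ordered list of steps. The step names follow the notes, while closeSource is a hypothetical label for closing the data source; the sketch only captures ordering, not how waitForAllTasksToFinish() actually detects that the other channels are done.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the afterCloseInternal() ordering described above.
class AfterCloseSketch {

    static List<String> afterClose(boolean isStream) {
        List<String> steps = new ArrayList<>();
        if (isStream) {
            // isStream implies isRestore, so flushData() has already moved the files.
            steps.add("closeSource");
            return steps;
        }
        steps.add("waitForAllTasksToFinish");             // every channel must be done first
        steps.add("moveAllTemporaryDataFileToDirectory"); // publish the temporary files
        steps.add("closeSource");
        steps.add("clearTemporaryDataFiles");             // remove leftovers
        return steps;
    }
}
```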