Skip to content

Instantly share code, notes, and snippets.

@kmikmy
Last active February 10, 2016 01:01
Show Gist options
  • Save kmikmy/55459a4cb1e77bdc1efc to your computer and use it in GitHub Desktop.
Save kmikmy/55459a4cb1e77bdc1efc to your computer and use it in GitHub Desktop.
コードで読むポストグレスのリカバリ

コードで読むポストグレスのリカバリ

RedoRecPtr

最新のチェックポイントのREDO開始位置を指すポインタ(最新のチェックポイントレコードへのポインタとほぼ同じだが、厳密には同じではない)。 バックエンドプロセス用のローカルコピーが

/*
 * This backend's local copy of the REDO pointer of the latest checkpoint
 * (the WAL position replay starts from; StartupXLOG() sets it from
 * checkPoint.redo).  Refreshed from the shared XLogCtl->Insert.RedoRecPtr
 * while the WAL insert lock is held.
 */
static XLogRecPtr RedoRecPtr; 

で宣言されており、WALインサートロックを保持している時に、共有メモリ上の対応するポインタ(XLogCtl->Insert.RedoRecPtr)から最新の値に更新される。 XLogCtl->RedoRecPtrは、info_lckを保持している時にのみ更新が許される。

CheckPoint レコード

CheckPoint XLOGレコードのボディ。 障害回復を可能にするためにpg_controlの中に最新のコピーを保持する。

/*
 * Body of a CHECKPOINT XLOG record.  A copy of the most recent one is also
 * kept in pg_control (ControlFileData.checkPointCopy) to make crash recovery
 * possible.  StartupXLOG() uses it to find where REDO starts (redo) and to
 * initialize shared state such as nextXid, nextOid and nextMulti.
 */
typedef struct CheckPoint
{
    XLogRecPtr  redo;           /* next RecPtr available when we began to
                                 * create CheckPoint (i.e. REDO start point) */
    TimeLineID  ThisTimeLineID; /* current TLI */
    TimeLineID  PrevTimeLineID; /* previous TLI, if this record begins a new
                                 * timeline (equals ThisTimeLineID otherwise) */
    bool        fullPageWrites; /* current full_page_writes */
    uint32      nextXidEpoch;   /* higher-order bits of nextXid */
    TransactionId nextXid;      /* next free XID */
    Oid         nextOid;        /* next free OID */
    MultiXactId nextMulti;      /* next free MultiXactId */
    MultiXactOffset nextMultiOffset;    /* next free MultiXact offset */
    TransactionId oldestXid;    /* cluster-wide minimum datfrozenxid */
    Oid         oldestXidDB;    /* database with minimum datfrozenxid */
    MultiXactId oldestMulti;    /* cluster-wide minimum datminmxid */
    Oid         oldestMultiDB;  /* database with minimum datminmxid */
    pg_time_t   time;           /* time stamp of checkpoint */

    /*
     * Oldest XID still running. This is only needed to initialize hot standby
     * mode from an online checkpoint, so we only bother calculating this for
     * online checkpoints and only when wal_level is hot_standby. Otherwise
     * it's set to InvalidTransactionId.
     */
    TransactionId oldestActiveXid;
} CheckPoint;

recoveryのコード

void StartupXLOG(void)

起動時に一度だけ実行されるスタートアップ関数。 この関数の中で、redoリカバリが行われる。

ReadControlFile()で、ControlFileにpg_controlの内容が読み込まれていることを確実にする。

/*
 * This must be called ONCE during postmaster or standalone-backend startup
 */
void
StartupXLOG(void)
{
    XLogCtlInsert *Insert;
    CheckPoint  checkPoint;
    bool        wasShutdown;
    bool        reachedStopPoint = false;
    bool        haveBackupLabel = false;
    XLogRecPtr  RecPtr,
                checkPointLoc,
                EndOfLog;
    XLogSegNo   endLogSegNo;
    TimeLineID  PrevTimeLineID;
    XLogRecord *record;
    TransactionId oldestActiveXID;
    bool        backupEndRequired = false;
    bool        backupFromStandby = false;
    DBState     dbstate_at_startup;
    XLogReaderState *xlogreader;
    XLogPageReadPrivate private;
    bool        fast_promoted = false;

    /*
     * Read control file and check XLOG status looks valid.
     *
     * Note: in most control paths, *ControlFile is already valid and we need
     * not do ReadControlFile() here, but might as well do it to be sure.
     */
    ReadControlFile();

pg_control

以下の構造体で表現される。 ディスクの1物理セクタに収まるよう512バイト未満に保つようにしており、書き込み途中の電源断による部分書き込み(パーシャルライト)で壊れる可能性を下げている。 (ダブルライトのような面倒な仕組みを必要としないようにするため)

/*
 * Contents of pg_control.
 *
 * NOTE: try to keep this under 512 bytes so that it will fit on one physical
 * sector of typical disk drives.  This reduces the odds of corruption due to
 * power failure midway through a write.
 *
 * StartupXLOG() reads this via ReadControlFile() and uses state, checkPoint,
 * prevCheckPoint, checkPointCopy and the recovery-point fields below to decide
 * whether and from where WAL replay must start.
 */

typedef struct ControlFileData
{
    /*
     * Unique system identifier --- to ensure we match up xlog files with the
     * installation that produced them.
     */
    uint64      system_identifier;

    /*
     * Version identifier information.  Keep these fields at the same offset,
     * especially pg_control_version; they won't be real useful if they move
     * around.  (For historical reasons they must be 8 bytes into the file
     * rather than immediately at the front.)
     *
     * pg_control_version identifies the format of pg_control itself.
     * catalog_version_no identifies the format of the system catalogs.
     *
     * There are additional version identifiers in individual files; for
     * example, WAL logs contain per-page magic numbers that can serve as
     * version cues for the WAL log.
     */
    uint32      pg_control_version;     /* PG_CONTROL_VERSION */
    uint32      catalog_version_no;     /* see catversion.h */

    /*
     * System status data
     */
    DBState     state;          /* see enum DBState */
    pg_time_t   time;           /* time stamp of last pg_control update */
    XLogRecPtr  checkPoint;     /* last check point record ptr */
    XLogRecPtr  prevCheckPoint; /* previous check point record ptr; outside
                                 * standby mode, StartupXLOG() falls back to
                                 * it if the record at checkPoint is unreadable */

    CheckPoint  checkPointCopy; /* copy of last check point record */

    XLogRecPtr  unloggedLSN;    /* current fake LSN value, for unlogged rels */

    /*
     * These two values determine the minimum point we must recover up to
     * before starting up:
     *
     * minRecoveryPoint is updated to the latest replayed LSN whenever we
     * flush a data change during archive recovery. That guards against
     * starting archive recovery, aborting it, and restarting with an earlier
     * stop location. If we've already flushed data changes from WAL record X
     * to disk, we mustn't start up until we reach X again. Zero when not
     * doing archive recovery.
     *
     * backupStartPoint is the redo pointer of the backup start checkpoint, if
     * we are recovering from an online backup and haven't reached the end of
     * backup yet. It is reset to zero when the end of backup is reached, and
     * we mustn't start up before that. A boolean would suffice otherwise, but
     * we use the redo pointer as a cross-check when we see an end-of-backup
     * record, to make sure the end-of-backup record corresponds the base
     * backup we're recovering from.
     *
     * backupEndPoint is the backup end location, if we are recovering from an
     * online backup which was taken from the standby and haven't reached the
     * end of backup yet. It is initialized to the minimum recovery point in
     * pg_control which was backed up last. It is reset to zero when the end
     * of backup is reached, and we mustn't start up before that.
     *
     * If backupEndRequired is true, we know for sure that we're restoring
     * from a backup, and must see a backup-end record before we can safely
     * start up. If it's false, but backupStartPoint is set, a backup_label
     * file was found at startup but it may have been a leftover from a stray
     * pg_start_backup() call, not accompanied by pg_stop_backup().
     */
    XLogRecPtr  minRecoveryPoint;
    TimeLineID  minRecoveryPointTLI;
    XLogRecPtr  backupStartPoint;
    XLogRecPtr  backupEndPoint;
    bool        backupEndRequired;

    /*
     * Parameter settings that determine if the WAL can be used for archival
     * or hot standby.
     */
    int         wal_level;
    bool        wal_log_hints;
    int         MaxConnections;
    int         max_worker_processes;
    int         max_prepared_xacts;
    int         max_locks_per_xact;

    /*
     * This data is used to check for hardware-architecture compatibility of
     * the database and the backend executable.  We need not check endianness
     * explicitly, since the pg_control version will surely look wrong to a
     * machine of different endianness, but we do need to worry about MAXALIGN
     * and floating-point format.  (Note: storage layout nominally also
     * depends on SHORTALIGN and INTALIGN, but in practice these are the same
     * on all architectures of interest.)
     *
     * Testing just one double value is not a very bulletproof test for
     * floating-point compatibility, but it will catch most cases.
     */
    uint32      maxAlign;       /* alignment requirement for tuples */
    double      floatFormat;    /* constant 1234567.0 */
#define FLOATFORMAT_VALUE   1234567.0

    /*
     * This data is used to make sure that configuration of this database is
     * compatible with the backend executable.
     */
    uint32      blcksz;         /* data block size for this DB */
    uint32      relseg_size;    /* blocks per segment of large relation */

    uint32      xlog_blcksz;    /* block size within WAL files */
    uint32      xlog_seg_size;  /* size of each WAL segment */

    uint32      nameDataLen;    /* catalog name field width */
    uint32      indexMaxKeys;   /* max number of columns in an index */

    uint32      toast_max_chunk_size;   /* chunk size in TOAST tables */
    uint32      loblksize;      /* chunk size in pg_largeobject */

    /* flag indicating internal format of timestamp, interval, time */
    bool        enableIntTimes; /* int64 storage enabled? */

    /* flags indicating pass-by-value status of various types */
    bool        float4ByVal;    /* float4 pass-by-value? */
    bool        float8ByVal;    /* float8, int8, etc pass-by-value? */

    /* Are data pages protected by checksums? Zero if no checksum version */
    uint32      data_checksum_version;

    /* CRC of all above ... MUST BE LAST! */
    pg_crc32    crc;
} ControlFileData;     

stateフィールドはDBState列挙体である。

/*
 * System status indicator.  Note this is stored in pg_control; if you change
 * it, you must bump PG_CONTROL_VERSION
 *
 * Each value names what the server was doing when pg_control was last
 * written.  If StartupXLOG() finds anything other than DB_SHUTDOWNED or
 * DB_SHUTDOWNED_IN_RECOVERY, the previous run did not shut down cleanly
 * (it stopped while in that state), so the whole data directory is fsync'd.
 */
typedef enum DBState
{
    DB_STARTUP = 0,             /* NOTE(review): presumably "server starting
                                 * up" -- confirm */
    DB_SHUTDOWNED,              /* cleanly shut down */
    DB_SHUTDOWNED_IN_RECOVERY,  /* cleanly shut down while in recovery; NOT a
                                 * crash -- startup treats it like
                                 * DB_SHUTDOWNED */
    DB_SHUTDOWNING,             /* shutdown in progress; if seen at startup,
                                 * the server stopped mid-shutdown */
    DB_IN_CRASH_RECOVERY,       /* crash recovery in progress; if seen at
                                 * startup, it stopped during crash recovery */
    DB_IN_ARCHIVE_RECOVERY,     /* archive recovery (PITR) in progress; if seen
                                 * at startup, it stopped during it */
    DB_IN_PRODUCTION            /* normal operation; if seen at startup, the
                                 * server stopped during normal processing */
} DBState;                                                                                      

recovery(続き)

ValidateXLOGDirectoryStructure()で、pg_xlogとpg_xlog/archive_statusが存在するかを確かめる(PITR用にコピーした際にこれらが除外されていた場合は再作成する)。

RelationCacheInitFileRemove()で、古いrelcacheの初期化ファイル(pg_internal.init)を削除する。WALを再生するとキャッシュファイルがデータベースの実際の状態とずれる可能性があるため、常に削除しておき、最初のバックエンド起動時に再構築させている(これによりPITRリカバリ時に誤ったフリースペース情報が発生する問題も修正されるらしい)。

前回クラッシュした場合(正常にシャットダウンされなかった場合)は、データディレクトリ全体をfsyncする。書き込んだもののまだfsyncしていないデータが、直後の電源断で失われる可能性があるためである。

   /*
     * Verify that pg_xlog and pg_xlog/archive_status exist.  In cases where
     * someone has performed a copy for PITR, these directories may have been
     * excluded and need to be re-created.
     */
    ValidateXLOGDirectoryStructure();

    /*
     * Clear out any old relcache cache files.  This is *necessary* if we do
     * any WAL replay, since that would probably result in the cache files
     * being out of sync with database reality.  In theory we could leave them
     * in place if the database had been cleanly shut down, but it seems
     * safest to just remove them always and let them be rebuilt during the
     * first backend startup.
     */
    RelationCacheInitFileRemove();

    /*
     * If we previously crashed, there might be data which we had written,
     * intending to fsync it, but which we had not actually fsync'd yet.
     * Therefore, a power failure in the near future might cause earlier
     * unflushed writes to be lost, even though more recent data written to
     * disk from here on would be persisted.  To avoid that, fsync the entire
     * data directory.
     */
    if (ControlFile->state != DB_SHUTDOWNED &&
        ControlFile->state != DB_SHUTDOWNED_IN_RECOVERY)
        SyncDataDirectory();

リカバリ対象のタイムライン(recoveryTargetTLI)として、minRecoveryPointTLIとチェックポイントレコードのTLIのうち大きい方を採用する。

readRecoveryCommandFile()で、recovery.confファイルがあるかをチェックする。あれば、アーカイブリカバリとXLOGストリーミングのためのパラメータを読み込んで設定する。

   /*
     * Initialize on the assumption we want to recover to the latest timeline
     * that's active according to pg_control.
     */
    if (ControlFile->minRecoveryPointTLI >
        ControlFile->checkPointCopy.ThisTimeLineID)
        recoveryTargetTLI = ControlFile->minRecoveryPointTLI;
    else
        recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;

    /*
     * Check for recovery control file, and if so set up state for offline
     * recovery
     */
    readRecoveryCommandFile();

アーカイブリカバリがリクエストされた場合、ターゲットとなるリカバリポイントをLOGする。

    if (ArchiveRecoveryRequested)
    {
        if (StandbyModeRequested)
            ereport(LOG,
                    (errmsg("entering standby mode")));
        else if (recoveryTarget == RECOVERY_TARGET_XID)
            ereport(LOG,
                    (errmsg("starting point-in-time recovery to XID %u",
                            recoveryTargetXid)));
        else if (recoveryTarget == RECOVERY_TARGET_TIME)
            ereport(LOG,
                    (errmsg("starting point-in-time recovery to %s",
                            timestamptz_to_str(recoveryTargetTime))));
        else if (recoveryTarget == RECOVERY_TARGET_NAME)
            ereport(LOG,
                    (errmsg("starting point-in-time recovery to \"%s\"",
                            recoveryTargetName)));
        else if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE)
            ereport(LOG,
                    (errmsg("starting point-in-time recovery to earliest consistent point")));
        else
            ereport(LOG,
                    (errmsg("starting archive recovery")));
    }

wakeup latchの仕組み自体はまだ調べていない。コードのコメントによれば、リカバリ中にスリープする可能性がある場合(スタンバイモード)に、XLogCtl->recoveryWakeupLatchの所有権を取得している。

    /*
     * Take ownership of the wakeup latch if we're going to sleep during
     * recovery.
     */
    if (StandbyModeRequested)
        OwnLatch(&XLogCtl->recoveryWakeupLatch);

XLOGリーダーのセットアップを行う。

変数privateはXLogPageReadPrivate型。

/*
 * Private state handed to the XLogPageRead() callback through
 * XLogReaderState.private_data.
 */
typedef struct XLogPageReadPrivate
{
    int         emode;          /* error level for read failures; passed to
                                 * emode_for_corrupt_record() */
    bool        fetching_ckpt;  /* are we fetching a checkpoint record? */
    bool        randAccess;     /* forwarded to WaitForWALToBecomeAvailable();
                                 * NOTE(review): presumably "this read is not
                                 * just the next sequential one" -- confirm */
} XLogPageReadPrivate;

privateは、コールバックに渡される不透明なデータ(XLogReaderState.private_data)である。 XLogReaderAllocate()は、XLogPageReadCB型のコールバック関数XLogPageRead()とprivateを登録したXLogリーダー(XLogReaderState)を割り当てて返す。

    /* Set up XLOG reader facility */
    MemSet(&private, 0, sizeof(XLogPageReadPrivate));
    xlogreader = XLogReaderAllocate(&XLogPageRead, &private);
    if (!xlogreader)
        ereport(ERROR,
                (errcode(ERRCODE_OUT_OF_MEMORY),
                 errmsg("out of memory"),
           errdetail("Failed while allocating an XLog reading processor.")));
    xlogreader->system_identifier = ControlFile->system_identifier;

XLogPageReadCB

コールバック関数へのポインタ

/*
 * Page-read callback used by the XLOG reader.
 *
 * Must read at least reqLen bytes of the WAL page that starts at
 * targetPagePtr into readBuf.  Returns the number of bytes read (never more
 * than XLOG_BLCKSZ), or -1 on failure.  *pageTLI receives the timeline ID of
 * the file the page was read from.  targetRecPtr presumably identifies the
 * record being read (XLogPageRead() forwards it to
 * WaitForWALToBecomeAvailable()) -- confirm.
 */
typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
                                           XLogRecPtr targetPagePtr,
                                           int reqLen,
                                           XLogRecPtr targetRecPtr,
                                           char *readBuf,
                                           TimeLineID *pageTLI);

XLogPageReadCB型の関数は、targetPagePtrから始まるxlogページを少なくともreqLenバイト読み、readBufに格納する。 このコールバックは、読み込んだバイト数(XLOG_BLCKSZを超えることはない)か、失敗時には-1を返す。

XLogPageRead

/*
 * XLogPageRead -- page-read callback (XLogPageReadCB) of the startup
 * process's XLOG reader.
 *
 * Makes sure at least reqLen bytes of the WAL page that starts at
 * targetPagePtr are available (waiting via WaitForWALToBecomeAvailable() if
 * needed), then reads the whole page (XLOG_BLCKSZ bytes) into readBuf.
 * Reader state is kept in file-level variables defined elsewhere (readFile,
 * readSegNo, readOff, readLen, readSource, receivedUpto, curFileTLI, ...).
 *
 * Returns the number of valid bytes in readBuf (never more than XLOG_BLCKSZ)
 * and stores the timeline of the file read in *readTLI, or returns -1 on
 * failure.  After a seek/read error in standby mode the read is retried
 * instead of returning -1.
 */
static int
XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
             XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
{
    XLogPageReadPrivate *private =
    (XLogPageReadPrivate *) xlogreader->private_data;
    int         emode = private->emode;
    uint32      targetPageOff;
    XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
    int         nread;

    XLByteToSeg(targetPagePtr, targetSegNo);
    targetPageOff = targetPagePtr % XLogSegSize;

    /*
     * See if we need to switch to a new segment because the requested record
     * is not in the currently open one.
     */
    if (readFile >= 0 && !XLByteInSeg(targetPagePtr, readSegNo))
    {
        /*
         * Request a restartpoint if we've replayed too much xlog since the
         * last one.
         */
        if (StandbyModeRequested && bgwriterLaunched)
        {
            if (XLogCheckpointNeeded(readSegNo))
            {
                (void) GetRedoRecPtr();
                if (XLogCheckpointNeeded(readSegNo))
                    RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
            }
        }

        close(readFile);
        readFile = -1;
        readSource = 0;
    }

    XLByteToSeg(targetPagePtr, readSegNo);

retry:
    /* See if we need to retrieve more data */
    if (readFile < 0 ||
        (readSource == XLOG_FROM_STREAM &&
         receivedUpto < targetPagePtr + reqLen))
    {
        if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
                                         private->randAccess,
                                         private->fetching_ckpt,
                                         targetRecPtr))
        {
            if (readFile >= 0)
                close(readFile);
            readFile = -1;
            readLen = 0;
            readSource = 0;

            return -1;
        }
    }

    /*
     * At this point, we have the right segment open and if we're streaming we
     * know the requested record is in it.
     */
    Assert(readFile != -1);

    /*
     * If the current segment is being streamed from master, calculate how
     * much of the current page we have received already. We know the
     * requested record has been received, but this is for the benefit of
     * future calls, to allow quick exit at the top of this function.
     */
    if (readSource == XLOG_FROM_STREAM)
    {
        if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
            readLen = XLOG_BLCKSZ;
        else
            readLen = receivedUpto % XLogSegSize - targetPageOff;
    }
    else
        readLen = XLOG_BLCKSZ;

    /* Read the requested page */
    readOff = targetPageOff;
    if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
    {
        char        fname[MAXFNAMELEN];
        int         save_errno = errno;

        /* XLogFileName() may clobber errno, which %m below reports */
        XLogFileName(fname, curFileTLI, readSegNo);
        errno = save_errno;
        ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
                (errcode_for_file_access(),
                 errmsg("could not seek in log segment %s to offset %u: %m",
                        fname, readOff)));
        goto next_record_is_invalid;
    }

    nread = read(readFile, readBuf, XLOG_BLCKSZ);
    if (nread != XLOG_BLCKSZ)
    {
        char        fname[MAXFNAMELEN];
        int         save_errno = errno;

        XLogFileName(fname, curFileTLI, readSegNo);
        if (nread < 0)
        {
            /* genuine I/O error: report the errno read() set */
            errno = save_errno;
            ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
                    (errcode_for_file_access(),
                     errmsg("could not read from log segment %s, offset %u: %m",
                            fname, readOff)));
        }
        else
        {
            /*
             * Short read: read() does not set errno in this case, so %m
             * would print a stale, misleading value.  Report the byte count.
             */
            ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
                    (errcode(ERRCODE_DATA_CORRUPTED),
                     errmsg("could not read from log segment %s, offset %u: read %d of %d",
                            fname, readOff, nread, XLOG_BLCKSZ)));
        }
        goto next_record_is_invalid;
    }

    Assert(targetSegNo == readSegNo);
    Assert(targetPageOff == readOff);
    Assert(reqLen <= readLen);

    *readTLI = curFileTLI;
    return readLen;

next_record_is_invalid:
    lastSourceFailed = true;

    if (readFile >= 0)
        close(readFile);
    readFile = -1;
    readLen = 0;
    readSource = 0;

    /* In standby-mode, keep trying */
    if (StandbyMode)
        goto retry;
    else
        return -1;
}

XLogReaderState

readBuf(XLOG_BLCKSZサイズ)に現在読んでいるWAL pageが入っていて、 readRecordBufに現在読んでいるWAL recordが入っている。

/*
 * State of an XLOG reader.  Allocated by XLogReaderAllocate(), which
 * registers the page-read callback (read_page) and its opaque private_data.
 */
struct XLogReaderState
{
    /* callback used to fetch WAL pages into readBuf */
    XLogPageReadCB read_page;

    /*
     * System identifier of the xlog files we're about to read.  Set to zero
     * (the default value) if unknown or unimportant.
     */
    uint64      system_identifier;

    /*
     * Opaque data for callbacks to use.  Not used by XLogReader.
     */
    void       *private_data;

    /*
     * Start and end point of last record read.  EndRecPtr is also used as the
     * position to read next, if XLogReadRecord receives an invalid recptr.
     */
    XLogRecPtr  ReadRecPtr;     /* start of last record read */
    XLogRecPtr  EndRecPtr;      /* end+1 of last record read */

    /* ----------------------------------------
     * private/internal state
     * ----------------------------------------
     */

    /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
    char       *readBuf;

    /* last read segment, segment offset, read length, TLI */
    XLogSegNo   readSegNo;
    uint32      readOff;
    uint32      readLen;
    TimeLineID  readPageTLI;

    /* beginning of last page read, and its TLI  */
    XLogRecPtr  latestPagePtr;
    TimeLineID  latestPageTLI;

    /* beginning of the WAL record being read. */
    XLogRecPtr  currRecPtr;

    /* Buffer for current ReadRecord result (expandable) */
    char       *readRecordBuf;
    uint32      readRecordBufSize;

    /* Buffer to hold error message */
    char       *errormsg_buf;
};

リカバリ時にbackup_labelが見つかった場合は、バックアップのダンプからリカバリしているものとみなし、そのラベルファイルが示すチェックポイントからロールフォワードする。見つからない場合は、pg_controlが示すチェックポイントからロールフォワードする。

backup_labelは、pg_start_backup()が発行された時にdata/backup_labelに作成される。

ダンプ開始以降にチェックポイントが1つ以上取られている可能性がある。そのため、pg_controlを開始地点として使うと、一貫性のあるデータベースの状態を復元できない恐れがある。backup_labelを使うことで、この問題を避けている。

read_backup_label()は、backup_labelが見つかった場合はtrue、ない場合はfalseを返す。

backup_labelがストリーミングで取得されたバックアップ由来の場合はbackupEndRequiredがtrueに、backup_labelがリカバリ中(スタンバイ上)に作られた場合はbackupFromStandbyがtrueにセットされる。

backup_labelがあるおかげで、整合性のある地点に到達するためにどこまでWALを再生すればよいかが分かっている。そのため、直接アーカイブリカバリに入る。

   if (read_backup_label(&checkPointLoc, &backupEndRequired,
                          &backupFromStandby))
    {
        /*
         * Archive recovery was requested, and thanks to the backup label
         * file, we know how far we need to replay to reach consistency. Enter
         * archive recovery directly.
         */
        InArchiveRecovery = true;
        if (StandbyModeRequested)
            StandbyMode = true;

backup_labelファイルが有るなら、それが示すチェックポイントからロールフォワードする (pg_controlのチェックポイントを使うのではなく)

ReadCheckpointRecord()の結果がNULLなら、チェックポイントレコードが読めなかったということでFATALエラーになる(この抜粋ではそのelse節は省略されている)。 レコードがある場合は、XLogRecGetData(record)でレコードのデータ部分をcheckPointにコピーする。

PITRがリクエストされたということなので、前回に正常終了された場合でもリカバリをするフラグ(InRecovery)をtrueに。

続いて、REDO位置が実際に存在することを確認する。オンラインバックアップ中にクラッシュし、既にアーカイブ済みのWALセグメントを参照するbackup_labelが残っている場合には、REDO位置が存在しないことがあるからである。 (REDO位置はチェックポイントレコード以前にあるはずである。チェックポイントレコードより前にある場合にだけ、その位置のレコードが読めるかどうかを確かめている。)

バックアップラベルには、pg_start_backup関数が呼ばれた時のWAL位置と、チェックポイントレコードのWAL位置(LSN)が書かれている。

        /*
         * When a backup_label file is present, we want to roll forward from
         * the checkpoint it identifies, rather than using pg_control.
         */
        record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0, true);
        if (record != NULL)
        {
            memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
            wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
            ereport(DEBUG1,
                    (errmsg("checkpoint record is at %X/%X",
                   (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
            InRecovery = true;  /* force recovery even if SHUTDOWNED */

            /*
             * Make sure that REDO location exists. This may not be the case
             * if there was a crash during an online backup, which left a
             * backup_label around that references a WAL segment that's
             * already been archived.
             */
            if (checkPoint.redo < checkPointLoc)
            {
                if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false))
                    ereport(FATAL,
                            (errmsg("could not find redo location referenced by checkpoint record"),
                             errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
            }
        }

バックアップラベルがあった場合はフラグを立てておいて、後で消すようにする。

        /* set flag to delete it later */
        haveBackupLabel = true;
    }

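以降は、バックアップラベルがなかった場合。 アーカイブリカバリが要求されていても、バックアップラベルがないと、整合性のある状態に到達するまでどこまでWALを再生すればよいか分からないことがある。例えば、pg_start_backup/pg_stop_backupを呼ばずにファイルシステムのアトミックなスナップショットでベースバックアップを取った場合や、稼働中のマスターをkillしてrecovery.confを置いた場合である。 この場合の戦略は、まずpg_xlog内のWALをすべて再生するクラッシュリカバリを行い、その後でアーカイブリカバリに入るというものである。ただし通常は、minRecoveryPointやbackupEndPointまで、あるいはend-of-backupレコードを見るまで再生すればよいと分かっているので、直接アーカイブリカバリに入る。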

    else
    {
        /*
         * It's possible that archive recovery was requested, but we don't
         * know how far we need to replay the WAL before we reach consistency.
         * This can happen for example if a base backup is taken from a
         * running server using an atomic filesystem snapshot, without calling
         * pg_start/stop_backup. Or if you just kill a running master server
         * and put it into archive recovery by creating a recovery.conf file.
         *
         * Our strategy in that case is to perform crash recovery first,
         * replaying all the WAL present in pg_xlog, and only enter archive
         * recovery after that.
         *
         * But usually we already know how far we need to replay the WAL (up
         * to minRecoveryPoint, up to backupEndPoint, or until we see an
         * end-of-backup record), and we can enter archive recovery directly.
         */
        if (ArchiveRecoveryRequested &&
            (ControlFile->minRecoveryPoint != InvalidXLogRecPtr ||
             ControlFile->backupEndRequired ||
             ControlFile->backupEndPoint != InvalidXLogRecPtr ||
             ControlFile->state == DB_SHUTDOWNED))
        {
            InArchiveRecovery = true;
            if (StandbyModeRequested)
                StandbyMode = true;
        }

最後の正しいチェックポイントレコードを獲得する。 pg_controlで参照される最新のものが壊れていたら、次のものを試す。

スタンバイモードの場合は、最新のチェックポイントレコードが読めなければ、1つ前のチェックポイントを試さずにPANICエラーにする。

チェックポイントレコードをmemcpyで読み込む。 wasShutdownはそのチェックポイントがシャットダウン時のものかどうかを調べるフラグのよう。

        /*
         * Get the last valid checkpoint record.  If the latest one according
         * to pg_control is broken, try the next-to-last one.
         */
        checkPointLoc = ControlFile->checkPoint;
        RedoStartLSN = ControlFile->checkPointCopy.redo;
        record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1, true);
        if (record != NULL)
        {
            ereport(DEBUG1,
                    (errmsg("checkpoint record is at %X/%X",
                   (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
        }
        else if (StandbyMode)
        {
            /*
             * The last valid checkpoint record required for a streaming
             * recovery exists in neither standby nor the primary.
             */
            ereport(PANIC,
                    (errmsg("could not locate a valid checkpoint record")));
        }
        else
        {
            checkPointLoc = ControlFile->prevCheckPoint;
            record = ReadCheckpointRecord(xlogreader, checkPointLoc, 2, true);
            if (record != NULL)
            {
                ereport(LOG,
                        (errmsg("using previous checkpoint record at %X/%X",
                   (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
                InRecovery = true;      /* force recovery even if SHUTDOWNED */
            }
            else
                ereport(PANIC,
                     (errmsg("could not locate a valid checkpoint record")));
        }
        memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
        wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
    }

チェックポイントレコードの位置が、要求されたタイムラインの履歴上で期待されるタイムラインにない場合は、先に進めない(FATAL)。バックアップが、要求されたタイムラインの履歴に含まれていないということだからである。

    /*
     * If the location of the checkpoint record is not on the expected
     * timeline in the history of the requested timeline, we cannot proceed:
     * the backup is not part of the history of the requested timeline.
     */
    Assert(expectedTLEs);       /* was initialized by reading checkpoint
                                 * record */
    if (tliOfPointInHistory(checkPointLoc, expectedTLEs) !=
        checkPoint.ThisTimeLineID)
    {
        XLogRecPtr  switchpoint;

        /*
         * tliSwitchPoint will throw an error if the checkpoint's timeline is
         * not in expectedTLEs at all.
         */
        switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs, NULL);
        ereport(FATAL,
                (errmsg("requested timeline %u is not a child of this server's history",
                        recoveryTargetTLI),
                 errdetail("Latest checkpoint is at %X/%X on timeline %u, but in the history of the requested timeline, the server forked off from that timeline at %X/%X.",
                           (uint32) (ControlFile->checkPoint >> 32),
                           (uint32) ControlFile->checkPoint,
                           ControlFile->checkPointCopy.ThisTimeLineID,
                           (uint32) (switchpoint >> 32),
                           (uint32) switchpoint)));
    }

チェックポイントレコードの値がおかしくないかをひたすらチェックして、共有メモリを初期化する。

   /*
     * The min recovery point should be part of the requested timeline's
     * history, too.
     */ if (!XLogRecPtrIsInvalid(ControlFile->minRecoveryPoint) &&
      tliOfPointInHistory(ControlFile->minRecoveryPoint - 1, expectedTLEs) !=
        ControlFile->minRecoveryPointTLI)
        ereport(FATAL,
                (errmsg("requested timeline %u does not contain minimum recovery point %X/%X on timeline %u",
                        recoveryTargetTLI,
                        (uint32) (ControlFile->minRecoveryPoint >> 32),
                        (uint32) ControlFile->minRecoveryPoint,
                        ControlFile->minRecoveryPointTLI)));

    LastRec = RecPtr = checkPointLoc;

    ereport(DEBUG1,
            (errmsg("redo record is at %X/%X; shutdown %s",
                  (uint32) (checkPoint.redo >> 32), (uint32) checkPoint.redo,
                    wasShutdown ? "TRUE" : "FALSE")));
    ereport(DEBUG1,
            (errmsg("next transaction ID: %u/%u; next OID: %u",
                    checkPoint.nextXidEpoch, checkPoint.nextXid,
                    checkPoint.nextOid)));
    ereport(DEBUG1,
            (errmsg("next MultiXactId: %u; next MultiXactOffset: %u",
                    checkPoint.nextMulti, checkPoint.nextMultiOffset)));
    ereport(DEBUG1,
            (errmsg("oldest unfrozen transaction ID: %u, in database %u",
                    checkPoint.oldestXid, checkPoint.oldestXidDB)));
    ereport(DEBUG1,
            (errmsg("oldest MultiXactId: %u, in database %u",
                    checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
    if (!TransactionIdIsNormal(checkPoint.nextXid))
        ereport(PANIC,
                (errmsg("invalid next transaction ID")));
    /* initialize shared memory variables from the checkpoint record */
    ShmemVariableCache->nextXid = checkPoint.nextXid;
    ShmemVariableCache->nextOid = checkPoint.nextOid;
    ShmemVariableCache->oidCount = 0;
    MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
    SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
    SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
    MultiXactSetSafeTruncate(checkPoint.oldestMulti);
    XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
    XLogCtl->ckptXid = checkPoint.nextXid;

諸々の初期化

   /*
     * Initialize replication slots, before there's a chance to remove
     * required resources.
     */
    StartupReplicationSlots();

    /*
     * Startup logical state, needs to be setup now so we have proper data
     * during crash recovery.
     */
    StartupReorderBuffer();

    /*
     * Startup MultiXact.  We need to do this early for two reasons: one is
     * that we might try to access multixacts when we do tuple freezing, and
     * the other is we need its state initialized because we attempt
     * truncation during restartpoints.
     */
    StartupMultiXact();

unloggedリレーション(WALに記録されないテーブル)用の、偽のLSNカウンタを初期化する。正常シャットダウン後ならpg_controlに保存された値を復元する。リカバリする場合はunloggedリレーションがすべて消去されるので、カウンタも1にリセットする。

    /*
     * Initialize unlogged LSN. On a clean shutdown, it's restored from the
     * control file. On recovery, all unlogged relations are blown away, so
     * the unlogged LSN counter can be reset too.
     */
    if (ControlFile->state == DB_SHUTDOWNED)
        XLogCtl->unloggedLSN = ControlFile->unloggedLSN;
    else
        XLogCtl->unloggedLSN = 1;

restoreTimeLineHistoryFiles()は、現在のタイムラインからリカバリ対象のタイムラインまでの間で不足しているタイムライン履歴ファイルを、アーカイブからpg_xlogにコピーする(WAL本体ではない)。 その後、fullPageWritesフラグやRedoRecPtrをチェックポイントレコードから復元する。

    /*
     * We must replay WAL entries using the same TimeLineID they were created
     * under, so temporarily adopt the TLI indicated by the checkpoint (see
     * also xlog_redo()).
     */
    ThisTimeLineID = checkPoint.ThisTimeLineID;
   /*
     * Copy any missing timeline history files between 'now' and the recovery
     * target timeline from archive to pg_xlog. While we don't need those
     * files ourselves - the history file of the recovery target timeline
     * covers all the previous timelines in the history too - a cascading
     * standby server might be interested in them. Or, if you archive the WAL
     * from this server to a different archive than the master, it'd be good
     * for all the history files to get archived there after failover, so that
     * you can use one of the old timelines as a PITR target. Timeline history
     * files are small, so it's better to copy them unnecessarily than not
     * copy them and regret later.
     */
    restoreTimeLineHistoryFiles(ThisTimeLineID, recoveryTargetTLI);

    lastFullPageWrites = checkPoint.fullPageWrites;

    RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;

RecPtrはチェックポイントレコードそのものの位置(checkPointLoc)。checkPoint.redoはチェックポイントレコードに書かれたREDOの開始位置で、チェックポイントの作成を開始した時点での次の挿入位置である。したがってREDO開始位置がチェックポイントレコードより後ろにあることはあり得ず、その場合は不正なレコードとしてPANICになる。両者が等しい場合はこの検査を通過する。

    if (RecPtr < checkPoint.redo)
        ereport(PANIC,
                (errmsg("invalid redo in checkpoint record")));

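WALから強制的にリカバリする必要があるかをチェックする。正常にシャットダウンされていて、かつrecovery.confファイルがない場合は、リカバリ不要と判断する。

今度は逆に、REDO開始位置がチェックポイントレコードより前にある場合。オンラインチェックポイントであれば、REDO開始位置からチェックポイントレコードまでの間にWALが存在するので、リカバリが必要になる(InRecovery = true)。一方、シャットダウンチェックポイントではREDO開始位置がチェックポイントレコード自身と一致していなければならないので、この状態は不正としてPANICにする。それ以外でも、pg_controlの状態がDB_SHUTDOWNEDでない場合や、recovery.confが存在する(ArchiveRecoveryRequested)場合はリカバリを行う。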

    /*
     * Check whether we need to force recovery from WAL.  If it appears to
     * have been a clean shutdown and we did not have a recovery.conf file,
     * then assume no recovery needed.
     */
    if (checkPoint.redo < RecPtr)
    {
        if (wasShutdown)
            ereport(PANIC,
                    (errmsg("invalid redo record in shutdown checkpoint")));
        InRecovery = true;
    }
    else if (ControlFile->state != DB_SHUTDOWNED)
        InRecovery = true;
    else if (ArchiveRecoveryRequested)
    {
        /* force recovery due to presence of recovery.conf */
        InRecovery = true;
    }

REDO

リカバリが必要なら以下のブロックに入る。ControlFileには次の情報を設定する。
- 前回のチェックポイント位置
- 今回のチェックポイント位置と、その内容のコピー
- 状態(アーカイブリカバリならDB_IN_ARCHIVE_RECOVERY、そうでなければDB_IN_CRASH_RECOVERY)

アーカイブリカバリの場合は、minRecoveryPointがまだcheckPoint.redoより前であれば、checkPoint.redo(とそのタイムライン)で初期化する。

    /* REDO */
    if (InRecovery)
    {
        int         rmid;

        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;

        /*
         * Update pg_control to show that we are recovering and to show the
         * selected checkpoint as the place we are starting from. We also mark
         * pg_control with any minimum recovery stop point obtained from a
         * backup history file.
         */
        dbstate_at_startup = ControlFile->state;
        if (InArchiveRecovery)
            ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
        else
        {
            ereport(LOG,
                    (errmsg("database system was not properly shut down; "
                            "automatic recovery in progress")));
            if (recoveryTargetTLI > ControlFile->checkPointCopy.ThisTimeLineID)
                ereport(LOG,
                        (errmsg("crash recovery starts in timeline %u "
                                "and has target timeline %u",
                                ControlFile->checkPointCopy.ThisTimeLineID,
                                recoveryTargetTLI)));
            ControlFile->state = DB_IN_CRASH_RECOVERY;
        }
        ControlFile->prevCheckPoint = ControlFile->checkPoint;
        ControlFile->checkPoint = checkPointLoc;
        ControlFile->checkPointCopy = checkPoint;
        if (InArchiveRecovery)
        {
            /* initialize minRecoveryPoint if not set yet */
            if (ControlFile->minRecoveryPoint < checkPoint.redo)
            {
                ControlFile->minRecoveryPoint = checkPoint.redo;
                ControlFile->minRecoveryPointTLI = checkPoint.ThisTimeLineID;
            }
        }

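ベースバックアップからリカバリを開始する場合(backup_labelがある場合)は、バックアップの開始地点(backupStartPoint)をセットする。

また、スタンバイから取得されたベースバックアップからリカバリを開始する場合は、minRecoveryPointをバックアップの終了地点とみなしてbackupEndPointにセットする。この場合、pg_controlの状態はすでにリカバリ中であること(DB_IN_ARCHIVE_RECOVERYまたはDB_SHUTDOWNED_IN_RECOVERY)を示しているはずである。それ以外の状態であれば、バックアップが壊れているとみなしてFATALにする。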

(backupEndRequiredの意味は、どこで終わるかは後で求めることが必要とされるということだと思う)

       /*
         * Set backupStartPoint if we're starting recovery from a base backup.
         *
         * Also set backupEndPoint and use minRecoveryPoint as the backup end
         * location if we're starting recovery from a base backup which was
         * taken from a standby. In this case, the database system status in
         * pg_control must indicate that the database was already in
         * recovery. Usually that will be DB_IN_ARCHIVE_RECOVERY but also can
         * be DB_SHUTDOWNED_IN_RECOVERY if recovery previously was interrupted
         * before reaching this point; e.g. because restore_command or
         * primary_conninfo were faulty.
         *
         * Any other state indicates that the backup somehow became corrupted
         * and we can't sensibly continue with recovery.
         */
        if (haveBackupLabel)
        {
            ControlFile->backupStartPoint = checkPoint.redo;
            ControlFile->backupEndRequired = backupEndRequired;

            if (backupFromStandby)
            {
                if (dbstate_at_startup != DB_IN_ARCHIVE_RECOVERY &&
                    dbstate_at_startup != DB_SHUTDOWNED_IN_RECOVERY)
                    ereport(FATAL,
                            (errmsg("backup_label contains data inconsistent with control file"),
                             errhint("This means that the backup is corrupted and you will "
                               "have to use another backup for recovery.")));
                ControlFile->backupEndPoint = ControlFile->minRecoveryPoint;
            }
        }

時間の情報を更新して、UpdateControlFile()で更新したpg_controlファイルを書き込む。

        ControlFile->time = (pg_time_t) time(NULL);
        /* No need to hold ControlFileLock yet, we aren't up far enough */
        UpdateControlFile();

minRecoveryPointのローカルコピーを初期化する。

pgstatデータはリカバリ後は不正かもしれないのでリセットする。

        /* initialize our local copy of minRecoveryPoint */
        minRecoveryPoint = ControlFile->minRecoveryPoint;
        minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;

        /*
         * Reset pgstat data, because it may be invalid after recovery.
         */
        pgstat_reset_all();

バックアップラベルファイルがある場合、その役目はもう終わっており、情報はすでにpg_controlに反映されている。リカバリ中にクラッシュしたとき、バックアップの開始地点まで戻るのではなく、最新のリスタートポイントから再開できるように、ラベルファイルを取り除かなければならない。ただし念のため、ファイルを完全に削除するのではなく、別名(BACKUP_LABEL_OLD)にリネームして退避させている。

        /*
         * If there was a backup label file, it's done its job and the info
         * has now been propagated into pg_control.  We must get rid of the
         * label file so that if we crash during recovery, we'll pick up at
         * the latest recovery restartpoint instead of going all the way back
         * to the backup start point.  It seems prudent though to just rename
         * the file out of the way rather than delete it completely.
         */
        if (haveBackupLabel)
        {
            unlink(BACKUP_LABEL_OLD);
            if (rename(BACKUP_LABEL_FILE, BACKUP_LABEL_OLD) != 0)
                ereport(FATAL,
                        (errcode_for_file_access(),
                         errmsg("could not rename file \"%s\" to \"%s\": %m",
                                BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
        }

GUC(Grand Unified Configuration)はPostgreSQLの動的に変更できるパラメータを管理するモジュールとのこと http://ikubo.x0.com/PostgreSQL/hack/pg_shikumi9_20050314_memo.htm

        /* Check that the GUCs used to generate the WAL allow recovery */
        CheckRequiredParameterValues();

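リカバリ中なので、ログされない(unlogged)リレーションにはゴミが残っている可能性があり、リセットしなければならない。読み取り専用のバックエンドがそのゴミを読まないよう、これはホットスタンバイ接続を許可する前に行う。同様に、クラッシュしたバックエンドが残したエクスポート済みトランザクションスナップショットファイルも削除する。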

        /*
         * We're in recovery, so unlogged relations may be trashed and must be
         * reset.  This should be done BEFORE allowing Hot Standby
         * connections, so that read-only backends don't try to read whatever
         * garbage is left over from before.
         */
        ResetUnloggedRelations(UNLOGGED_RELATION_CLEANUP);

        /*
         * Likewise, delete any saved transaction snapshot files that got left
         * behind by crashed backends.
         */
        DeleteAllExportedSnapshotFiles();

ホットスタンバイモード時の挙動はここでは追わない。

       /*
         * Initialize for Hot Standby, if enabled. We won't let backends in
         * yet, not until we've reached the min recovery point specified in
         * control file and we've established a recovery snapshot from a
         * running-xacts WAL record.
         */
        if (ArchiveRecoveryRequested && EnableHotStandby)
        {
            TransactionId *xids;
            int         nxids;

            ereport(DEBUG1,
                    (errmsg("initializing for hot standby")));

            InitRecoveryTransactionEnvironment();

            if (wasShutdown)
                oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
            else
                oldestActiveXID = checkPoint.oldestActiveXid;
            Assert(TransactionIdIsValid(oldestActiveXID));

            /* Tell procarray about the range of xids it has to deal with */
            ProcArrayInitRecovery(ShmemVariableCache->nextXid);

            /*
             * Startup commit log and subtrans only. MultiXact has already
             * been started up and other SLRUs are not maintained during
             * recovery and need not be started yet.
             */
            StartupCLOG();
            StartupSUBTRANS(oldestActiveXID);

            /*
             * If we're beginning at a shutdown checkpoint, we know that
             * nothing was running on the master at this point. So fake-up an
             * empty running-xacts record and use that here and now. Recover
             * additional standby state for prepared transactions.
             */
            if (wasShutdown)
            {
                RunningTransactionsData running;
                TransactionId latestCompletedXid;

                /*
                 * Construct a RunningTransactions snapshot representing a
                 * shut down server, with only prepared transactions still
                 * alive. We're never overflowed at this point because all
                 * subxids are listed with their parent prepared transactions.
                 */
                running.xcnt = nxids;
                running.subxcnt = 0;
                running.subxid_overflow = false;
                running.nextXid = checkPoint.nextXid;
                running.oldestRunningXid = oldestActiveXID;
                latestCompletedXid = checkPoint.nextXid;
                TransactionIdRetreat(latestCompletedXid);
                Assert(TransactionIdIsNormal(latestCompletedXid));
                running.latestCompletedXid = latestCompletedXid;
                running.xids = xids;

                ProcArrayApplyRecoveryInfo(&running);

                StandbyRecoverPreparedTransactions(false);
            }
        }

リソースマネージャーの初期化

       /* Initialize resource managers */
        for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
        {
            if (RmgrTable[rmid].rm_startup != NULL)
                RmgrTable[rmid].rm_startup();
        }

PG_RMGRマクロはRmgrData構造体の初期化子に展開される。rmgrlist.hをインクルードすることで、そこに並んだ各リソースマネージャーのエントリが、順番にRmgrTable配列の要素になる。

/*
 * RmgrTable is produced by expanding every PG_RMGR(...) line of
 * access/rmgrlist.h into one positional RmgrData initializer, so the Nth
 * line of the list becomes RmgrTable[N - 1].  The first macro argument
 * (the RM_*_ID symbol) is not used by this expansion.
 */
#define PG_RMGR(id_sym, rm_label, redo_fn, desc_fn, startup_fn, cleanup_fn) \
    { rm_label, redo_fn, desc_fn, startup_fn, cleanup_fn },
const RmgrData RmgrTable[RM_MAX_ID + 1] = {
#include "access/rmgrlist.h"
};

RmgrData型は、リソースマネージャーの名前と、redo・desc・startup・cleanupの各関数ポインタを保持する。

/*
 * Method table for resource managers.
 *
 * This struct must be kept in sync with the PG_RMGR definition in
 * rmgr.c: PG_RMGR builds positional initializers, so the member order
 * below (name, redo, desc, startup, cleanup) is significant.
 *
 * RmgrTable[] is indexed by RmgrId values (see rmgrlist.h).
 */
struct RmgrData
{
    /* textual name of the resource manager, e.g. "XLOG" */
    const char *rm_name;

    /* replay callback, invoked for each WAL record of this manager */
    void        (*rm_redo) (XLogRecPtr lsn, struct XLogRecord *record);

    /* presumably renders a readable description of a record into buf */
    void        (*rm_desc) (StringInfo buf, uint8 xl_info, char *rec);

    /* optional (may be NULL); invoked before redo of records begins */
    void        (*rm_startup) (void);

    /* optional (may be NULL); presumably invoked once redo ends - confirm */
    void        (*rm_cleanup) (void);
};

typedef struct RmgrData RmgrData;

rmgrlist.hで各リソースマネージャーの登録を行っている。

/*
 * Resource manager registry.  Each PG_RMGR(...) line below is expanded by
 * whichever PG_RMGR definition is in effect where this list is #included;
 * rmgr.c expands it into positional RmgrData initializers for RmgrTable[].
 *
 * Because RmgrTable[] is filled positionally but indexed by RmgrId, the
 * position of each line must agree with its RM_*_ID value.  Do not reorder
 * lines or insert new ones in the middle without checking how the RM_*_ID
 * symbols are defined (NOTE(review): presumably also from this list).
 *
 * Only Gin, Gist and SPGist register startup/cleanup callbacks; every other
 * resource manager passes NULL for both, and the startup loop in recovery
 * skips NULL entries.
 */
/* symbol name, textual name, redo, desc, startup, cleanup */
PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, NULL, NULL)
PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, NULL, NULL)
PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, NULL, NULL)
PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, NULL, NULL)
PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, NULL, NULL)
PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, NULL, NULL)
PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, NULL, NULL)
PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, NULL, NULL)
PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, NULL, NULL)
PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, NULL, NULL)
PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, NULL, NULL)
PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, NULL, NULL)
PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, NULL, NULL)
/* index AMs below need per-recovery setup/teardown, hence non-NULL hooks */
PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup)
PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup)
PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, NULL, NULL)
PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_xlog_startup, spg_xlog_cleanup)

WALリプレイの進捗を追跡するための共有変数を初期化する。ちょうどREDO位置の直前のレコード(シャットダウンチェックポイントの場合はチェックポイントレコード自身)までをリプレイし終えたかのような状態にする。

        /*
         * Initialize shared variables for tracking progress of WAL replay, as
         * if we had just replayed the record before the REDO location (or the
         * checkpoint record itself, if it's a shutdown checkpoint).
         */
        SpinLockAcquire(&xlogctl->info_lck);
        if (checkPoint.redo < RecPtr)
            xlogctl->replayEndRecPtr = checkPoint.redo;
        else
            xlogctl->replayEndRecPtr = EndRecPtr;
        xlogctl->replayEndTLI = ThisTimeLineID;
        xlogctl->lastReplayedEndRecPtr = xlogctl->replayEndRecPtr;
        xlogctl->lastReplayedTLI = xlogctl->replayEndTLI;
        xlogctl->recoveryLastXTime = 0;
        xlogctl->currentChunkStartTime = 0;
        xlogctl->recoveryPause = false;
        SpinLockRelease(&xlogctl->info_lck);

        /* Also ensure XLogReceiptTime has a sane value */
        XLogReceiptTime = GetCurrentTimestamp();

チェックポインタを起動してリスタートポイント処理を実行させられるよう、redoを開始したことをpostmasterに通知する。リスタートポイントはアーカイブリカバリ中にのみ実行されるので、クラッシュリカバリ中は通知しない。クラッシュリカバリを、バグを持ち込まないようシンプルに保ちたいという理由もある。

「アーカイブからのリカバリもしくはスタンバイモードにおいて、サーバでは定期的に通常運用でのチェックポイント処理と似たリスタートポイント処理を行います。これは、すでに再生されたWALを再度読み込む必要がないよう、ディスクに現在の状態を強制的に書き込み、pg_controlファイルを更新します。」とのこと。 https://www.postgresql.jp/document/9.2/html/wal-configuration.html

要するに、リカバリの途中でクラッシュしたときに、また一からリカバリしなくて済むようにという話。

        /*
         * Let postmaster know we've started redo now, so that it can launch
         * checkpointer to perform restartpoints.  We don't bother during
         * crash recovery as restartpoints can only be performed during
         * archive recovery.  And we'd like to keep crash recovery simple, to
         * avoid introducing bugs that could affect you when recovering after
         * crash.
         *
         * After this point, we can no longer assume that we're the only
         * process in addition to postmaster!  Also, fsync requests are
         * subsequently to be handled by the checkpointer, not locally.
         */
        if (ArchiveRecoveryRequested && IsUnderPostmaster)
        {
            PublishStartupProcessInformation();
            SetForwardFsyncRequests();
            SendPostmasterSignal(PMSIGNAL_RECOVERY_STARTED);
            bgwriterLaunched = true;
        }

既に整合性が取れているなら、すぐにread-onlyの接続を許す。

        /*
         * Allow read-only connections immediately if we're consistent
         * already.
         */
        CheckRecoveryConsistency();

論理的にチェックポイントの後に続く最初のレコードを見つける --- ただし、物理的にはチェックポイントより前にあるかもしれない。

ReadRecord()の第二引数には、読み込む位置のLSN(XLogRecPtr)を渡す。InvalidXLogRecPtrが指定された場合は、xlogreaderが最後に読んだレコードの続きから読む。REDO開始位置がRecPtr(チェックポイントレコードの位置)より前にある場合は、REDO開始位置まで戻ってレコードを読む。そうでなければ、チェックポイントレコードの次のレコードをそのまま読む。

RecPtr=checkPointLocation(=ControlFile->checkPointなど)

        /*
         * Find the first record that logically follows the checkpoint --- it
         * might physically precede it, though.
         */
        if (checkPoint.redo < RecPtr)
        {
            /* back up to find the record */
            record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
        }
        else
        {
            /* just have to read next record after CheckPoint */
            record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
        }

void xlog_redo(XLogRecPtr lsn, XLogRecord *record)

XLOGリソースマネージャーのrm_redo()関数(XLOGの時はこのxlog_redo()がreplay時に呼ばれる) recordには既に読み込んだログレコードが入っている。

lsnはログの終端のよう?(先頭かもしれないので再度チェック)

int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)

targetPagePtrから始まるWALページ(XLOG_BLCKSZバイト)をreadBufに読み込む関数。戻り値はページ内の有効なバイト数で、データを取得できなかった場合は-1を返す。

/*
 * Read the WAL page that starts at targetPagePtr into readBuf.
 *
 * The signature matches the page-read callback driven by the xlogreader
 * (assumed from the XLogReaderState parameter -- confirm in xlogreader.h).
 *
 * xlogreader:    reader state; its private_data is an XLogPageReadPrivate
 *                supplying emode, randAccess and fetching_ckpt.
 * targetPagePtr: WAL position of the start of the page to read.
 * reqLen:        number of bytes of the page the caller needs (must not
 *                exceed the returned length).
 * targetRecPtr:  position of the record being read; passed through to
 *                WaitForWALToBecomeAvailable().
 * readBuf:       destination; XLOG_BLCKSZ bytes are read into it.
 * readTLI:       set to curFileTLI on success.
 *
 * Returns the number of valid bytes in the page (readLen), or -1 when the
 * data could not be obtained.  In standby mode a seek/read failure does not
 * return immediately: the segment is closed and the read is retried (which
 * may still return -1 if WaitForWALToBecomeAvailable() gives up).
 *
 * Side effects: updates the file-level readFile/readSegNo/readOff/readLen/
 * readSource state, may request a restartpoint when switching segments in
 * standby mode, and sets lastSourceFailed on a seek/read failure.
 */
static int
XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
             XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
{
    XLogPageReadPrivate *private =
    (XLogPageReadPrivate *) xlogreader->private_data;
    int         emode = private->emode;
    uint32      targetPageOff;
    XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
    int         nread;

    XLByteToSeg(targetPagePtr, targetSegNo);
    targetPageOff = targetPagePtr % XLogSegSize;

    /*
     * See if we need to switch to a new segment because the requested record
     * is not in the currently open one.
     */
    if (readFile >= 0 && !XLByteInSeg(targetPagePtr, readSegNo))
    {
        /*
         * Request a restartpoint if we've replayed too much xlog since the
         * last one.
         */
        if (StandbyModeRequested && bgwriterLaunched)
        {
            if (XLogCheckpointNeeded(readSegNo))
            {
                /*
                 * GetRedoRecPtr() is called only for its side effect
                 * (presumably refreshing the local RedoRecPtr copy), so the
                 * check is repeated with up-to-date information before
                 * actually requesting a checkpoint.
                 */
                (void) GetRedoRecPtr();
                if (XLogCheckpointNeeded(readSegNo))
                    RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
            }
        }

        close(readFile);
        readFile = -1;
        readSource = 0;
    }

    XLByteToSeg(targetPagePtr, readSegNo);

retry:
    /* See if we need to retrieve more data */
    if (readFile < 0 ||
        (readSource == XLOG_FROM_STREAM &&
         receivedUpto < targetPagePtr + reqLen))
    {
        if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
                                         private->randAccess,
                                         private->fetching_ckpt,
                                         targetRecPtr))
        {
            if (readFile >= 0)
                close(readFile);
            readFile = -1;
            readLen = 0;
            readSource = 0;

            return -1;
        }
    }

    /*
     * At this point, we have the right segment open and if we're streaming we
     * know the requested record is in it.
     */
    Assert(readFile != -1);

    /*
     * If the current segment is being streamed from master, calculate how
     * much of the current page we have received already. We know the
     * requested record has been received, but this is for the benefit of
     * future calls, to allow quick exit at the top of this function.
     */
    if (readSource == XLOG_FROM_STREAM)
    {
        /* page fully received unless receivedUpto lies inside this page */
        if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
            readLen = XLOG_BLCKSZ;
        else
            readLen = receivedUpto % XLogSegSize - targetPageOff;
    }
    else
        readLen = XLOG_BLCKSZ;

    /* Read the requested page */
    readOff = targetPageOff;
    if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
    {
        char        fname[MAXFNAMELEN];
        int         save_errno = errno;

        /* building the file name may clobber errno; restore it for %m */
        XLogFileName(fname, curFileTLI, readSegNo);
        errno = save_errno;
        ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
                (errcode_for_file_access(),
                 errmsg("could not seek in log segment %s to offset %u: %m",
                        fname, readOff)));
        goto next_record_is_invalid;
    }

    nread = read(readFile, readBuf, XLOG_BLCKSZ);
    if (nread != XLOG_BLCKSZ)
    {
        char        fname[MAXFNAMELEN];
        int         save_errno = errno;

        XLogFileName(fname, curFileTLI, readSegNo);
        if (nread < 0)
        {
            /* genuine I/O error: errno describes it */
            errno = save_errno;
            ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
                    (errcode_for_file_access(),
                     errmsg("could not read from log segment %s, offset %u: %m",
                            fname, readOff)));
        }
        else
        {
            /*
             * Short read: read() succeeded, so errno is stale and %m would
             * report an unrelated error.  Report the byte counts instead.
             */
            ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
                    (errcode(ERRCODE_DATA_CORRUPTED),
                     errmsg("could not read from log segment %s, offset %u: read %d of %d",
                            fname, readOff, nread, (int) XLOG_BLCKSZ)));
        }
        goto next_record_is_invalid;
    }

    Assert(targetSegNo == readSegNo);
    Assert(targetPageOff == readOff);
    Assert(reqLen <= readLen);

    *readTLI = curFileTLI;
    return readLen;

next_record_is_invalid:
    lastSourceFailed = true;

    if (readFile >= 0)
        close(readFile);
    readFile = -1;
    readLen = 0;
    readSource = 0;

    /* In standby-mode, keep trying */
    if (StandbyMode)
        goto retry;
    else
        return -1;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment