Skip to content

Instantly share code, notes, and snippets.

@kmikmy
Last active January 15, 2016 05:04
Show Gist options
  • Save kmikmy/cbf14902cd0bdd38b850 to your computer and use it in GitHub Desktop.
Save kmikmy/cbf14902cd0bdd38b850 to your computer and use it in GitHub Desktop.

XLogFlush(XLogRecPtr record)

概要

与えられた位置(record)までのXLOGデータがディスクにフラッシュされることを確実にする。

返り値

void

引数

XLogRecPtr record

XLogRecPtrの定義

/*
 * Pointer to a location in the XLOG.  These pointers are 64 bits wide,
 * because we don't want them ever to overflow.
 */
typedef uint64 XLogRecPtr;

XLogRecPtrはXLOGの位置を指す。 これはおそらくWALファイル上の位置(バッファ上の位置ではなく)を表すと思われる。

内容

redoの最中であればWALのflushを行わない

REDOの間、書き込んでないWALを読んでいる。

/*
 * Ensure that all XLOG data through the given position is flushed to disk.
 *
 * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
 * already held, and we try to avoid acquiring it if possible.
 */
void
XLogFlush(XLogRecPtr record)
{
    XLogRecPtr  WriteRqstPtr;
    XLogwrtRqst WriteRqst;

    /*
     * During REDO, we are reading not writing WAL.  Therefore, instead of
     * trying to flush the WAL, we should update minRecoveryPoint instead. We
     * test XLogInsertAllowed(), not InRecovery, because we need checkpointer
     * to act this way too, and because when it tries to write the
     * end-of-recovery checkpoint, it should indeed flush.
     */
    if (!XLogInsertAllowed())
    {
        UpdateMinRecoveryPoint(record, false);
        return;
    }

LogwrtResultが既にrecordの値までFlushしていれば即終了する。

XLogwrtResult型

変数LogwrtResultはプライベートコピー。 古くなっている可能性があるのでまた後で最新のデータをチェックする必要がある。

/*
 * Private, possibly out-of-date copy of shared LogwrtResult.
 * See discussion above.
 */
static XLogwrtResult LogwrtResult = {0, 0};
    /* Quick exit if already known flushed */
    if (record <= LogwrtResult.Flush)
        return;

#ifdef WAL_DEBUG
    if (XLOG_DEBUG)
        elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
             (uint32) (record >> 32), (uint32) record,
             (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
           (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
#endif

ローカルなWriteRqstPtrの初期化

START_CRIT_SECTION()自体はクリティカルセクションに入っている数をカウントアップするだけで、ロックのたぐいは取っていない。 最終的にLogwrtResult.Flushは可能な限り大きくなるように、writeとfsyncを行う。

ローカルなWriteRqstPtrを初めは引数のrecord位置にセットする。

    START_CRIT_SECTION();

    /*
     * Since fsync is usually a horribly expensive operation, we try to
     * piggyback as much data as we can on each fsync: if we see any more data
     * entered into the xlog buffer, we'll write and fsync that too, so that
     * the final value of LogwrtResult.Flush is as large as possible. This
     * gives us some chance of avoiding another fsync immediately after.
     */

    /* initialize to given target; may increase below */
    WriteRqstPtr = record;

write lockを手に入れるまで待っているか、他の誰かがフラッシュをしている

ローカルなxlogctlは、volatile修飾子をつけて必ずメモリを見に行くようにする。 XLogCtlData* を触る際は、かならず構造体のinfo_lckのロックを取る。 リクエスト用のwriteポインタが更新されていれば、ローカルなWriteRqstPtrも更新する 既にrecord(必要最低分の位置)まで、Flush済みであれば何もせずにbreakして終了する.


    /*
     * Now wait until we get the write lock, or someone else does the flush
     * for us.
     */
    for (;;)
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
        XLogRecPtr  insertpos;

        /* read LogwrtResult and update local state */
        SpinLockAcquire(&xlogctl->info_lck);
        if (WriteRqstPtr < xlogctl->LogwrtRqst.Write)
            WriteRqstPtr = xlogctl->LogwrtRqst.Write;
        LogwrtResult = xlogctl->LogwrtResult;
        SpinLockRelease(&xlogctl->info_lck);

        /* done already? */
        if (record <= LogwrtResult.Flush)
            break;

実行中のインサートを待つ

実際に書き込む前に、私たちが今まさにに書こうとしているpages(XLogCtl->pages)への全ての挿入を待つ

        /*
         * Before actually performing the write, wait for all in-flight
         * insertions to the pages we're about to write to finish.
         */
        insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);

WALWriteLockを取る

WALWriteLockの定義

共有メモリのLWLockをさすマクロになっている

#define WALWriteLock                (&MainLWLockArray[8].lock)

WALWriteLockがすぐに取れない時(他が取っていた時)、 LWLockAcquireOrWait()はロックが開放されるまで待ってfalseを返す。 注意すべきはこの時ロックは取得されていないということである。 このセマンティクスは他のバックエンドがログをフラッシュしている可能性があるので、 再度調べ直すためにcontinueでfor文の先頭からやり直すことを意味している。

        /*
         * Try to get the write lock. If we can't get it immediately, wait
         * until it's released, and recheck if we still need to do the flush
         * or if the backend that held the lock did it for us already. This
         * helps to maintain a good rate of group committing when the system
         * is bottlenecked by the speed of fsyncing.
         */
        if (!LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
        {
            /*
             * The lock is now free, but we didn't acquire it yet. Before we
             * do, loop back to check if someone else flushed the record for
             * us already.
             */
            continue;
        }

WALWriteLockを取った時点で、リクエストが満たされているかを再チェックする

すでにrecordの位置以上までFlushされていたら、breakして終了する。

        /* Got the lock; recheck whether request is satisfied */
        LogwrtResult = XLogCtl->LogwrtResult;
        if (record <= LogwrtResult.Flush)
        {
            LWLockRelease(WALWriteLock);
            break;
        }

flushの前にsleepすることで、グループコミットのバックログに参加する機会をバックエンドに更に与える。

CommitDelayが設定されていて、(enableFsyncがtrueで)、アクティブなトランザザクションがCommitSiblings以上ある場合、 sleepする。

        /*
         * Sleep before flush! By adding a delay here, we may give further
         * backends the opportunity to join the backlog of group commit
         * followers; this can significantly improve transaction throughput,
         * at the risk of increasing transaction latency.
         *
         * We do not sleep if enableFsync is not turned on, nor if there are
         * fewer than CommitSiblings other backends with active transactions.
         */
        if (CommitDelay > 0 && enableFsync &&
            MinimumActiveBackends(CommitSiblings))
        {
            pg_usleep(CommitDelay);

            /*
             * Re-check how far we can now flush the WAL. It's generally not
             * safe to call WaitXLogInsertionsToFinish while holding
             * WALWriteLock, because an in-progress insertion might need to
             * also grab WALWriteLock to make progress. But we know that all
             * the insertions up to insertpos have already finished, because
             * that's what the earlier WaitXLogInsertionsToFinish() returned.
             * We're only calling it again to allow insertpos to be moved
             * further forward, not to actually wait for anyone.
             */
            insertpos = WaitXLogInsertionsToFinish(insertpos);
        }

最新のXLOGまでwrite/flushする

最後にWALWriteLockを開放して終了

        /* try to write/flush later additions to XLOG as well */
        WriteRqst.Write = insertpos;
        WriteRqst.Flush = insertpos;

        XLogWrite(WriteRqst, false);

        LWLockRelease(WALWriteLock);
        /* done */
        break;
    }
    
    END_CRIT_SECTION();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment