Skip to content

Instantly share code, notes, and snippets.

@kmikmy
Last active January 15, 2016 04:38
Show Gist options
  • Save kmikmy/1b3083e40be16c264f72 to your computer and use it in GitHub Desktop.
Save kmikmy/1b3083e40be16c264f72 to your computer and use it in GitHub Desktop.

XLogWrite(XLogwrtRqst WriteRqst, bool flexible)

返り値

void

引数

少なくともWriteRqstが示す限りのログをWrite and/or fsyncする。 flexible == TRUEならば、必ずWriteRqstまで書く必要はなく、 便利な境界線でも止めてもよい(キャッシュやログファイルの境界)

内容

/*
 * Write and/or fsync the log at least as far as WriteRqst indicates.
 *
 * If flexible == TRUE, we don't have to write as far as WriteRqst, but
 * may stop at any convenient boundary (such as a cache or logfile boundary).
 * This option allows us to avoid uselessly issuing multiple writes when a
 * single one would do.
 *
 * Must be called with WALWriteLock held. WaitXLogInsertionsToFinish(WriteRqst)
 * must be called before grabbing the lock, to make sure the data is ready to
 * write.
 */
static void
XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
{
    bool        ispartialpage;
    bool        last_iteration;
    bool        finishing_seg;
    bool        use_existent;
    int         curridx;
    int         npages;
    int         startidx;
    uint32      startoffset;

    /* We should always be inside a critical section here */
    Assert(CritSectionCount > 0);

どこまでログを書いたかの結果をローカル変数に更新する。

    /*
     * Update local LogwrtResult (caller probably did this already, but...)
     */
    LogwrtResult = XLogCtl->LogwrtResult;

xlogキャッシュ内の連続したページは、連続して割り当てられるので、 私たちはたいてい複数のページを集めて、一回のwrite()コールを発行する。 npageは私達が決定した一緒に書かれるページの数である;

startidx: 初めのキャッシュブロックのインデックス。 startoffset: キャッシュブロックが書かれるファイルのオフセット。 これらの変数はnpages > 0の時に有効であるが、コンパイラをだまらせるために 全て初期化しなければならない。

    /*
     * Since successive pages in the xlog cache are consecutively allocated,
     * we can usually gather multiple pages together and issue just one
     * write() call.  npages is the number of pages we have determined can be
     * written together; startidx is the cache block index of the first one,
     * and startoffset is the file offset at which it should go. The latter
     * two variables are only valid when npages > 0, but we must initialize
     * all of them to keep the compiler quiet.
     */
    npages = 0;
    startidx = 0;
    startoffset = 0;

curridxは書き込むと考えられるページのキャッシュブロックのインデックス。 これは次のまだ書かれていないページか、最後に部分的に書かれたページ(partial page)から始める。

    /*
     * Within the loop, curridx is the cache block index of the page to
     * consider writing.  Begin at the buffer containing the next unwritten
     * page, or last partially written page.
     */
    curridx = XLogRecPtrToBufIdx(LogwrtResult.Write);

次のwhile内のコードは ログ書き込み結果(LogwrtResult.Write)が書き込みリクエスト(WriteRqst.Write)に達するまで 繰り返される。

(これはセグメントを跨る場合などに、writeやfsyncが2回呼ばれるためだと思われる)

XLogCtl->xlblocks[curridx];はそのキャッシュブロックの終端の 境界のバイトを表している。

    while (LogwrtResult.Write < WriteRqst.Write)
    {
        /*
         * Make sure we're not ahead of the insert process.  This could happen
         * if we're passed a bogus WriteRqst.Write that is past the end of the
         * last page that's been initialized by AdvanceXLInsertBuffer.
         */
        XLogRecPtr  EndPtr = XLogCtl->xlblocks[curridx];

        if (LogwrtResult.Write >= EndPtr)
            elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
                 (uint32) (LogwrtResult.Write >> 32),
                 (uint32) LogwrtResult.Write,
                 (uint32) (EndPtr >> 32), (uint32) EndPtr);

(どこまでログを書き込んだかを表す)LogwrtResult.Writeを現在のバッファページ(XLOG_BLCK)の終端まで進める。 書き込みリクエスト(WriteRqst.Write)が現在のバッファページの終端よりも小さい場合は、パーシャルページとなる。

        /* Advance LogwrtResult.Write to end of current buffer page */
        LogwrtResult.Write = EndPtr;
        ispartialpage = WriteRqst.Write < LogwrtResult.Write;

(ログをここまで書き込むつもりの)LogwrtResult.Write位置が現在開いているセグメント内かチェックする。 そうでなければログファイルセグメントをスイッチしなければならない。

開いていたファイルを閉じて、新しいセグメント番号を取得してファイルを初期化する

        if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo))
        {
            /*
             * Switch to new logfile segment.  We cannot have any pending
             * pages here (since we dump what we have at segment end).
             */
            Assert(npages == 0);
            if (openLogFile >= 0)
                          XLogFileClose();
            XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo);
            
            /* create/use new log file */
            use_existent = true;
            openLogFile = XLogFileInit(openLogSegNo, &use_existent, true);
            openLogOff = 0;
        }

ログファイルを開いてなければ開く。

        /* Make sure we have the current logfile open */
        if (openLogFile < 0)
        {
            XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo);
            openLogFile = XLogFileOpen(openLogSegNo);
            openLogOff = 0;
        }

初めのページであれば、startidx、startoffsetをセットして npageをインクリメントする。

        /* Add current page to the set of pending pages-to-dump */
        if (npages == 0)
        {
            /* first of group */
            startidx = curridx;
            startoffset = (LogwrtResult.Write - XLOG_BLCKSZ) % XLogSegSize;
        }
        npages++;

これが最後のループイタレーションか、キャッシュエリアの最後のページか(次のページはメモリ上で連続していないので)、ログファイルセグメントの終端ならば、ログの集合をダンプ(書き込み)する。

last_iterationフラグは、リクエストされた書き込み位置よりもログを書き込む位置が既に上回っている場合。 finishing_segフラグは、パーシャルページでないかつ書き込み開始位置+XLOG_BLCKSZ*npagesがセグメントサイズを超えていた場合。

        /*
         * Dump the set if this will be the last loop iteration, or if we are
         * at the last page of the cache area (since the next page won't be
         * contiguous in memory), or if we are at the end of the logfile
         * segment.
         */
        last_iteration = WriteRqst.Write <= LogwrtResult.Write;

        finishing_seg = !ispartialpage &&
            (startoffset + npages * XLOG_BLCKSZ) >= XLogSegSize;

実際に書き込みを行うパート

startidx, startoffset, npages がループの上記内の操作で既に計算されている。

現在のオフセット位置がstartoffset位置でなければ(初回の書き込み?orリカバリ?)、lseek。

その後、キャッシュブロックから該当するnpageを書き込む。 ちゃんと全てのバイトが書き込まれるまでwrite(2)を繰り返したり、EINTRの場合はやり直すなどしている。

        if (last_iteration ||
            curridx == XLogCtl->XLogCacheBlck ||
            finishing_seg)
        {
            char       *from;
            Size        nbytes;
            Size        nleft;
            int         written;

            /* Need to seek in the file? */
            if (openLogOff != startoffset)
            {
                if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
                    ereport(PANIC,
                            (errcode_for_file_access(),
                     errmsg("could not seek in log file %s to offset %u: %m",
                            XLogFileNameP(ThisTimeLineID, openLogSegNo),
                            startoffset)));
                openLogOff = startoffset;
            }
            
            /* OK to write the page(s) */
            from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
            nbytes = npages * (Size) XLOG_BLCKSZ;
            nleft = nbytes;
            do
            {
                errno = 0;
                written = write(openLogFile, from, nleft);
                if (written <= 0)
                {
                    if (errno == EINTR)
                        continue;
                    ereport(PANIC,
                            (errcode_for_file_access(),
                             errmsg("could not write to log file %s "
                                    "at offset %u, length %zu: %m",
                                 XLogFileNameP(ThisTimeLineID, openLogSegNo),
                                    openLogOff, nbytes)));
                }
                nleft -= written;
                from += written;
            } while (nleft > 0);

ログのオフセットを書き込んだ分進める。 npages == 0 というのは、何かしらwriteしたということを示す条件になる

            /* Update state for write */
            openLogOff += nbytes;
            npages = 0;

ログファイルセグメントの最後のページを書いたら、すぐにそのセグメントをfsyncする。 これはfsyncリクエストを後で行う時に、戻って再度前のセグメントをopenすることを防ぐため。

これはまたArchiverにそのセグメントがアーカイバルストレージにコピーする準備ができたことを知らせる場所でもある。

            /*
             * If we just wrote the whole last page of a logfile segment,
             * fsync the segment immediately.  This avoids having to go back
             * and re-open prior segments when an fsync request comes along
             * later. Doing it here ensures that one and only one backend will
             * perform this fsync.
             *
             * This is also the right place to notify the Archiver that the
             * segment is ready to copy to archival storage, and to update the
             * timer for archive_timeout, and to signal for a checkpoint if
             * too many logfile segments have been used since the last
             * checkpoint.
             */
            if (finishing_seg)
            {
                issue_xlog_fsync(openLogFile, openLogSegNo);

                /* signal that we need to wakeup walsenders later */
                WalSndWakeupRequest();

                LogwrtResult.Flush = LogwrtResult.Write;        /* end of page */

                if (XLogArchivingActive())
                    XLogArchiveNotifySeg(openLogSegNo);

                XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);

                /*
                 * Request a checkpoint if we've consumed too much xlog since
                 * the last one.  For speed, we first check using the local
                 * copy of RedoRecPtr, which might be out of date; if it looks
                 * like a checkpoint is needed, forcibly update RedoRecPtr and
                 * recheck.
                 */
                if (IsUnderPostmaster && XLogCheckpointNeeded(openLogSegNo))
                {
                    (void) GetRedoRecPtr();
                    if (XLogCheckpointNeeded(openLogSegNo))
                        RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
                }
            }
        }                        

partialpageの場合、ログの書き込み結果はページ境界ではなく、書き込みリクエストの位置に更新する。 partialpageなので、ループのイテレーションはないので即break。

flexibleなら、何かしら書いたらきりが良いその時点でbreak。

        if (ispartialpage)
        {
            /* Only asked to write a partial page */
            LogwrtResult.Write = WriteRqst.Write;
            break;
        }
        curridx = NextBufIdx(curridx);

        /* If flexible, break out of loop as soon as we wrote something */
        if (flexible && npages == 0)
            break;
    }

フラッシュする必要があるなら、する。 書き込みリクエストの最低限フラッシュしなくてはいけないポイント(LogwrtResult.Flush)まで フラッシュしていたらここで更にフラッシュする必要はない。

openするファイルを持っていなかったり、間違ったものを持っている場合は、上記のループのイテレーションなしで、ここで手に入れることができる。 XLogFileClose()は、内部でopenLogFile = -1をしているので、すぐ次で新しいログセグメントファイルがopenされて、fsyncされる。 最後に書いた所まで結果をフラッシュする。

   /*
     * If asked to flush, do so
     */
    if (LogwrtResult.Flush < WriteRqst.Flush &&
        LogwrtResult.Flush < LogwrtResult.Write)

    {
        /*
         * Could get here without iterating above loop, in which case we might
         * have no open file or the wrong one.  However, we do not need to
         * fsync more than one file.
         */
        if (sync_method != SYNC_METHOD_OPEN &&
            sync_method != SYNC_METHOD_OPEN_DSYNC)
        {
            if (openLogFile >= 0 &&
                !XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo))
                XLogFileClose();
            if (openLogFile < 0)
            {
                XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo);
                openLogFile = XLogFileOpen(openLogSegNo);
                openLogOff = 0;
            }

            issue_xlog_fsync(openLogFile, openLogSegNo);
        }

        /* signal that we need to wakeup walsenders later */
        WalSndWakeupRequest();

        LogwrtResult.Flush = LogwrtResult.Write;
    }

最後に共有メモリを更新する。 共有の'request'の値が、'result'の値に遅れを取らないことを確実にする。 これは本質的ではないが、数カ所のいくつかあるコードを救う。

    /*
     * Update shared-memory status
     *
     * We make sure that the shared 'request' values do not fall behind the
     * 'result' values.  This is not absolutely essential, but it saves some
     * code in a couple of places.
     */
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;

        SpinLockAcquire(&xlogctl->info_lck);
        xlogctl->LogwrtResult = LogwrtResult;
        if (xlogctl->LogwrtRqst.Write < LogwrtResult.Write)
            xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
        if (xlogctl->LogwrtRqst.Flush < LogwrtResult.Flush)
            xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
        SpinLockRelease(&xlogctl->info_lck);
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment