与えられた位置(record)までのXLOGデータがディスクにフラッシュされることを確実にする。
void
XLogRecPtr record
/*
* Pointer to a location in the XLOG. These pointers are 64 bits wide,
* because we don't want them ever to overflow.
*/
typedef uint64 XLogRecPtr;
XLogRecPtrはXLOGの位置を指す。 これはおそらくWALファイル上の位置(バッファ上の位置ではなく)を表すと思われる。
REDOの間、書き込んでないWALを読んでいる。
/*
* Ensure that all XLOG data through the given position is flushed to disk.
*
* NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
* already held, and we try to avoid acquiring it if possible.
*/
void
XLogFlush(XLogRecPtr record)
{
XLogRecPtr WriteRqstPtr;
XLogwrtRqst WriteRqst;
/*
* During REDO, we are reading not writing WAL. Therefore, instead of
* trying to flush the WAL, we should update minRecoveryPoint instead. We
* test XLogInsertAllowed(), not InRecovery, because we need checkpointer
* to act this way too, and because when it tries to write the
* end-of-recovery checkpoint, it should indeed flush.
*/
if (!XLogInsertAllowed())
{
UpdateMinRecoveryPoint(record, false);
return;
}
変数LogwrtResultはプライベートコピー。 古くなっている可能性があるのでまた後で最新のデータをチェックする必要がある。
/*
* Private, possibly out-of-date copy of shared LogwrtResult.
* See discussion above.
*/
static XLogwrtResult LogwrtResult = {0, 0};
/* Quick exit if already known flushed */
if (record <= LogwrtResult.Flush)
return;
#ifdef WAL_DEBUG
if (XLOG_DEBUG)
elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
(uint32) (record >> 32), (uint32) record,
(uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
(uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
#endif
START_CRIT_SECTION()自体はクリティカルセクションに入っている数をカウントアップするだけで、ロックのたぐいは取っていない。 最終的にLogwrtResult.Flushは可能な限り大きくなるように、writeとfsyncを行う。
ローカルなWriteRqstPtrを初めは引数のrecord位置にセットする。
START_CRIT_SECTION();
/*
* Since fsync is usually a horribly expensive operation, we try to
* piggyback as much data as we can on each fsync: if we see any more data
* entered into the xlog buffer, we'll write and fsync that too, so that
* the final value of LogwrtResult.Flush is as large as possible. This
* gives us some chance of avoiding another fsync immediately after.
*/
/* initialize to given target; may increase below */
WriteRqstPtr = record;
ローカルなxlogctlは、volatile修飾子をつけて必ずメモリを見に行くようにする。 XLogCtlData* を触る際は、かならず構造体のinfo_lckのロックを取る。 リクエスト用のwriteポインタが更新されていれば、ローカルなWriteRqstPtrも更新する 既にrecord(必要最低分の位置)まで、Flush済みであれば何もせずにbreakして終了する.
/*
* Now wait until we get the write lock, or someone else does the flush
* for us.
*/
for (;;)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr insertpos;
/* read LogwrtResult and update local state */
SpinLockAcquire(&xlogctl->info_lck);
if (WriteRqstPtr < xlogctl->LogwrtRqst.Write)
WriteRqstPtr = xlogctl->LogwrtRqst.Write;
LogwrtResult = xlogctl->LogwrtResult;
SpinLockRelease(&xlogctl->info_lck);
/* done already? */
if (record <= LogwrtResult.Flush)
break;
実際に書き込む前に、私たちが今まさにに書こうとしているpages(XLogCtl->pages)への全ての挿入を待つ
/*
* Before actually performing the write, wait for all in-flight
* insertions to the pages we're about to write to finish.
*/
insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
共有メモリのLWLockをさすマクロになっている
#define WALWriteLock (&MainLWLockArray[8].lock)
WALWriteLockがすぐに取れない時(他が取っていた時)、 LWLockAcquireOrWait()はロックが開放されるまで待ってfalseを返す。 注意すべきはこの時ロックは取得されていないということである。 このセマンティクスは他のバックエンドがログをフラッシュしている可能性があるので、 再度調べ直すためにcontinueでfor文の先頭からやり直すことを意味している。
/*
* Try to get the write lock. If we can't get it immediately, wait
* until it's released, and recheck if we still need to do the flush
* or if the backend that held the lock did it for us already. This
* helps to maintain a good rate of group committing when the system
* is bottlenecked by the speed of fsyncing.
*/
if (!LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
{
/*
* The lock is now free, but we didn't acquire it yet. Before we
* do, loop back to check if someone else flushed the record for
* us already.
*/
continue;
}
すでにrecordの位置以上までFlushされていたら、breakして終了する。
/* Got the lock; recheck whether request is satisfied */
LogwrtResult = XLogCtl->LogwrtResult;
if (record <= LogwrtResult.Flush)
{
LWLockRelease(WALWriteLock);
break;
}
CommitDelayが設定されていて、(enableFsyncがtrueで)、アクティブなトランザザクションがCommitSiblings以上ある場合、 sleepする。
/*
* Sleep before flush! By adding a delay here, we may give further
* backends the opportunity to join the backlog of group commit
* followers; this can significantly improve transaction throughput,
* at the risk of increasing transaction latency.
*
* We do not sleep if enableFsync is not turned on, nor if there are
* fewer than CommitSiblings other backends with active transactions.
*/
if (CommitDelay > 0 && enableFsync &&
MinimumActiveBackends(CommitSiblings))
{
pg_usleep(CommitDelay);
/*
* Re-check how far we can now flush the WAL. It's generally not
* safe to call WaitXLogInsertionsToFinish while holding
* WALWriteLock, because an in-progress insertion might need to
* also grab WALWriteLock to make progress. But we know that all
* the insertions up to insertpos have already finished, because
* that's what the earlier WaitXLogInsertionsToFinish() returned.
* We're only calling it again to allow insertpos to be moved
* further forward, not to actually wait for anyone.
*/
insertpos = WaitXLogInsertionsToFinish(insertpos);
}
最後にWALWriteLockを開放して終了
/* try to write/flush later additions to XLOG as well */
WriteRqst.Write = insertpos;
WriteRqst.Flush = insertpos;
XLogWrite(WriteRqst, false);
LWLockRelease(WALWriteLock);
/* done */
break;
}
END_CRIT_SECTION();