ログレコードをWALバッファに挿入(コピー)する
XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
レコードの末尾のXLOGポインタ(次のレコードの開始地点)。
- rmid: リソースマネージャの種類を表すID。rmidによってPostgreSQL内部のどのモジュールによってWALレコードが生成されたのが分かる。
- info: ログレコードの付帯情報。XLOG_XACT_COMMITやXLOG_XACT_ABORTなどがある。
- rdata: ログレコード本体。XLogRecDataのチェーンになっている
アラインメントを考慮している.
if (rechdr == NULL)
{
static char rechdrbuf[SizeOfXLogRecord + MAXIMUM_ALIGNOF];
rechdr = (XLogRecord *) MAXALIGN(&rechdrbuf);
MemSet(rechdr, 0, SizeOfXLogRecord);
}
リカバリ中は新しいWALのエントリを作れない.
/* cross-check on whether we should be here or not */
if (!XLogInsertAllowed())
elog(ERROR, "cannot make new WAL entries during recovery");
Postgresは3つのProcessingModeがある。
- BootstrapProcessing
- InitProcessing
- NormalProcessing
BootstrapProcessingが使われるのは、initdbコマンドなどのDBの初期化時でWALは書かない。
/*
* In bootstrap mode, we don't actually log anything but XLOG resources;
* return a phony record pointer.
*/
if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
{
EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
return EndPos;
}
beginにgotoする時は、recheckによって再度バックアップブロックを設定しなくてはならない場合である。
最後のチェックポイント以来、そのページへの最初の更新であれば、そのバッファはバックアップされなければならない。
その場合、そのページの内容の全体はXLOGレコードにアタッチされてXLOGはxl_info内のXLR_BKP_BLOCK(N)ビットをセットする。
このループでは、XLogRecDataのチェーンを辿っていき、 バックアップすべきバッファがあるかないかチェックしている。 バックアップすべきバッファはバックアップブロックと呼ばれる。
XLogRecDataのエントリが指し示すバッファが既にバックアップブロックに存在する場合、 同じバッファをバックアップブロックに入れる必要はない。 また、ここではバックアップブロックを除いたレコード長も計算している。
バックアップブロックを含めたレコード長は次のループで計算される。
/*
* Here we scan the rdata chain, to determine which buffers must be backed
* up.
*
* We may have to loop back to here if a race condition is detected below.
* We could prevent the race by doing all this work while holding an
* insertion lock, but it seems better to avoid doing CRC calculations
* while holding one.
*
* We add entries for backup blocks to the chain, so that they don't need
* any special treatment in the critical section where the chunks are
* copied into the WAL buffers. Those entries have to be unlinked from the
* chain if we have to loop back here.
*/
begin:;
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
dtbuf[i] = InvalidBuffer;
dtbuf_bkp[i] = false;
}
/*
* Decide if we need to do full-page writes in this XLOG record: true if
* full_page_writes is on or we have a PITR request for it. Since we
* don't yet have an insertion lock, fullPageWrites and forcePageWrites
* could change under us, but we'll recheck them once we have a lock.
*/
doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
len = 0;
for (rdt = rdata;;)
{
if (rdt->buffer == InvalidBuffer)
{
/* Simple data, just include it */
len += rdt->len;
}
else
{
/* Find info for buffer */
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
if (rdt->buffer == dtbuf[i])
{
/* Buffer already referenced by earlier chain item */
if (dtbuf_bkp[i])
{
rdt->data = NULL;
rdt->len = 0;
}
else if (rdt->data)
len += rdt->len;
break;
}
if (dtbuf[i] == InvalidBuffer)
{
/* OK, put it in this slot */
dtbuf[i] = rdt->buffer;
if (doPageWrites && XLogCheckBuffer(rdt, true,
&(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
{
dtbuf_bkp[i] = true;
rdt->data = NULL;
rdt->len = 0;
}
else if (rdt->data)
len += rdt->len;
break;
}
}
if (i >= XLR_MAX_BKP_BLOCKS)
elog(PANIC, "can backup at most %d blocks per xlog record",
XLR_MAX_BKP_BLOCKS);
}
/* Break out of loop when rdt points to last chain item */
if (rdt->next == NULL)
break;
rdt = rdt->next;
}
XLOGレコードに取り付けるバックアップブロックデータとその長さを計算する。 ページバッファにholeがある場合はholeを抜かして計算する。 ポストグレスのページレイアウトについてはこちら↓ http://www.interdb.jp/pg/pgsql01.html#_1.4
/*
* Make additional rdata chain entries for the backup blocks, so that we
* don't need to special-case them in the write loop. This modifies the
* original rdata chain, but we keep a pointer to the last regular entry,
* rdt_lastnormal, so that we can undo this if we have to loop back to the
* beginning.
*
* At the exit of this loop, write_len includes the backup block data.
*
* Also set the appropriate info bits to show which buffers were backed
* up. The XLR_BKP_BLOCK(N) bit corresponds to the N'th distinct buffer
* value (ignoring InvalidBuffer) appearing in the rdata chain.
*/
rdt_lastnormal = rdt;
write_len = len;
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
BkpBlock *bkpb;
char *page;
if (!dtbuf_bkp[i])
continue;
info |= XLR_BKP_BLOCK(i);
bkpb = &(dtbuf_xlg[i]);
page = (char *) BufferGetBlock(dtbuf[i]);
rdt->next = &(dtbuf_rdt1[i]);
rdt = rdt->next;
rdt->data = (char *) bkpb;
rdt->len = sizeof(BkpBlock);
write_len += sizeof(BkpBlock);
rdt->next = &(dtbuf_rdt2[i]);
rdt = rdt->next;
if (bkpb->hole_length == 0) // holeがないなら、ページは満杯である
{
rdt->data = page;
rdt->len = BLCKSZ;
write_len += BLCKSZ;
rdt->next = NULL;
}
else
{
/* must skip the hole */
rdt->data = page;
rdt->len = bkpb->hole_offset;
write_len += bkpb->hole_offset;
rdt->next = &(dtbuf_rdt3[i]);
rdt = rdt->next;
rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
write_len += rdt->len;
rdt->next = NULL;
}
}
作成したrdt(XLogRecData)のチェーンからCRCを計算する。
/*
* Calculate CRC of the data, including all the backup blocks
*
* Note that the record header isn't added into the CRC initially since we
* don't know the prev-link yet. Thus, the CRC will represent the CRC of
* the whole record in the order: rdata, then backup blocks, then record
* header.
*/
INIT_CRC32(rdata_crc);
for (rdt = rdata; rdt != NULL; rdt = rdt->next)
COMP_CRC32(rdata_crc, rdt->data, rdt->len);
/*
* Calculate CRC of the data, including all the backup blocks
*
* Note that the record header isn't added into the CRC initially since we
* don't know the prev-link yet. Thus, the CRC will represent the CRC of
* the whole record in the order: rdata, then backup blocks, then record
* header.
*/
INIT_CRC32(rdata_crc);
for (rdt = rdata; rdt != NULL; rdt = rdt->next)
COMP_CRC32(rdata_crc, rdt->data, rdt->len);
/*
* Construct record header (prev-link is filled in later, after reserving
* the space for the record), and make that the first chunk in the chain.
*
* The CRC calculated for the header here doesn't include prev-link,
* because we don't know it yet. It will be added later.
*/
rechdr->xl_xid = GetCurrentTransactionIdIfAny();
rechdr->xl_tot_len = SizeOfXLogRecord + write_len;
rechdr->xl_len = len; /* doesn't include backup blocks */
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
rechdr->xl_prev = InvalidXLogRecPtr;
COMP_CRC32(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
hdr_rdt.next = rdata;
hdr_rdt.data = (char *) rechdr;
hdr_rdt.len = SizeOfXLogRecord;
write_len += SizeOfXLogRecord;
ここからがロックを保持してログレコードを共有のWALバッファに挿入する本処理になる。 WALバッファへの挿入プロセスは以下の2-stepからなる
- WALから正確な量の領域を予約する。現在予約されたスペースの先頭はInsert->CurrBytePosで保持される、そしてこれはinsertpos_lckで保護される。
- レコードを予約したWALスペースへコピーする。これは予約したスペースを含む正しいWALバッファを見つけることを含み、そして、その場にレコードをコピーする。これは複数のプロセスで並列に動作可能である。
PostgreSQ9.4から、並列にレコードをWALバッファへコピーできるようになった。 それは上記の手順で先に領域だけ予約しておき、実際のコピーは並列に行うというものである。 その挿入がまだ進行中であるかどうかを追うために、挿入者はインサートロックを獲得する。 インサートロックは固定数で複数(少数)ある。
/*----------
*
* We have now done all the preparatory work we can without holding a
* lock or modifying shared state. From here on, inserting the new WAL
* record to the shared WAL buffer cache is a two-step process:
*
* 1. Reserve the right amount of space from the WAL. The current head of
* reserved space is kept in Insert->CurrBytePos, and is protected by
* insertpos_lck.
*
* 2. Copy the record to the reserved WAL space. This involves finding the
* correct WAL buffer containing the reserved space, and copying the
* record in place. This can be done concurrently in multiple processes.
*
* To keep track of which insertions are still in-progress, each concurrent
* inserter acquires an insertion lock. In addition to just indicating that
* an insertion is in progress, the lock tells others how far the inserter
* has progressed. There is a small fixed number of insertion locks,
* determined by NUM_XLOGINSERT_LOCKS. When an inserter crosses a page
* boundary, it updates the value stored in the lock to the how far it has
* inserted, to allow the previous buffer to be flushed.
*
* Holding onto an insertion lock also protects RedoRecPtr and
* fullPageWrites from changing until the insertion is finished.
*
* Step 2 can usually be done completely in parallel. If the required WAL
* page is not initialized yet, you have to grab WALBufMappingLock to
* initialize it, but the WAL writer tries to do that ahead of insertions
* to avoid that from happening in the critical path.
*
*----------
*/
START_CRIT_SECTIONはクリティカルセクションに入った数をカウントアップするだけ
isLogSwitchの場合、ログのflushが起こるのでExclusiveなInsertロックが取られる。 そうでない場合、ExclusiveではないInsertロックがとられる。Insertロックは複数あり、 並列にログの挿入ができる。(が、flushの際にはログの挿入が完了したことを確認する必要がある。)
START_CRIT_SECTION();
if (isLogSwitch)
WALInsertLockAcquireExclusive();
else
WALInsertLockAcquire();
チェックポイントの直後の場合、RedoRecPtrが古くなっている可能性がある。 その場合は、全てを再計算する必要がある。
full-pageの書き込みをしていないならば、XLOGレコードの内容に影響を与えないので、 ローカルコピーをアップデートするだけで、再計算は強制しない。
/*
* Check to see if my RedoRecPtr is out of date. If so, may have to go
* back and recompute everything. This can only happen just after a
* checkpoint, so it's better to be slow in this case and fast otherwise.
*
* If we aren't doing full-page writes then RedoRecPtr doesn't actually
* affect the contents of the XLOG record, so we'll update our local copy
* but not force a recomputation.
*/
if (RedoRecPtr != Insert->RedoRecPtr)
{
Assert(RedoRecPtr < Insert->RedoRecPtr);
RedoRecPtr = Insert->RedoRecPtr;
if (doPageWrites)
{
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
if (dtbuf[i] == InvalidBuffer)
continue;
if (dtbuf_bkp[i] == false &&
dtbuf_lsn[i] <= RedoRecPtr)
{
/*
* Oops, this buffer now needs to be backed up, but we
* didn't think so above. Start over.
*/
WALInsertLockRelease();
END_CRIT_SECTION();
rdt_lastnormal->next = NULL;
info = info_orig;
goto begin;
}
}
}
}
変更されている場合は再計算する。
/*
* Also check to see if fullPageWrites or forcePageWrites was just turned
* on; if we weren't already doing full-page writes then go back and
* recompute. (If it was just turned off, we could recompute the record
* without full pages, but we choose not to bother.)
*/
if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
{
/* Oops, must redo it with full-page data. */
WALInsertLockRelease();
END_CRIT_SECTION();
rdt_lastnormal->next = NULL;
info = info_orig;
goto begin;
}
並列に実行できない部分なので、できるだけ短くすべき。 領域の確保のタイミングでspin lockが使われている。
LogSwitchが起こる場合は、ReserveXLogSwitch() その他の単純な挿入でよい場合は、ReserverXLogInsertLocation() を実行する。
(P-WALモデルでは、ワーカーとWALバッファは1対1で紐付いており、 他のワーカーによる操作を気にしなくて良いため、ロックがいらないが、 PostgreSQLへの実装では、より多数のクライアント数に対応するため、 複数のワーカープロセスがあるWALバッファを同時に操作する可能性がある。 そのため、ReserverXLog...()のスピンロックは残す)
/*
* Reserve space for the record in the WAL. This also sets the xl_prev
* pointer.
*/
if (isLogSwitch)
inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
else
{
ReserveXLogInsertLocation(write_len, &StartPos, &EndPos,
&rechdr->xl_prev);
inserted = true;
}
ログを挿入する場合、取得したxl_prevを元に、最後のCRC計算を行う。 そうでない場合は、xlog-switchレコードだが、現在の挿入位置はすでにセグメントの始まりにあり、ここでは何もしない(後で行う)。
if (inserted)
{
/*
* Now that xl_prev has been filled in, finish CRC calculation of the
* record header.
*/
COMP_CRC32(rdata_crc, ((char *) &rechdr->xl_prev), sizeof(XLogRecPtr));
FIN_CRC32(rdata_crc);
rechdr->xl_crc = rdata_crc;
/*
* All the record data, including the header, is now ready to be
* inserted. Copy the record in the space reserved.
*/
CopyXLogRecordToWAL(write_len, isLogSwitch, &hdr_rdt, StartPos, EndPos);
}
else
{
/*
* This was an xlog-switch record, but the current insert location was
* already exactly at the beginning of a segment, so there was no need
* to do anything.
*/
}
/*
* Done! Let others know that we're finished.
*/
WALInsertLockRelease();
MarkCurrentTransactionIdLoggedIfAny();
END_CRIT_SECTION();
/*
* Update shared LogwrtRqst.Write, if we crossed page boundary.
*/
if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
SpinLockAcquire(&xlogctl->info_lck);
/* advance global request to include new block(s) */
if (xlogctl->LogwrtRqst.Write < EndPos)
xlogctl->LogwrtRqst.Write = EndPos;
/* update local result copy while I have the chance */
LogwrtResult = xlogctl->LogwrtResult;
SpinLockRelease(&xlogctl->info_lck);
}
これがXLOG_SWITCHレコードであれば、レコードとセグメントの残りを埋めた空のパディングスペースをフラッシュして、 セグメントの最後のアクションを実行する(例えば、アーカイバーに通知する)
セグメント(XLOG file)の先頭(あるいはXLOG BLOCKの先頭?)には、XLogPageHeaderData or XLogLongPageHeaderDataが存在する。 基本的なフィールドはXLogPageHeaderDataに含まれている。
/*
* If this was an XLOG_SWITCH record, flush the record and the empty
* padding space that fills the rest of the segment, and perform
* end-of-segment actions (eg, notifying archiver).
*/
if (isLogSwitch)
{
TRACE_POSTGRESQL_XLOG_SWITCH();
XLogFlush(EndPos);
/*
* Even though we reserved the rest of the segment for us, which is
* reflected in EndPos, we return a pointer to just the end of the
* xlog-switch record.
*/
if (inserted)
{
EndPos = StartPos + SizeOfXLogRecord;
if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
{
if (EndPos % XLOG_SEG_SIZE == EndPos % XLOG_BLCKSZ)
EndPos += SizeOfXLogLongPHD;
else
EndPos += SizeOfXLogShortPHD;
}
}
}
参考: XLogPageHeaderData
typedef struct XLogPageHeaderData
{
uint16 xlp_magic; /* magic value for correctness checks */
uint16 xlp_info; /* flag bits, see below */
TimeLineID xlp_tli; /* TimeLineID of first record on page */
XLogRecPtr xlp_pageaddr; /* XLOG address of this page */ XLOG addressとは?ファイル中のそのページのオフセット?
/*
* When there is not enough space on current page for whole record, we
* continue on the next page. xlp_rem_len is the number of bytes
* remaining from a previous page.
*
* Note that xl_rem_len includes backup-block data; that is, it tracks
* xl_tot_len not xl_len in the initial header. Also note that the
* continuation data isn't necessarily aligned.
*/
現在のページに十分なスペースがない時に次のページに続ける。
xlp_rem_lenは前のページ(?)からの残りのバイト数。
uint32 xlp_rem_len; /* total len of remaining data for record */
} XLogPageHeaderData;
返り値はレコードの最後のbytes+1(EndPos)である。
ProcLastRecPtr はレコードを挿入する度に更新する、最後のレコードの先頭を指すポインタ XactLastRecEnd はトップレベル(?)のトランザクションが終わったと時、又は、ラストレコードの時、 または新しいトランザクションが開始するときにリセットされる。 すなわちログを書いたかどうか確かめるためのものである。
/*
* Update our global variables
*/
ProcLastRecPtr = StartPos;
XactLastRecEnd = EndPos;
return EndPos;