Skip to content

Instantly share code, notes, and snippets.

@chanfung032
Created May 8, 2018 12:16
percolator伪代码
class Transaction {
struct Write { Row row; Column col; string value; };
vector<Write> writes_;
int start_ts_;
Transaction() : start_ts_(oracle.GetTimestamp()) {}
void Set(Write w) { writes_.push back(w); }
bool Get(Row row, Column c, string* value) {
while (true) {
bigtable::Txn T = bigtable::StartRowTransaction(row);
// Check for locks that signal concurrent writes.
if (T.Read(row, c+"lock", [0, start_ts_])) {
// There is a pending lock; try to clean it and wait
BackoffAndMaybeCleanupLock(row, c);
continue;
}
// Find the latest write below our start timestamp.
latest write = T.Read(row, c+"write", [0, start_ts_]);
if (!latest write.found()) return false; // no data
int data ts = latest write.start timestamp();
*value = T.Read(row, c+"data", [data ts, data ts]);
return true;
}
}
// Prewrite tries to lock cell w, returning false in case of conflict.
bool Prewrite(Write w, Write primary) {
Column c = w.col;
bigtable::Txn T = bigtable::StartRowTransaction(w.row);
// Abort on writes_after our start timestamp ...
if (T.Read(w.row, c+"write", [start_ts_, ∞])) return false;
// ... or locks at any timestamp.
if (T.Read(w.row, c+"lock", [0, ∞])) return false;
T.Write(w.row, c+"data", start_ts_, w.value);
T.Write(w.row, c+"lock", start_ts_,
{primary.row, primary.col});
return T.Commit();
}
bool Commit() {
// The primary’s location.
Write primary = writes_[0];
vector<Write> secondaries(writes_.begin()+1, writes_.end());
if (!Prewrite(primary, primary)) return false;
for (Write w : secondaries)
if (!Prewrite(w, primary)) return false;
int commit_ts = oracle .GetTimestamp();
// Commit primary first.
Write p = primary;
bigtable::Txn T = bigtable::StartRowTransaction(p.row);
if (!T.Read(p.row, p.col+"lock", [start_ts_, start_ts_]))
return false; // aborted while working
T.Write(p.row, p.col+"write", commit_ts,
start_ts_); // Pointer to data written at start_ts_.
T.Erase(p.row, p.col+"lock", commit_ts);
if (!T.Commit()) return false; // commit point
// Second phase: write out write records for secondary cells.
for (Write w : secondaries) {
bigtable::Write(w.row, w.col+"write", commit_ts, start_ts_);
bigtable::Erase(w.row, w.col+"lock", commit_ts);
}
return true;
}
} // class Transaction
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment