Skip to content

Instantly share code, notes, and snippets.

@DeathPoem
Created August 18, 2022 04:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DeathPoem/39dec1aefac230f5f4b46efe90d4edb0 to your computer and use it in GitHub Desktop.
Save DeathPoem/39dec1aefac230f5f4b46efe90d4edb0 to your computer and use it in GitHub Desktop.
// Example program
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
using namespace std;
// Operation code carried by prepare/commit/abort requests.
using opcode = int;
// Placeholder for "no operation"; the lock helpers assert that a request never carries it.
constexpr opcode emptyv = 0;
// The request reads an element.
constexpr opcode read = 1;
// The request writes an element.
constexpr opcode write = 2;
// Request for Manager::prepare.
struct prepareArg {
    string txid;          // id of the requesting transaction; must be non-empty
    int index = 0;        // element index the operation targets
    opcode op = emptyv;   // read or write; emptyv is rejected by the lock helpers
    int writeval = 0;     // value to stage; only consulted when op == write
};
// Reply from Manager::prepare.
struct prepareRet {
    bool ok = false;      // true when the lock was acquired
    int readval = 0;      // value read; only meaningful when ok and the request was a read
};
// Request for Manager::commit.
struct commitArg {
    string txid;          // id of the committing transaction
    opcode op = emptyv;   // kind of lock being released (read or write)
    int index = 0;        // element index that was prepared
};
// Reply from Manager::commit.
struct commitRet {
    bool ok = false;      // true when the lock release succeeded
};
// Request for Manager::abort.
struct abortArg {
    string txid;          // id of the aborting transaction
    opcode op = emptyv;   // kind of lock being released (read or write)
    int index = 0;        // element index that was prepared
};
// Reply from Manager::abort.
struct abortRet {
    bool ok = false;      // true when the lock release succeeded
};
// Number of elements in the shared array.
constexpr int N = 100000;
// Number of consecutive elements that share one lock bucket (and one mutex).
constexpr int lockGranularity = 100;
class Manager {
public:
int Stable[N]; // value thing
int Wtable[N]; // update thing
string exclusiveTable[N/lockGranularity]; // index is owned by txid
unordered_set<string> sharedTable[N/lockGranularity]; // index is shared by txid
mutex muttable[N/lockGranularity];
Manager() {
for (int i = 0; i < N; i++) {
Stable[i] = i;
}
}
bool releaselock(int index, opcode op, string txid) {
assert(txid!="");
assert(op!=emptyv);
// cout << "[Manager] " << ",releaselock ," << index << "," << exclusiveTable[index] << "," << op << endl;
if (op == write) {
exclusiveTable[index] = "";
} else {
sharedTable[index].erase(txid);
}
return true;
}
bool acquirelock(int index, opcode op, string txid) {
assert(txid!="");
assert(op!=emptyv);
// cout << "[Manager] " << ",acquirelock ," << index << "," << txid << endl;
if (exclusiveTable[index] != "" && exclusiveTable[index] != txid) {
cout << "[Manager] " << ",acquirelock fail," << index << "," << exclusiveTable[index] << endl;
return false;
}
if (op == write) {
exclusiveTable[index] = txid;
} else {
sharedTable[index].insert(txid);
}
return true;
}
void prepare(prepareArg arg, prepareRet& ret) {
int mutidx = arg.index / lockGranularity;
lock_guard<mutex> lk(muttable[mutidx]);
//
ret.ok = acquirelock(mutidx, arg.op, arg.txid);
if (!ret.ok) {
return;
}
if (arg.op == write) {
Wtable[arg.index] = arg.writeval;
// cout << "in prepare" << arg.index << arg.writeval << endl;
return;
} else {
ret.readval = Stable[arg.index];
return;
}
}
void abort(abortArg arg, abortRet& ret) {
int mutidx = arg.index / lockGranularity;
lock_guard<mutex> lk(muttable[mutidx]);
//
ret.ok = releaselock(mutidx, arg.op, arg.txid);
if (!ret.ok) {
// TBD won't happen?
return;
}
return;
}
void commit(commitArg arg, commitRet& ret) {
int mutidx = arg.index / lockGranularity;
lock_guard<mutex> lk(muttable[mutidx]);
//
ret.ok = releaselock(mutidx, arg.op, arg.txid);
if (!ret.ok) {
// TBD won't happen ?
return;
}
if (arg.op == write) {
Stable[arg.index] = Wtable[arg.index];
// cout << "in commit" << arg.index << Wtable[arg.index] << endl;
}
return;
}
};
class Worker {
public:
static bool doitonce(int i, int j, string txid, Manager& mgr) {
// prepare
cout << "[worker] prepare txid=" << txid << endl;
prepareArg arg{
txid: txid,
};
prepareRet ret;
int k = 0;
int sum = 0;
for (; k < 2; k++) {
arg.index = (i+k) % N;
arg.op = read;
mgr.prepare(arg, ret);
if (!ret.ok) {
// abort
abortArg argx{
txid: txid,
};
abortRet retx;
k--; // current not need abort
for (; k>=0; k-- ) {
argx.index = (i+k) % N;
argx.op = read;
mgr.abort(argx, retx);
}
return false;
}
sum += ret.readval;
}
arg.index = j;
arg.op = write;
arg.txid = txid;
arg.writeval = sum;
mgr.prepare(arg, ret);
if (!ret.ok) {
// abort
abortArg argx{
txid: txid,
};
abortRet retx;
for (; k>=0; k-- ) {
argx.index = (i+k)%N;
argx.op = read;
mgr.abort(argx, retx);
}
return false;
}
// commit
cout << "[worker] commit txid=" << txid << endl;
commitArg argy{
txid: txid,
};
commitRet rety;
for (int k = 0; k < 3; k++) {
argy.index = (i+k)%N;
argy.op = read;
mgr.commit(argy, rety);
}
argy.index = j;
argy.op = write;
mgr.commit(argy, rety);
return true;
}
static void threadmain(Manager& mgr, int workernum, mutex& m, condition_variable& condi, bool& fireflag) {
unique_lock lk(m);
cout << "[worker] wait" << workernum << endl;
while (!fireflag) {
condi.wait(lk);
}
lk.unlock();
cout << "[worker] begin" << workernum << endl;
for (int turn = 0; turn < 10000; turn++) {
int i = rand() % N;
int j = rand() % N;
string txid = to_string(rand());
//cout << "[worker] turn=" << turn << " start " << "i,j,txid=" << i << "," << j << "," << txid << "," << endl;
while (!doitonce(i, j, txid, mgr)) {
// retry
// cout << "[worker] turn=" << turn << ";retry" << "; workernum=" << workernum << "; i,j,txid=" << i << "," << j << "," << txid << "," << endl;
auto sl = chrono::milliseconds(rand() % 1000 + 200);
this_thread::sleep_for(sl);
}
//cout << "[worker] turn=" << turn << ";doit goodend" << "; workernum=" << workernum << endl;
}
cout << "[worker] end" << workernum << endl;
return;
}
};
// Start signal for the workers: sleeps 200 ms, then sets `fireflag` while
// holding `m`, prints "firethem" and wakes every thread waiting on `condi`.
void firethem(mutex& m, condition_variable& condi, bool& fireflag) {
    const chrono::milliseconds startDelay(200);
    this_thread::sleep_for(startDelay);
    lock_guard<mutex> guard(m);
    fireflag = true;
    cout << "firethem" << endl;
    condi.notify_all();
}
int mainBenchmark() {
Manager mgr;
mutex mut;
condition_variable condi;
bool fireflag = false;
int M = 5;
vector<thread> threads;
for (int i=0; i<=M; i++) {
threads.emplace_back(thread(Worker::threadmain, ref(mgr), i, ref(mut), ref(condi), ref(fireflag)));
}
thread t0(firethem, ref(mut), ref(condi), ref(fireflag));
t0.join();
for (int i=0; i <= M; i++ ) {
threads[i].join();
}
this_thread::sleep_for(2000ms);
return 0;
}
// Single-threaded check of one transaction S(j) = S(i) + S(i+1) + S(i+2)
// (indexes wrap modulo N) on a fresh Manager.
// Prints "good end" or "bad end" followed by the resulting S(j).
// Returns 0 when the committed value matches the expectation, -1 when an
// index is out of range, a prepare/commit is refused, or the value is wrong.
int mainTest01(int i, int j) {
    if (i < 0 || i >= N || j < 0 || j >= N) {
        return -1;
    }
    Manager mgr;
    prepareArg arg{};
    arg.txid = "mocktxid";
    arg.op = read;
    prepareRet ret{};
    int sum = 0;
    // A fresh Manager stores S(x) == x, so the expected sum is the sum of the wrapped indexes.
    int expected = 0;
    for (int k = 0; k < 3; k++) {
        const int idx = (i + k) % N;
        expected += idx;
        arg.index = idx;
        mgr.prepare(arg, ret);
        if (!ret.ok) {
            return -1;
        }
        sum += ret.readval;
    }
    arg.index = j;
    arg.op = write;
    arg.writeval = sum;
    mgr.prepare(arg, ret);
    if (!ret.ok) {
        return -1;
    }
    commitArg argx{};
    argx.txid = "mocktxid";
    argx.op = read;
    commitRet retx{};
    for (int k = 0; k < 3; k++) {
        argx.index = (i + k) % N;
        mgr.commit(argx, retx);
        if (!retx.ok) {
            return -1;
        }
    }
    argx.index = j;
    argx.op = write;
    mgr.commit(argx, retx);
    if (!retx.ok) {
        return -1;
    }
    const bool matches = (mgr.Stable[j] == expected);
    if (matches) {
        cout << "good end" << mgr.Stable[j] << endl;
    } else {
        cout << "bad end" << mgr.Stable[j] << endl;
    }
    return matches ? 0 : -1;
}
/*
Test
给定一个长度为 N (N = 100,000) 的整数数组S,有 M (M >= 2) 个 workers 并发访问 S 并更新 S。每个 worker 重复 10,000 次如下操作:
1) 随机生成 i, j, 0<= i, j < 100,000。
2) 更新 S 使得 S(j) = S(i) + S(i+1) + S(i+2)。 如果 i + 1 或者 i + 2 越界,则用 (i+1) % N 或者 (i+2) % N。
提示,
(a) 请考虑并发保护,即读取 S(i), S(i+1), S(i+2) 和更新 S(j) 为原子操作。*参考 two-phase locking 算法: Two-phase locking - Wikipedia
(b) 注意锁的粒度。每个 worker 一次只读3个元素,写1个元素。共有 100,000 个元素。并发 workers 同时访问同一元素的概率很低。采用细粒度的锁,可以降低冲突,提高并发度。
(c) 注意读锁和写锁的区别。
(d) j 有可能落在 [i, i+2] 区间。
(e) 附加思考:会出现死锁吗?如何避免
*/
int main() {
int ret = 0;
// (a) 请考虑并发保护,即读取 S(i), S(i+1), S(i+2) 和更新 S(j) 为原子操作。
// Manager 实现了2pc 提交
// (b) 注意锁的粒度。每个 worker 一次只读3个元素,写1个元素。共有 100,000 个元素。并发 workers 同时访问同一元素的概率很低。采用细粒度的锁,可以降低冲突,提高并发度。
// 同步源语仅用于保护Manager并发访问, 每100个index分一个mutex,一个exclusive/shared lock
// (c) 注意读锁和写锁的区别。
// acquirelock 时分读写
// (d) j 有可能落在 [i, i+2] 区间。
// 如下测试了 j 在 [i, i+2] 区间 以及 i+2超过N
ret = mainTest01(9, 10);
cout << "main() mainTest01(9, 10); ret=" << ret << endl;
ret = mainTest01(N-1, 10);
cout << "main() mainTest01(N-1, 10); ret=" << ret << endl;
// (e) 附加思考:会出现死锁吗?如何避免
// 1. 目前的实现是,如果Worker没有异常,仅会出现冲突然后重试; 如果要考虑Worker异常,可以参考 percolater 的实现方式,分出primary row, 让后来者选择 moveforward 还是 rollback
// 2. acquiirelock 遇到冲突时有三种策略
// 2.1. Error: 报错给Tx,Tx重试;
// 2.2. Wound-wait: txid有序, 冲突时可以抢占弱txid的lock, 否则block wait
// 2.3. Wait-Die: txid有序, 冲突时block wait弱txid的lock,否则abort
// 3. 这里的acquirelock是使用'Error'的nonblock实现,然后冲突后重试的; 缺点是retry的代价很大, 好处是没有死锁也不用block等;
// 4. 假设并发访问量很大,transaction涉及的资源多,经常冲突,可以通过引入MVCC机制
ret = mainBenchmark();
cout << "main() mainBenchmark(); ret=" << ret << endl;
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment