Created
August 18, 2022 04:23
-
-
Save DeathPoem/39dec1aefac230f5f4b46efe90d4edb0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example program | |
#include <iostream> | |
#include <string> | |
#include <unordered_map> | |
#include <set> | |
#include <unordered_set> | |
#include <vector> | |
#include <queue> | |
#include <cassert> | |
#include <thread> | |
#include <mutex> | |
#include <chrono> | |
#include <condition_variable> | |
using namespace std; | |
// Operation code carried by prepare/commit/abort requests.
// emptyv (0) means "no operation set"; Manager's lock helpers assert against it.
using opcode = int;
constexpr opcode emptyv = 0;
constexpr opcode read = 1;   // shared (read) access
constexpr opcode write = 2;  // exclusive (write) access
struct prepareArg { | |
string txid; | |
int index; | |
opcode op; | |
int writeval; | |
}; | |
// Result of Manager::prepare.
// Members are default-initialised so a result object that was declared but not
// yet filled in reads as "failed, value 0" instead of holding indeterminate data.
struct prepareRet {
    bool ok = false;  // true when the lock was granted
    int readval = 0;  // value read from the store; only written by prepare for read requests
};
struct commitArg { | |
string txid; | |
opcode op; | |
int index; | |
}; | |
// Result of Manager::commit.
// `ok` defaults to false so an unfilled result never reads as success.
struct commitRet {
    bool ok = false;  // set by commit from the outcome of releasing the lock
};
struct abortArg { | |
string txid; | |
opcode op; | |
int index; | |
}; | |
// Result of Manager::abort.
// `ok` defaults to false so an unfilled result never reads as success.
struct abortRet {
    bool ok = false;  // set by abort from the outcome of releasing the lock
};
// Number of elements in the shared array.
constexpr int N = 100000;
// Number of consecutive indexes that share one lock bucket; the Manager tables
// are sized N / lockGranularity (integer division).
constexpr int lockGranularity = 100;
class Manager { | |
public: | |
int Stable[N]; // value thing | |
int Wtable[N]; // update thing | |
string exclusiveTable[N/lockGranularity]; // index is owned by txid | |
unordered_set<string> sharedTable[N/lockGranularity]; // index is shared by txid | |
mutex muttable[N/lockGranularity]; | |
Manager() { | |
for (int i = 0; i < N; i++) { | |
Stable[i] = i; | |
} | |
} | |
bool releaselock(int index, opcode op, string txid) { | |
assert(txid!=""); | |
assert(op!=emptyv); | |
// cout << "[Manager] " << ",releaselock ," << index << "," << exclusiveTable[index] << "," << op << endl; | |
if (op == write) { | |
exclusiveTable[index] = ""; | |
} else { | |
sharedTable[index].erase(txid); | |
} | |
return true; | |
} | |
bool acquirelock(int index, opcode op, string txid) { | |
assert(txid!=""); | |
assert(op!=emptyv); | |
// cout << "[Manager] " << ",acquirelock ," << index << "," << txid << endl; | |
if (exclusiveTable[index] != "" && exclusiveTable[index] != txid) { | |
cout << "[Manager] " << ",acquirelock fail," << index << "," << exclusiveTable[index] << endl; | |
return false; | |
} | |
if (op == write) { | |
exclusiveTable[index] = txid; | |
} else { | |
sharedTable[index].insert(txid); | |
} | |
return true; | |
} | |
void prepare(prepareArg arg, prepareRet& ret) { | |
int mutidx = arg.index / lockGranularity; | |
lock_guard<mutex> lk(muttable[mutidx]); | |
// | |
ret.ok = acquirelock(mutidx, arg.op, arg.txid); | |
if (!ret.ok) { | |
return; | |
} | |
if (arg.op == write) { | |
Wtable[arg.index] = arg.writeval; | |
// cout << "in prepare" << arg.index << arg.writeval << endl; | |
return; | |
} else { | |
ret.readval = Stable[arg.index]; | |
return; | |
} | |
} | |
void abort(abortArg arg, abortRet& ret) { | |
int mutidx = arg.index / lockGranularity; | |
lock_guard<mutex> lk(muttable[mutidx]); | |
// | |
ret.ok = releaselock(mutidx, arg.op, arg.txid); | |
if (!ret.ok) { | |
// TBD won't happen? | |
return; | |
} | |
return; | |
} | |
void commit(commitArg arg, commitRet& ret) { | |
int mutidx = arg.index / lockGranularity; | |
lock_guard<mutex> lk(muttable[mutidx]); | |
// | |
ret.ok = releaselock(mutidx, arg.op, arg.txid); | |
if (!ret.ok) { | |
// TBD won't happen ? | |
return; | |
} | |
if (arg.op == write) { | |
Stable[arg.index] = Wtable[arg.index]; | |
// cout << "in commit" << arg.index << Wtable[arg.index] << endl; | |
} | |
return; | |
} | |
}; | |
class Worker { | |
public: | |
static bool doitonce(int i, int j, string txid, Manager& mgr) { | |
// prepare | |
cout << "[worker] prepare txid=" << txid << endl; | |
prepareArg arg{ | |
txid: txid, | |
}; | |
prepareRet ret; | |
int k = 0; | |
int sum = 0; | |
for (; k < 2; k++) { | |
arg.index = (i+k) % N; | |
arg.op = read; | |
mgr.prepare(arg, ret); | |
if (!ret.ok) { | |
// abort | |
abortArg argx{ | |
txid: txid, | |
}; | |
abortRet retx; | |
k--; // current not need abort | |
for (; k>=0; k-- ) { | |
argx.index = (i+k) % N; | |
argx.op = read; | |
mgr.abort(argx, retx); | |
} | |
return false; | |
} | |
sum += ret.readval; | |
} | |
arg.index = j; | |
arg.op = write; | |
arg.txid = txid; | |
arg.writeval = sum; | |
mgr.prepare(arg, ret); | |
if (!ret.ok) { | |
// abort | |
abortArg argx{ | |
txid: txid, | |
}; | |
abortRet retx; | |
for (; k>=0; k-- ) { | |
argx.index = (i+k)%N; | |
argx.op = read; | |
mgr.abort(argx, retx); | |
} | |
return false; | |
} | |
// commit | |
cout << "[worker] commit txid=" << txid << endl; | |
commitArg argy{ | |
txid: txid, | |
}; | |
commitRet rety; | |
for (int k = 0; k < 3; k++) { | |
argy.index = (i+k)%N; | |
argy.op = read; | |
mgr.commit(argy, rety); | |
} | |
argy.index = j; | |
argy.op = write; | |
mgr.commit(argy, rety); | |
return true; | |
} | |
static void threadmain(Manager& mgr, int workernum, mutex& m, condition_variable& condi, bool& fireflag) { | |
unique_lock lk(m); | |
cout << "[worker] wait" << workernum << endl; | |
while (!fireflag) { | |
condi.wait(lk); | |
} | |
lk.unlock(); | |
cout << "[worker] begin" << workernum << endl; | |
for (int turn = 0; turn < 10000; turn++) { | |
int i = rand() % N; | |
int j = rand() % N; | |
string txid = to_string(rand()); | |
//cout << "[worker] turn=" << turn << " start " << "i,j,txid=" << i << "," << j << "," << txid << "," << endl; | |
while (!doitonce(i, j, txid, mgr)) { | |
// retry | |
// cout << "[worker] turn=" << turn << ";retry" << "; workernum=" << workernum << "; i,j,txid=" << i << "," << j << "," << txid << "," << endl; | |
auto sl = chrono::milliseconds(rand() % 1000 + 200); | |
this_thread::sleep_for(sl); | |
} | |
//cout << "[worker] turn=" << turn << ";doit goodend" << "; workernum=" << workernum << endl; | |
} | |
cout << "[worker] end" << workernum << endl; | |
return; | |
} | |
}; | |
// Start signal for threads waiting on `condi`: sleeps 200 ms, then sets
// `fireflag` to true while holding `m`, prints "firethem" and wakes every
// waiter. The delay presumably gives the waiting threads time to block first.
void firethem(mutex& m, condition_variable& condi, bool& fireflag) {
    const auto startupDelay = chrono::milliseconds(200);
    this_thread::sleep_for(startupDelay);

    lock_guard<mutex> guard(m);
    fireflag = true;
    cout << "firethem" << endl;
    condi.notify_all();
}
int mainBenchmark() { | |
Manager mgr; | |
mutex mut; | |
condition_variable condi; | |
bool fireflag = false; | |
int M = 5; | |
vector<thread> threads; | |
for (int i=0; i<=M; i++) { | |
threads.emplace_back(thread(Worker::threadmain, ref(mgr), i, ref(mut), ref(condi), ref(fireflag))); | |
} | |
thread t0(firethem, ref(mut), ref(condi), ref(fireflag)); | |
t0.join(); | |
for (int i=0; i <= M; i++ ) { | |
threads[i].join(); | |
} | |
this_thread::sleep_for(2000ms); | |
return 0; | |
} | |
int mainTest01(int i, int j) { | |
Manager mgr; | |
prepareArg arg{ | |
txid: "mocktxid", | |
op: read, | |
}; | |
prepareRet ret; | |
int sum = 0; | |
for (int k = 0; k < 3; k++) { | |
arg.index = (i+k) % N; | |
mgr.prepare(arg, ret); | |
if (!ret.ok) { | |
return -1; | |
} | |
sum+= ret.readval; | |
} | |
arg.index = j; | |
arg.op = write; | |
arg.writeval = sum; | |
mgr.prepare(arg, ret); | |
if (!ret.ok) { | |
return -1; | |
} | |
commitArg argx{ | |
txid: "mocktxid", | |
}; | |
commitRet retx; | |
for (int k = 0; k < 3; k++) { | |
argx.index = (i+k)%N; | |
argx.op = read; | |
mgr.commit(argx, retx); | |
if (!retx.ok) { | |
return -1; | |
} | |
} | |
argx.index = j; | |
argx.op = write; | |
mgr.commit(argx, retx); | |
if (!retx.ok) { | |
return -1; | |
} | |
if (mgr.Stable[j] == (3 * i + 3)) { | |
cout << "good end" << mgr.Stable[10] << endl; | |
} else { | |
cout << "bad end" << mgr.Stable[10] << endl; | |
} | |
return 0; | |
} | |
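/*
Test
Given an integer array S of length N (N = 100,000), M (M >= 2) workers access and update S concurrently. Each worker repeats the following operation 10,000 times:
1) Randomly generate i and j with 0 <= i, j < 100,000.
2) Update S so that S(j) = S(i) + S(i+1) + S(i+2). If i + 1 or i + 2 is out of range, use (i+1) % N or (i+2) % N instead.
Hints:
(a) Consider concurrency protection: reading S(i), S(i+1), S(i+2) and updating S(j) must be one atomic operation. See the two-phase locking algorithm (Wikipedia article "Two-phase locking").
(b) Mind the lock granularity. Each worker reads only 3 elements and writes 1 element at a time, out of 100,000 elements, so the probability that concurrent workers touch the same element is low. Fine-grained locks reduce conflicts and increase concurrency.
(c) Mind the difference between read locks and write locks.
(d) j may fall inside the interval [i, i+2].
(e) Extra question: can a deadlock occur? How can it be avoided?
*/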
// Entry point: runs the two single-threaded checks, then the concurrent
// benchmark, printing each return value. The comments answer the hints of the
// problem statement.
int main() {
    // (a) Concurrency protection: reading S(i), S(i+1), S(i+2) and updating
    //     S(j) must be one atomic operation.
    //     Manager implements a two-phase (prepare / commit) protocol.
    // (b) Lock granularity: each worker reads 3 elements and writes 1 out of
    //     100,000, so concurrent workers rarely touch the same element;
    //     fine-grained locks reduce conflicts and increase concurrency.
    //     Synchronization primitives are only used to protect concurrent
    //     access to Manager; every 100 indexes share one mutex and one
    //     exclusive/shared lock.
    // (c) Difference between read locks and write locks.
    //     acquirelock distinguishes reads from writes.
    // (d) j may fall inside [i, i+2].
    //     The two checks below cover j inside [i, i+2] and i+2 running past N.
    const int plainResult = mainTest01(9, 10);
    cout << "main() mainTest01(9, 10); ret=" << plainResult << endl;

    const int wrapResult = mainTest01(N - 1, 10);
    cout << "main() mainTest01(N-1, 10); ret=" << wrapResult << endl;

    // (e) Extra question: can a deadlock occur, and how can it be avoided?
    // 1. In the current implementation, as long as no Worker fails, the only
    //    thing that happens is a conflict followed by a retry. To handle
    //    Worker failure one could follow Percolator: designate a primary row
    //    and let later arrivals decide whether to roll forward or roll back.
    // 2. acquirelock has three possible strategies on conflict:
    //    2.1. Error: report the error to the transaction, which retries.
    //    2.2. Wound-wait: txids are ordered; on conflict the lock of the weaker
    //         txid may be preempted, otherwise block and wait.
    //    2.3. Wait-die: txids are ordered; on conflict block and wait for the
    //         lock of the weaker txid, otherwise abort.
    // 3. acquirelock here is the non-blocking 'Error' variant, with a retry
    //    after each conflict. Drawback: retries are expensive. Benefit: no
    //    deadlock and no blocking waits.
    // 4. If concurrency is high, transactions touch many resources and
    //    conflicts are frequent, MVCC could be introduced.
    const int benchResult = mainBenchmark();
    cout << "main() mainBenchmark(); ret=" << benchResult << endl;

    return 0;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment