Skip to content

Instantly share code, notes, and snippets.

@sotex
Last active January 11, 2024 09:36
Show Gist options
  • Save sotex/97e171e153c79d09553b7717e3637fd5 to your computer and use it in GitHub Desktop.
Save sotex/97e171e153c79d09553b7717e3637fd5 to your computer and use it in GitHub Desktop.
singleflight的C++实现版本
// solym
// ymwh@foxmail.com
// 2020年3月16日 20点28分
#include <map>
#include <memory>
#include <mutex>
#include <condition_variable>
template<typename _Kty, typename _Ty>
class SingleFlight
{
// 保存实际执行结果
struct _Result
{
bool _done; // 条件量
// 这里也可以使用读写锁来实现,会简单一点
// 第一个操作的线程加写锁,后续的线程加读锁,写完成之后,读锁不再阻塞,即可获取结果
std::mutex _mtx; // 条件变量互斥锁
std::condition_variable _cv; // 条件变量,用于通知
// _result 用于保存唯一那个真的执行处理的结果
// 这里需要考虑 Do 参数 func 函数的实际输入输出参数
// 不一定是返回值
_Ty _result;
};
// 实际执行的结果保存
std::mutex _domtx;
std::map<_Kty, std::shared_ptr<_Result>> _do;
public:
// 执行操作
// key 用于区分请求
// func 实际执行操作的函数
// args 实际执行操作函数的参数
template<class F, typename... Args>
_Ty Do(_Kty key, F&& func, Args&&... args)
{
// 先加写锁,后面可能要修改
std::unique_lock<std::mutex> lock(_domtx);
// 判断是否已经存在执行结果
auto iter = _do.find(key);
// 存在就等待完成
if (iter != _do.end())
{
// 获取实际执行结果结构
std::shared_ptr<_Result> pRes = iter->second;
lock.unlock();
// 等待条件成立(也就是实际执行的那个线程执行完成)
std::unique_lock<std::mutex> lck(pRes->_mtx);
pRes->_cv.wait(lck, [pRes]() -> bool { return pRes->_done; });
// 获取执行结果进行返回
return pRes->_result;
}
// 不存在就创建一个操作结果
std::shared_ptr<_Result> pRes = std::make_shared<_Result>();
pRes->_done = false; // 设置初始条件为 false
_do[key] = pRes;
lock.unlock(); // 解锁,别的线程能够继续
// 执行真正的操作,获取返回结果
pRes->_result = func(args...);
{
std::lock_guard<std::mutex> lck(pRes->_mtx);
pRes->_done = true;
}
// 通知所有线程
pRes->_cv.notify_all();
// 移除(也可以放在一个延迟移除队列,进行异步移除,以便后续的
// 相同key操作也可以直接使用。但在这外面缓存结果会更好)
lock.lock();
_do.erase(key);
return pRes->_result;
}
};
// solym
// ymwh@foxmail.com
// 2020年3月16日 20点28分
#include "singleflight.hpp"
#include <string>
#include <thread>
#include <vector>
#include <chrono>
#include <ctime>
#include <iomanip>
#include <iostream>
#include <sstream>
// 获取时间戳
std::string timestamp()
{
std::time_t now = std::time(nullptr);
std::ostringstream oss;
oss << std::put_time(std::localtime(&now), "%Y-%m-%d %H.%M.%S ");
return oss.str();
}
// 获取当前线程ID
std::string threadid()
{
std::thread::id this_id = std::this_thread::get_id();
std::ostringstream oss;
oss << this_id;
return oss.str();
}
// 输出简单日志
void outlog(const char* msg)
{
printf("%s [%8s] %s\n",timestamp().c_str(),threadid().c_str(), msg);
}
// 一个简单的测试函数(假设是比较耗时的绘图操作)
void draw(int w, int h, std::vector<uint8_t>& out)
{
outlog(" draw begin ");
// 休眠
std::this_thread::sleep_for(std::chrono::seconds(3));
for (size_t j = 0; j < 10; ++j)
{
out[j] = 'a' + j;
}
outlog(" draw ended");
}
int main()
{
// 创建一个 singlefilght 对象
// 这里用于接收返回值的是一个 shared_ptr 避免多次拷贝返回结果
SingleFlight<std::string, std::shared_ptr<std::vector<uint8_t> > > sl;
// 创建十个线程
for (size_t i = 0; i < 10; ++i)
{
std::thread thr([&sl]() {
outlog(" Begin");
// 对 draw 操作进行包装
// 便于输出需要的结果形式
std::shared_ptr<std::vector<uint8_t> > out = sl.Do(
"100", [](int h, int w) -> std::shared_ptr<std::vector<uint8_t> > {
std::vector<uint8_t> out(100,0);
draw(h, w, out);
return std::make_shared<std::vector<uint8_t> >(out);
},
100,
100);
char buffer[256];
sprintf(buffer," %p Out: %s",out->data(),(char*)out->data());
outlog(buffer);
std::cout << std::endl;
outlog(" Ended: ");
});
thr.detach(); // 线程分离执行
}
outlog(" Main Begin ");
// 休眠等待所有线程正常返回
std::this_thread::sleep_for(std::chrono::seconds(5));
outlog(" Main Ended ");
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment