Skip to content

Instantly share code, notes, and snippets.

@niujiabenben
Created August 20, 2019 14:35
Show Gist options
  • Save niujiabenben/3136338fb40ba2ecc1f7ef20c5bbbad5 to your computer and use it in GitHub Desktop.
Save niujiabenben/3136338fb40ba2ecc1f7ef20c5bbbad5 to your computer and use it in GitHub Desktop.
libcurl multi interface example
#include "httpclient.h"
//// libcurl write callback. Appends the received chunk (size * nmemb bytes
//// starting at `contents`) to the std::string that `userp` points to, and
//// returns the number of bytes appended.
static inline size_t WriteCallback(void* contents, size_t size, size_t nmemb,
                                   void* userp) {
  const size_t chunk_bytes = size * nmemb;
  auto* sink = static_cast<std::string*>(userp);
  sink->append(static_cast<const char*>(contents), chunk_bytes);
  return chunk_bytes;
}
//////////////////////////// class HttpClient //////////////////////////////////
//// Initializes libcurl, creates the multi handle (aborting via CHECK if that
//// fails) and starts the worker thread, which executes Run().
HttpClient::HttpClient() {
  curl_global_init(CURL_GLOBAL_ALL);
  handle_ = curl_multi_init();
  CHECK(handle_ != nullptr) << "Failed to create curl_multi handle.";
  thread_ = std::thread(&HttpClient::Run, this);
}
//// Stops the worker thread and then releases every libcurl handle the client
//// still holds: the easy handles of in-flight requests, the pooled easy
//// handles, and finally the multi handle.
HttpClient::~HttpClient() {
  {
    //// Set the stop flag while holding the mutex the worker waits on.
    //// Run() evaluates its wait predicate (which reads stop_) under
    //// mutex_new_; without the lock the flag could flip and the notification
    //// fire between the predicate check and the actual wait, leaving the
    //// worker blocked forever and join() below hanging.
    std::lock_guard<std::mutex> lock(mutex_new_);
    stop_ = true;
  }
  condition_.notify_all();
  if (thread_.joinable()) { thread_.join(); }
  //// For the order in which the various handles must be cleaned up, see:
  //// https://curl.haxx.se/libcurl/c/curl_multi_cleanup.html
  //// (detach each easy handle from the multi handle, clean up the easy
  //// handle, and only then clean up the multi handle).
  for (auto& request : running_requests_) {
    if (request->curl != nullptr) {
      curl_multi_remove_handle(handle_, request->curl);
      curl_easy_cleanup(request->curl);
      request->curl = nullptr;
    }
  }
  //// Pooled handles were already removed from the multi handle when their
  //// transfer finished, so they only need curl_easy_cleanup.
  for (auto& kv : curl_pool_) {
    while (!kv.second.empty()) {
      curl_easy_cleanup(kv.second.top());
      kv.second.pop();
    }
  }
  curl_multi_cleanup(handle_);
}
//// Queues `request` for the worker thread and wakes it up. The queue is
//// modified under mutex_new_; the notification is sent after the lock has
//// been released.
void HttpClient::AddRequest(HttpRequestPtr request) {
  {
    std::lock_guard<std::mutex> guard(mutex_new_);
    new_requests_.push(std::move(request));
  }
  condition_.notify_all();
}
//// Removes and returns the first finished request whose name equals `name`.
//// Returns an empty pointer when no finished request carries that name.
HttpRequestPtr HttpClient::GetRequest(const std::string& name) {
  HttpRequestPtr found;
  std::lock_guard<std::mutex> guard(mutex_ready_);
  for (auto pos = ready_requests_.begin(); pos != ready_requests_.end();
       ++pos) {
    if ((*pos)->name != name) { continue; }
    //// Copy the pointer out before erasing the list node that holds it.
    found = *pos;
    ready_requests_.erase(pos);
    break;
  }
  return found;
}
void HttpClient::Run() {
const int waittime = 1000; // in millisecond
int still_running = 0;
int msgs_left = 0;
while (!stop_) {
//// wait util we have new requests
if (running_requests_.empty()) {
std::unique_lock<std::mutex> lock(mutex_new_);
condition_.wait(lock, [this] {
return stop_ || (!new_requests_.empty());
});
if (stop_) { break; }
}
//// add all of new requests into running_requests
while (true) {
HttpRequestPtr request;
if (true) {
std::lock_guard<std::mutex> lock(mutex_new_);
if (new_requests_.empty()) { break; }
request = std::move(new_requests_.front());
new_requests_.pop();
}
AddRequestToHandle(std::move(request));
}
//// perform & wait & read
curl_multi_perform(handle_, &still_running);
curl_multi_wait(handle_, NULL, 0, waittime, NULL);
CURLMsg* msg = NULL;
do {
msg = curl_multi_info_read(handle_, &msgs_left);
if ((msg != NULL) && (msg->msg == CURLMSG_DONE)) {
RemoveRequestFromHandle(msg->easy_handle);
}
} while (msg != NULL);
}
}
void HttpClient::AddRequestToHandle(HttpRequestPtr request) {
CHECK_LT(request->retries, request->max_retries);
request->http_status = -1;
request->response.clear();
if (request->curl != NULL) {
curl_easy_cleanup(request->curl);
request->curl = NULL;
}
//// 优先从pool中获取
auto iter = curl_pool_.find(request->url);
if (iter != curl_pool_.end()) {
if (!iter->second.empty()) {
request->curl = iter->second.top();
iter->second.pop();
}
}
//// 若pool中没有, 则自己创建一个
if (request->curl == NULL) {
request->curl = curl_easy_init();
CHECK(request->curl != NULL) << "Failed to initialize curl";
}
curl_slist* slist = NULL;
if (request->type == HttpRequest::Type::PROTOBUF) {
slist = curl_slist_append(slist, "application/x-protobuf");
slist = curl_slist_append(slist, "Content-Type:application/x-protobuf");
slist = curl_slist_append(slist, "Accept:application/x-protobuf");
} else if (request->type == HttpRequest::Type::JSON) {
slist = curl_slist_append(slist, "application/json");
slist = curl_slist_append(slist, "Content-Type:application/json");
slist = curl_slist_append(slist, "Accept:application/json");
}
curl_easy_setopt(request->curl, CURLOPT_HTTPHEADER, slist);
curl_easy_setopt(request->curl, CURLOPT_POST, 1L);
curl_easy_setopt(request->curl, CURLOPT_HEADER, 0L);
curl_easy_setopt(request->curl, CURLOPT_URL, request->url.c_str());
curl_easy_setopt(request->curl, CURLOPT_POSTFIELDS, request->data.c_str());
curl_easy_setopt(request->curl, CURLOPT_POSTFIELDSIZE, request->data.size());
curl_easy_setopt(request->curl, CURLOPT_TIMEOUT, request->timeout);
curl_easy_setopt(request->curl, CURLOPT_WRITEFUNCTION, WriteCallback);
curl_easy_setopt(request->curl, CURLOPT_WRITEDATA, &request->response);
curl_multi_add_handle(handle_, request->curl);
running_requests_.push_back(std::move(request));
request->retries += 1;
}
//// Handles a finished transfer. Detaches `curl` from the multi handle, reads
//// its HTTP response code and stores it in the owning request, then:
////   - on a status other than 200: clears the response, destroys the easy
////     handle and either retries (attempts left) or publishes the failed
////     request to ready_requests_;
////   - on status 200: resets the easy handle, returns it to the per-URL pool
////     and publishes the request to ready_requests_.
//// A handle that belongs to no running request is simply destroyed.
void HttpClient::RemoveRequestFromHandle(CURL* curl) {
  curl_multi_remove_handle(handle_, curl);
  //// CURLINFO_RESPONSE_CODE writes through a pointer to long; the address of
  //// a long must be passed, not an integer value.
  long response_code = 0;
  curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
  HttpRequestPtr request;
  for (auto it = running_requests_.begin(); it != running_requests_.end();
       ++it) {
    if ((*it)->curl == curl) {
      request = std::move(*it);
      running_requests_.erase(it);
      break;
    }
  }
  if (!request) {
    //// No running request owns this handle: there is nothing to publish,
    //// so release the handle instead of dereferencing an empty pointer.
    curl_easy_cleanup(curl);
    return;
  }
  const int http_status = static_cast<int>(response_code);
  request->http_status = http_status;
  if (http_status != 200) {
    request->response.clear();
    curl_easy_cleanup(curl);
    request->curl = nullptr;
    if (request->retries < request->max_retries) {
      AddRequestToHandle(std::move(request));
    } else {
      std::lock_guard<std::mutex> lock(mutex_ready_);
      ready_requests_.push_back(std::move(request));
    }
  } else {
    //// For handle reuse, see:
    //// https://stackoverflow.com/questions/14911156/how-to-properly-reuse-a-curl-handle
    curl_easy_reset(curl);
    request->curl = nullptr;
    //// operator[] creates the per-URL stack on first use.
    curl_pool_[request->url].push(curl);
    std::lock_guard<std::mutex> lock(mutex_ready_);
    ready_requests_.push_back(std::move(request));
  }
}
#ifndef UNITTEST_HTTPCLIENT_H_
#define UNITTEST_HTTPCLIENT_H_
#include <curl/curl.h>
#include "common.h"
//// Forward declarations, so the pointer aliases below can be declared before
//// the full type definitions.
struct HttpRequest;
class HttpClient;
//// Shared-ownership pointer aliases for requests and clients.
using HttpRequestPtr = std::shared_ptr<HttpRequest>;
using HttpClientPtr = std::shared_ptr<HttpClient>;
//// One HTTP POST request together with its result.
//// The caller fills in name, url, data, type, timeout and max_retries before
//// handing the request to HttpClient::AddRequest; the remaining fields are
//// maintained by HttpClient.
//// Every field has a defined initial value, so a field the caller forgets to
//// set is never read uninitialized.
struct HttpRequest {
  //// Payload kind; selects the request headers that are sent.
  enum class Type { PROTOBUF, JSON };
  //// Caller-chosen identifier; HttpClient::GetRequest looks requests up by it.
  std::string name;
  //// Target URL.
  std::string url;
  //// Request body, sent as the POST fields.
  std::string data;
  //// Payload kind of `data`. JSON is only a deterministic placeholder;
  //// callers are expected to set this.
  Type type{Type::JSON};
  //// Passed to CURLOPT_TIMEOUT (seconds); 0 means libcurl applies no timeout.
  int timeout{0};
  //// Maximum number of attempts. HttpClient requires retries < max_retries
  //// when it starts an attempt, so 1 allows exactly one attempt.
  int max_retries{1};
  //// Number of attempts started so far.
  int retries{0};
  //// HTTP status of the last attempt; -1 until an attempt has finished.
  int http_status{-1};
  //// Response body; cleared by HttpClient when the status is not 200.
  std::string response;
  //// Easy handle while the request is in flight; managed by HttpClient.
  CURL* curl{nullptr};
  //// True when the last attempt finished with HTTP status 200.
  bool good() const { return http_status == 200; }
};
//// Asynchronous HTTP POST client built on libcurl's multi interface.
//// Requests are queued with AddRequest(), executed by a worker thread that
//// runs Run(), and collected by name with GetRequest() once finished.
class HttpClient {
public:
//// Creates the multi handle and starts the worker thread.
HttpClient();
DISABLE_COPY_ASIGN(HttpClient);
DISABLE_MOVE_ASIGN(HttpClient);
//// Stops the worker thread and releases all libcurl handles.
~HttpClient();
//// Every request needs the following fields set:
//// name, url, data, type, timeout, max_retries
//// and the caller must make sure that name is unique.
void AddRequest(HttpRequestPtr request);
//// Looks up a finished (ready) request by name. Returns a null pointer if
//// no such request is ready.
//// If request->good() is true, request->response holds the result;
//// otherwise request->response is empty.
//// In the returned request the user-set fields are unchanged and
//// request->curl is always null.
HttpRequestPtr GetRequest(const std::string& name);
private:
//// Worker-thread main loop.
void Run();
//// Configures an easy handle for `request` and adds it to the multi handle.
void AddRequestToHandle(HttpRequestPtr request);
//// Processes a finished transfer: retry, or publish to ready_requests_.
void RemoveRequestFromHandle(CURL* curl);
//// Requests queued by AddRequest() and not yet picked up by the worker;
//// accessed under mutex_new_.
std::queue<HttpRequestPtr> new_requests_;
//// Requests currently attached to the multi handle.
std::list<HttpRequestPtr> running_requests_;
//// Finished requests waiting for GetRequest(); accessed under mutex_ready_.
std::list<HttpRequestPtr> ready_requests_;
//// The libcurl multi handle that drives all transfers.
CURLM* handle_{NULL};
//// Reusable easy handles, keyed by request URL.
std::map<std::string, std::stack<CURL*>> curl_pool_;
//// The worker thread; joined in the destructor.
std::thread thread_;
mutable std::mutex mutex_new_;
mutable std::mutex mutex_ready_;
//// Signalled when a request is queued and when the client shuts down.
mutable std::condition_variable condition_;
//// Set by the destructor to ask the worker thread to exit.
std::atomic<bool> stop_{false};
};
#endif // UNITTEST_HTTPCLIENT_H_
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment