Created
June 9, 2015 08:17
-
-
Save matejk/622aca94540a7c5d3f95 to your computer and use it in GitHub Desktop.
Poco::MongoDB connection pool and multithreading
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <memory> | |
#include <thread> | |
#include <mutex> | |
#include <Poco/UUIDGenerator.h> | |
#include <Poco/MongoDB/PoolableConnectionFactory.h> | |
#include <Poco/MongoDB/Connection.h> | |
#include <Poco/MongoDB/Database.h> | |
#include <Poco/MongoDB/Cursor.h> | |
#include <Poco/MongoDB/Document.h> | |
#include <Poco/MongoDB/Array.h> | |
#include <Poco/MongoDB/Element.h> | |
using namespace Poco; | |
typedef PoolableObjectFactory<MongoDB::Connection, MongoDB::Connection::Ptr> MongoDBConnectionFactory; | |
typedef std::unique_ptr<MongoDBConnectionFactory> MongoDBConnectionFactoryPtr; | |
typedef ObjectPool<MongoDB::Connection, MongoDB::Connection::Ptr> MongoDBConnectionPool; | |
typedef std::unique_ptr<MongoDBConnectionPool> MongoDBConnectionPoolPtr; | |
// global variables | |
MongoDBConnectionFactoryPtr g_connectionFactory; | |
MongoDBConnectionPoolPtr g_connectionPool; | |
MongoDB::Database g_db("TestDB"); | |
MongoDB::PooledConnection takeConnection() | |
{ | |
static std::mutex connectionPoolLock; | |
std::lock_guard<std::mutex> l(connectionPoolLock); | |
MongoDB::PooledConnection pooledConnection(*g_connectionPool); | |
auto c = static_cast<MongoDB::Connection::Ptr>(pooledConnection); | |
if (!c) { | |
// Connection pool can return null if the pool is full | |
// TODO: Gracefully handle this here or implement ObjectPool::borrowObjectWithTimeout | |
} | |
return std::move(pooledConnection); | |
} | |
static Int64 extractInt64(const MongoDB::Document& d, const std::string& name) | |
{ | |
Int64 num = -1; | |
if (d.isType<Int32>(name)) { | |
num = d.get<Int32>(name); | |
} | |
else if (d.isType<Int64>(name)) { | |
num = d.get<Int64>(name); | |
} | |
else if (d.isType<double>(name)) { | |
num = static_cast<double>(d.get<double>(name)); | |
} | |
else { | |
throw Exception(name + " is not a number."); | |
} | |
return num; | |
} | |
static void verifyResponse(const MongoDB::Document& response, bool expectOK = true) | |
{ | |
// TODO: Remove when updated MongoDB::Document header is used. | |
auto& r = const_cast<MongoDB::Document&>(response); | |
/* | |
* http://docs.mongodb.org/manual/reference/command/insert/#insert-command-output | |
* http://docs.mongodb.org/manual/reference/command/update/#update-command-output | |
* http://docs.mongodb.org/manual/reference/command/delete/ | |
* http://docs.mongodb.org/manual/reference/command/findAndModify/ | |
*/ | |
std::ostringstream ostr; | |
try { | |
if (r.exists("ok")) { | |
const auto ok = extractInt64(r, "ok"); | |
if (ok != 1) { | |
ostr << "Command failed: ok = " << ok << ". "; | |
} | |
} | |
else if (expectOK) { | |
ostr << "UNEXPECTED: Missing 'ok' in response."; | |
} | |
else { | |
// Document that does not have embedded status response, e.g. from find cursor | |
return; | |
} | |
// Find, aggregate commands | |
Int64 code = -1; | |
if (r.exists("code")) { | |
code = extractInt64(r, "code"); | |
} | |
if (r.exists("errmsg")) { | |
ostr << (code >= 0 ? std::to_string(code) + ": " : "") << r.get<std::string>("errmsg"); | |
} | |
if (r.exists("$err")) { | |
ostr << (code >= 0 ? std::to_string(code) + ": " : "") << r.get<std::string>("$err"); | |
} | |
// insert, update, delete commands | |
if (r.exists("writeErrors")) { | |
const auto ev = r.get<MongoDB::Array::Ptr>("writeErrors"); | |
for (size_t i = 0; i < ev->size(); ++i) { | |
auto err = ev->get<MongoDB::Document::Ptr>(i); | |
const auto index = extractInt64(*err, "index"); | |
const auto code = extractInt64(*err, "code"); | |
ostr << index << ": " << code << ": " << err->get<std::string>("errmsg"); | |
if ( (i+1) < ev->size() ) { | |
ostr << std::endl; | |
} | |
} | |
} | |
if (r.exists("writeConcernError")) { | |
const auto err = r.get<MongoDB::Document::Ptr>("writeConcernError"); | |
if (ostr.tellp() > 0) { | |
ostr << std::endl; | |
} | |
const auto index = extractInt64(*err, "index"); | |
const auto code = extractInt64(*err, "code"); | |
ostr << index << ": " << code << ": " << err->get<std::string>("errmsg"); | |
} | |
} | |
catch (const Exception& e) { | |
std::cout << "Response error exception " << e.displayText() << std::endl; | |
throw; | |
} | |
if (ostr.tellp() > 0) { | |
std::cout << "Error response from server: " << response.toString(2) << std::endl; | |
throw Exception("MongoDB: " + ostr.str()); | |
} | |
} | |
void insert(const std::string id) | |
{ | |
try { | |
auto con = takeConnection(); | |
auto c = static_cast<MongoDB::Connection::Ptr>(con); | |
MongoDB::Document::Ptr document(new MongoDB::Document()); | |
document->add("_id", id); | |
MongoDB::Array::Ptr documents(new MongoDB::Array()); | |
documents->add(std::to_string(0), document); | |
auto insert = g_db.createCommand(); | |
insert->selector() | |
.add("insert", "Object") | |
.add("documents", documents); | |
std::cout << "INSERT : " << id << std::endl; | |
MongoDB::ResponseMessage response; | |
c->sendRequest(*insert, response); | |
auto doc = *(response.documents()[0]); | |
verifyResponse(doc); | |
for (auto i : response.documents()) { | |
std::cout << i->toString(2) << std::endl; | |
} | |
} | |
catch(const Exception& e) { | |
std::cerr << "INSERT " << id << " failed: " << e.displayText() << std::endl; | |
} | |
} | |
void query(const std::string id) | |
{ | |
try { | |
auto con = takeConnection(); | |
auto c = static_cast<MongoDB::Connection::Ptr>(con); | |
auto queryPtr = g_db.createQueryRequest("Object"); | |
queryPtr->selector().add("_id", id); | |
queryPtr->setNumberToReturn(1); | |
std::cout << "QUERY : " << id << std::endl; | |
MongoDB::ResponseMessage response; | |
c->sendRequest(*queryPtr, response); | |
auto doc = *(response.documents()[0]); | |
verifyResponse(doc, false); | |
for (auto i : response.documents()) { | |
std::cout << i->toString(2) << std::endl; | |
} | |
} | |
catch(const Exception& e) { | |
std::cerr << "QUERY " << id << "failed: " << e.displayText() << std::endl; | |
} | |
} | |
void remove(const std::string id) | |
{ | |
try { | |
auto con = takeConnection(); | |
auto c = static_cast<MongoDB::Connection::Ptr>(con); | |
MongoDB::Document::Ptr query(new MongoDB::Document()); | |
query->add("_id", id); | |
MongoDB::Document::Ptr del(new MongoDB::Document()); | |
del->add("q", query).add("limit", 1); | |
MongoDB::Array::Ptr deletes(new MongoDB::Array()); | |
deletes->add(std::to_string(0), del); | |
auto deleteCmd = g_db.createCommand(); | |
deleteCmd->selector() | |
.add("delete", "Object") | |
.add("deletes", deletes); | |
std::cout << "REMOVE : " << id << std::endl; | |
MongoDB::ResponseMessage response; | |
c->sendRequest(*deleteCmd, response); | |
auto doc = *(response.documents()[0]); | |
verifyResponse(doc); | |
for (auto i : response.documents()) { | |
std::cout << i->toString(2) << std::endl; | |
} | |
} | |
catch(const Exception& e) { | |
std::cerr << "REMOVE " << id << " failed: " << e.displayText() << std::endl; | |
} | |
} | |
static const std::size_t poolCapacity = 16; | |
static const std::size_t poolPeakCapacity = 256; | |
int main(int argc, char* argv[]) | |
{ | |
g_connectionFactory.reset(new MongoDBConnectionFactory("swift-test.topit.ng:27017")); | |
g_connectionPool.reset(new MongoDBConnectionPool(*g_connectionFactory, poolCapacity, poolPeakCapacity)); | |
std::vector<std::thread> threads; | |
for (int i(0); i<10; ++i) { | |
// const std::string id = UUIDGenerator().createRandom().toString(); | |
const std::string id = std::to_string(i); | |
insert(id); | |
//std::this_thread::sleep_for(std::chrono::seconds(1)); | |
threads.push_back(std::thread([id]() { | |
query(id); | |
remove(id); | |
})); | |
} | |
std::cout << "waiting for threads..." << std::endl; | |
for (auto& t : threads) { | |
if (t.joinable()) { | |
t.join(); | |
} | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment