Instantly share code, notes, and snippets.

Embed
What would you like to do?
BerkeleyDB Qt wrappers
#include "database.h"
#include <QDir>
#include <cstring>
#include <cmath>
#include <limits>
#include <db.h>
#include "../Common/defines.h"
#include "constants.h"
// Page size (bytes) applied with DB->set_pagesize() to every database opened by Database::open().
#define DEFAULT_PAGESIZE (1024)
// Maximum number of concurrently active transactions, passed to DB_ENV->set_tx_max().
#define DEFAULT_CONCURRENT_TRANSACTIONS (100)
// Transaction timeout passed to DB_ENV->set_timeout(..., DB_SET_TXN_TIMEOUT).
// NOTE(review): BerkeleyDB expects this timeout in microseconds while this value is in
// seconds - confirm the call site converts it.
#define DEFAULT_DEADLOCK_DETECTION_SECONDS (10)
#define MEMORY_ONE_MB (1024*1024)
// Upper bound (DB_ENV->set_memory_max) for memory required for locks, db handles and statistics overhead
#define MAX_MEMORY_GBYTES (0)
#ifdef QT_DEBUG
#define MAX_MEMORY_BYTES (2*MEMORY_ONE_MB)
#else
#define MAX_MEMORY_BYTES (4*MEMORY_ONE_MB)
#endif
// Cache configuration: DEFAULT_CACHE_* go to DB_ENV->set_cachesize() with
// DEFAULT_CACHE_SPLIT_PARTS as its ncache argument; MAX_CACHE_* go to DB_ENV->set_cache_max().
#define DEFAULT_CACHE_SPLIT_PARTS (5)
#define DEFAULT_CACHE_GBYTES (0)
#define MAX_CACHE_GBYTES (0)
#ifdef QT_DEBUG
#define DEFAULT_CACHE_BYTES (10*MEMORY_ONE_MB)
#define MAX_CACHE_BYTES (100*MEMORY_ONE_MB)
#else
#define DEFAULT_CACHE_BYTES (100*MEMORY_ONE_MB)
#define MAX_CACHE_BYTES (200*MEMORY_ONE_MB)
#endif
// How many times tryGetKey() / Iterator::doMoveNext() retry with a grown buffer after
// DB_BUFFER_SMALL. Debug builds allow more attempts because they start from a tiny
// buffer on purpose to exercise the expansion path.
#ifdef QT_DEBUG
#define BUFFER_EXPAND_ATTEMPTS 20
#else
#define BUFFER_EXPAND_ATTEMPTS 5
#endif
namespace Helpers {
/*
 * Resolves the on-disk directory for a BerkeleyDB environment and creates it if needed.
 *
 * dbDirName - directory name relative to XPIKS_USERDATA_PATH.
 * Returns the cleaned absolute path. When XPIKS_USERDATA_PATH is empty, the current
 * working directory is returned and dbDirName is ignored.
 * NOTE(review): in that fallback, the primary and failover environments resolve to the
 * same directory - confirm this is intended.
 */
QString ensureDBDirectoryExists(const QString &dbDirName) {
    QString appDataPath = XPIKS_USERDATA_PATH;
    QString path;

    if (!appDataPath.isEmpty()) {
        path = QDir::cleanPath(appDataPath + QDir::separator() + dbDirName);
        QDir dbDir(path);
        if (!dbDir.exists()) {
            LOG_INFO << "Creating db dir" << path;
            // Report the failure here: otherwise it only surfaces later as an opaque
            // DB_ENV->open() error.
            if (!QDir().mkpath(path)) {
                LOG_WARNING << "Failed to create db dir" << path;
            }
        }
    } else {
        path = QDir::currentPath();
    }

    return path;
}
/*
 * Wraps a not-yet-created DB handle living inside `environment`.
 * `maxBufferSize` is the initial buffer size used when reading values.
 * The finalize coordinator is locked here; close() performs the matching unlock.
 */
Database::Database(__db_env *environment, int maxBufferSize, AsyncCoordinator *finalizeCoordinator):
    m_FinalizeCoordinator(finalizeCoordinator),
    m_Environment(environment),
    m_Database(nullptr),
    m_StartValueBufferSize(maxBufferSize),
    m_IsOpened(false)
{
    Q_ASSERT(environment != nullptr);
    Q_ASSERT(maxBufferSize > 0);

    AsyncCoordinatorLocker registration(m_FinalizeCoordinator);
    Q_UNUSED(registration);
}
// Owners are expected to close() the database first; closing here is only a fallback.
Database::~Database() {
    Q_ASSERT(!m_IsOpened);
    if (!m_IsOpened) {
        return;
    }

    close();
}
bool Database::open(const char *dbName) {
Q_ASSERT(m_Database == nullptr);
LOG_DEBUG << dbName;
bool success = false;
int result = 0;
do {
result = db_create(&m_Database, m_Environment, 0);
if (result != 0) {
LOG_WARNING << "Failed creating a database instance:" << db_strerror(result);
break;
}
result = m_Database->set_pagesize(m_Database, DEFAULT_PAGESIZE);
if (result != 0) {
LOG_WARNING << "Failed to set pagesize:" << db_strerror(result);
break;
}
u_int32_t db_flags;
db_flags = DB_CREATE | // create the database if not exists
DB_AUTO_COMMIT | // Enclose the DB->open() call within a transaction
DB_THREAD; // thread-safety
result = m_Database->open(m_Database, /* Pointer to the database */
NULL, /* Txn pointer */
dbName, /* File name */
NULL, /* Logical db name */
DB_BTREE, /* Database type (using btree) */
db_flags, /* Open flags */
0); /* File mode. Using defaults */
if (result != 0) {
LOG_WARNING << "Failed to open the database" << dbName << db_strerror(result);
break;
}
m_IsOpened = true;
success = true;
} while (false);
return success;
}
void Database::close() {
LOG_DEBUG << "#";
Q_ASSERT(m_Database != nullptr);
// sync and close database
int result = m_Database->close(m_Database, 0);
if (result != 0) {
LOG_WARNING << "Closing database failed:" << db_strerror(result);
} else {
m_IsOpened = false;
m_Database = nullptr;
}
AsyncCoordinatorUnlocker unlocker(m_FinalizeCoordinator);
Q_UNUSED(unlocker);
}
// Flushes cached database pages to disk through DB->sync(); failures are logged only.
void Database::sync() {
    LOG_DEBUG << "#";
    Q_ASSERT(m_Database != nullptr);

    const int status = m_Database->sync(m_Database, 0);
    if (status == 0) {
        return;
    }

    LOG_WARNING << "Database sync failed" << db_strerror(status);
}
void Database::warmupCache(int percent) {
LOG_DEBUG << "#";
Q_ASSERT((0 <= percent) && (percent <= 100));
Q_ASSERT(m_Database != nullptr);
Q_ASSERT(m_Environment != nullptr);
// here and below code from "Warming the memory pool" BerkeleyDB docs
do {
u_int32_t pagesize, gbytes, bytes;
int ret = 0, numcachepages;
/* Find out how many pages can fit at most in the cache */
ret = m_Environment->get_mp_pagesize(m_Environment, &pagesize);
if (ret != 0) {
LOG_WARNING << "Error retrieving the cache pagesize:" << db_strerror(ret);
break;
}
ret = m_Environment->get_cache_max(m_Environment, &gbytes, &bytes);
if (ret != 0) {
LOG_WARNING << "Error retrieving maximum cache size:" << db_strerror(ret);
break;
}
/* Avoid an overflow by first calculating pages per gigabyte. */
numcachepages = gbytes * ((1024 * 1024 * 1024) / pagesize);
numcachepages += bytes / pagesize;
LOG_DEBUG << numcachepages << "can fit";
double realPages = floor(numcachepages * ((double)percent / 100.0));
int result = doWarmupCache((int)realPages);
if (result != 0) {
LOG_WARNING << "Cache warmup failed";
} else {
LOG_INFO << "Cache warmup succeeded";
}
} while (false);
}
bool Database::exists(const QByteArray &key) {
Q_ASSERT(m_Database != nullptr);
int result = checkExists(key);
bool missing = result == DB_NOTFOUND;
return !missing;
}
/*
 * Reads the value stored under `key` into `value`.
 *
 * The read starts with m_StartValueBufferSize bytes. On DB_BUFFER_SMALL it is retried
 * with a roughly doubled buffer, up to BUFFER_EXPAND_ATTEMPTS times. Returns true on
 * success. A missing key returns false without a warning; other failures are logged.
 */
bool Database::tryGetKey(const QByteArray &key, QByteArray &value) {
    Q_ASSERT(m_Database != nullptr);

    int attempts = BUFFER_EXPAND_ATTEMPTS;
    int result = 0;
    int bufferSize = m_StartValueBufferSize;

    while (attempts--) {
        result = doGetKey(key, value, bufferSize);
        if (result != DB_BUFFER_SMALL) {
            break;
        }
        bufferSize = 2*bufferSize + 1;
    }

    // Only real errors are worth a warning: success, "not found" and "empty key" are
    // normal outcomes, and an exhausted buffer expansion shows up here as DB_BUFFER_SMALL.
    if ((result != 0) && (result != DB_NOTFOUND) && (result != DB_KEYEMPTY)) {
        LOG_WARNING << "Error reading" << key << "from database:" << db_strerror(result);
    }

    // do not save this for debug in order to test code above
#ifdef QT_NO_DEBUG
    if (result == 0) {
        // if we have concurrency issue here, then anyway eventually
        // start buffer size will be properly adjusted
        m_StartValueBufferSize = bufferSize;
    }
#endif

    bool success = result == 0;
    return success;
}
bool Database::trySetKey(const QByteArray &key, const QByteArray &value) {
Q_ASSERT(m_Database != nullptr);
int result = doSetKey(key, value);
if (result != 0) {
LOG_WARNING << "Failed to set a key" << key << "with error:" << db_strerror(result);
}
bool success = result == 0;
return success;
}
// Removes the record stored under `key`; returns true on success (failures are logged).
bool Database::deleteRecord(const QByteArray &key) {
    Q_ASSERT(m_Database != nullptr);

    const int status = doDeleteRecord(key);
    if (status == 0) {
        return true;
    }

    LOG_WARNING << "Failed to delete a key" << key << ":" << db_strerror(status);
    return false;
}
std::unique_ptr<Database::Iterator> Database::getIterator() {
LOG_DEBUG << "#";
std::unique_ptr<Database::Iterator> it(new Database::Iterator(m_Database));
it->initialize();
LOG_DEBUG << "Iterator is valid:" << it->isValid();
return it;
}
int Database::checkExists(const QByteArray &key) {
DBT dbtKey;
memset(&dbtKey, 0, sizeof(DBT));
u_int32_t keySize = key.size();
const char *keyData = key.data();
dbtKey.data = (void*)keyData;
dbtKey.size = keySize;
int result = m_Database->exists(m_Database, NULL, &dbtKey, 0);
return result;
}
int Database::doGetKey(const QByteArray &key, QByteArray &value, int bufferSize) {
DBT dbtKey, dbtData;
memset(&dbtKey, 0, sizeof(DBT));
memset(&dbtData, 0, sizeof(DBT));
u_int32_t keySize = key.size();
const char *keyData = key.data();
dbtKey.data = (void*)keyData;
dbtKey.size = keySize;
value.fill(0, bufferSize);
char *valueData = value.data();
dbtData.data = valueData;
dbtData.ulen = bufferSize;
dbtData.flags = DB_DBT_USERMEM;
int result = m_Database->get(m_Database, NULL, &dbtKey, &dbtData, 0);
return result;
}
int Database::doSetKey(const QByteArray &key, const QByteArray &value) {
DBT dbtKey, dbtData;
memset(&dbtKey, 0, sizeof(DBT));
memset(&dbtData, 0, sizeof(DBT));
u_int32_t keySize = key.size();
const char *keyData = key.data();
dbtKey.data = (void*)keyData;
dbtKey.size = keySize;
u_int32_t valueSize = value.size() + 1;
const char *valueData = value.data();
dbtData.data = (void*)valueData;
dbtData.size = valueSize;
int result = m_Database->put(m_Database, NULL, &dbtKey, &dbtData, 0);
return result;
}
int Database::doDeleteRecord(const QByteArray &key) {
DBT dbtKey;
memset(&dbtKey, 0, sizeof(DBT));
u_int32_t keySize = key.size();
const char *keyData = key.data();
dbtKey.data = (void*)keyData;
dbtKey.size = keySize;
int result = m_Database->del(m_Database, NULL, &dbtKey, 0);
return result;
}
int Database::doWarmupCache(int pagesCount) {
LOG_INFO << pagesCount;
// here and below code from "Warming the memory pool" BerkeleyDB docs
DB_MPOOLFILE *mpf = 0;
void *page_addrp = 0;
db_pgno_t page_number = 0;
int ret = 0;
int pagecount = 0;
/*
* Get the mpool handle
*/
mpf = m_Database->get_mpf(m_Database);
/* Load pages until there are no more pages in the database,
* or until we've put as many pages into the cache as will fit.
*/
while (ret != DB_PAGE_NOTFOUND && pagecount < pagesCount) {
/*
* Get the page from the cache. This causes DB to retrieve
* the page from disk if it isn't already in the cache.
*/
ret = mpf->get(mpf, &page_number, 0, 0, &page_addrp);
if (ret && ret != DB_PAGE_NOTFOUND) {
LOG_WARNING << "Error retrieving db page:" << page_number << db_strerror(ret);
return ret;
}
/*
* If a page was retrieved, put it back into the cache. This
* releases the page latch so that the page can be evicted
* if DB needs more room in the cache at some later time.
*/
if (ret != DB_PAGE_NOTFOUND) {
ret = mpf->put(mpf, page_addrp, DB_PRIORITY_UNCHANGED, 0);
if (ret) {
LOG_WARNING << "Error putting db page:" << page_number << db_strerror(ret);
return ret;
}
}
++page_number;
++pagecount;
}
LOG_INFO << pagecount << "pages loaded";
return 0;
}
// Starts without an environment (see initialize()). Every statusReported signal of the
// finalize coordinator is routed to onReadyToFinalize(), which calls finalize().
DatabaseManager::DatabaseManager():
    QObject(),
    m_Environment(nullptr)
{
    QObject::connect(&m_FinalizeCoordinator, &Helpers::AsyncCoordinator::statusReported,
                     this, &DatabaseManager::onReadyToFinalize);
}
bool DatabaseManager::initialize() {
const bool withoutRecovery = false;
const bool withRecovery = true;
int result = 0;
do {
result = doInitialize(Constants::DB_DIR, withoutRecovery);
if (result == 0) { break; }
LOG_WARNING << "Retrying environment initialization with recovery";
closeEnvironment();
result = doInitialize(Constants::DB_DIR, withRecovery);
if (result == 0) {
LOG_INFO << "Recovery finished successfully!";
break;
}
LOG_WARNING << "Switching to failover DB environment";
result = doInitialize(Constants::DB_DIR_FAILOVER, withoutRecovery);
if (result == 0) {
LOG_INFO << "Successfully switched to failover environment";
break;
}
LOG_WARNING << "Retrying environment initialization with recovery";
closeEnvironment();
result = doInitialize(Constants::DB_DIR_FAILOVER, withRecovery);
if (result == 0) {
LOG_INFO << "Failover recovery finished successfully!";
break;
}
LOG_WARNING << "Failed to initialize the environment";
}
while (false);
bool success = result == 0;
return success;
}
// Shuts everything down. The databases are closed first, because their handles were
// created inside the environment that is closed afterwards.
void DatabaseManager::finalize() {
    LOG_DEBUG << "#";

    closeAll();
    closeEnvironment();

    LOG_INFO << "Finalize finished";
}
int DatabaseManager::doInitialize(const QString &dbDirName, bool withRecovery) {
Q_ASSERT(m_Environment == nullptr);
LOG_INFO << "with recovery =" << withRecovery;
int result = 0;
do {
result = db_env_create(&m_Environment, 0);
if (result != 0) {
LOG_WARNING << "Failed to create an environment:" << db_strerror(result);
break;
}
LOG_DEBUG << "DB Environment allocated";
u_int32_t additional_flags;
additional_flags = DB_AUTO_COMMIT | /*wrap all operations in transactions*/
DB_TXN_NOSYNC; /*do not flush write-ahead logs*/
result = m_Environment->set_flags(m_Environment, additional_flags, 1);
if (result != 0) {
LOG_WARNING << "Failed to set additional flags:" << db_strerror(result);
break;
}
// deadlock detection
result = m_Environment->set_timeout(m_Environment, DEFAULT_DEADLOCK_DETECTION_SECONDS, DB_SET_TXN_TIMEOUT);
if (result != 0) {
LOG_WARNING << "Failed to set the deadlock detection timeout:" << db_strerror(result);
break;
}
result = m_Environment->set_tx_max(m_Environment, DEFAULT_CONCURRENT_TRANSACTIONS);
if (result != 0) {
LOG_WARNING << "Failed to set concurrent transactions number:" << db_strerror(result);
break;
}
result = m_Environment->set_cache_max(m_Environment, MAX_CACHE_GBYTES, MAX_CACHE_BYTES);
if (result != 0) {
LOG_WARNING << "Failed to set max cache size:" << db_strerror(result);
break;
}
result = m_Environment->set_cachesize(m_Environment, DEFAULT_CACHE_GBYTES, DEFAULT_CACHE_BYTES, DEFAULT_CACHE_SPLIT_PARTS);
if (result != 0) {
LOG_WARNING << "Failed to set cache size:" << db_strerror(result);
break;
}
result = m_Environment->set_memory_max(m_Environment, MAX_MEMORY_GBYTES, MAX_MEMORY_BYTES);
if (result != 0) {
LOG_WARNING << "Failed to set memory max size:" << db_strerror(result);
break;
}
LOG_DEBUG << "DB Environment configured";
QString dbDirPath = ensureDBDirectoryExists(dbDirName);
QByteArray dbDir = dbDirPath.toUtf8();
char *dbDirData = dbDir.data();
u_int32_t env_flags;
env_flags = 0 |
DB_INIT_LOCK | /* Initialize locking */
DB_INIT_LOG | /* Initialize logging */
DB_INIT_MPOOL | /* Initialize the cache */
DB_INIT_TXN | /* Initialize transactions */
DB_CREATE | /* If the environment does not exist, create it. */
//DB_PRIVATE | DO NOT specify if using failcheck or recovery
DB_THREAD | /* Multithreading support */
DB_RECOVER | /* run recovery */
DB_USE_ENVIRON; /* allow any paths for db files */
if (withRecovery) {
env_flags = env_flags | DB_RECOVER_FATAL;
}
result = m_Environment->open(m_Environment, dbDirData, env_flags, 0);
if (result != 0) {
LOG_WARNING << "Failed to open an environment:" << db_strerror(result);
break;
}
LOG_DEBUG << "DB Environment opened";
m_DBDirPath = dbDirPath;
} while (false);
return result;
}
/*
 * Closes the environment handle, if any, and always clears m_Environment.
 * BerkeleyDB forbids using a DB_ENV handle after DB_ENV->close(), whatever that
 * call returned.
 * Returns 0 when there was nothing to close; otherwise the close() status.
 */
int DatabaseManager::closeEnvironment() {
    LOG_DEBUG << "#";

    if (m_Environment == nullptr) {
        LOG_DEBUG << "Environment is not allocated";
        return 0;
    }

    int result = m_Environment->close(m_Environment, 0);
    if (result != 0) {
        LOG_WARNING << "Failed to close an environment:" << db_strerror(result);
    } else {
        LOG_DEBUG << "Environment closed";
    }

    m_Environment = nullptr;
    return result;
}
std::shared_ptr<Database> DatabaseManager::openDatabase(const char *dbName) {
LOG_DEBUG << dbName;
Q_ASSERT(m_Environment != nullptr);
std::shared_ptr<Database> db(new Database(m_Environment, DEFAULT_READ_BUFFER_START_SIZE, &m_FinalizeCoordinator));
bool openSucceded = db->open(dbName);
if (!openSucceded) {
LOG_WARNING << "Failed to open" << dbName;
db->close();
db.reset();
} else {
LOG_INFO << "Opened" << dbName << "database";
m_DatabaseArray.push_back(db);
}
return db;
}
// Requests a transaction checkpoint (DB_ENV->txn_checkpoint, kbyte = min = flags = 0).
void DatabaseManager::createCheckpoint() {
    LOG_DEBUG << "#";
    Q_ASSERT(m_Environment != nullptr);

    const int status = m_Environment->txn_checkpoint(m_Environment, 0, 0, 0);
    if (status == 0) {
        LOG_INFO << "Checkpoint created";
    } else {
        LOG_WARNING << "Failed to checkpoint environment:" << db_strerror(status);
    }
}
// Notifies the finalize coordinator via allBegun(); presumably this means no further
// databases will register with it - confirm against AsyncCoordinator.
void DatabaseManager::prepareToFinalize() {
    LOG_DEBUG << "#";
    m_FinalizeCoordinator.allBegun();
}
// Slot for AsyncCoordinator::statusReported: logs the reported status, then finalizes.
void DatabaseManager::onReadyToFinalize(int status) {
    LOG_INFO << status;

    finalize();
}
/*
 * Closes every database opened through openDatabase() that is still open.
 * Databases that their owners already closed are skipped, so a second close on an
 * already-released handle is never attempted.
 */
void DatabaseManager::closeAll() {
    LOG_DEBUG << "#";
    // Open databases require a live environment. With no databases (for example after a
    // failed initialize()), a missing environment is fine.
    Q_ASSERT(m_DatabaseArray.empty() || (m_Environment != nullptr));

    for (auto &db: m_DatabaseArray) {
        if (db->isOpened()) {
            db->close();
        }
    }

    LOG_INFO << "Databases closed";
}
/*
 * Prepares an iterator over `database`; the cursor itself is created by initialize().
 * The key/value start buffer sizes are initialized explicitly: doMoveNext() reads them
 * before anything else writes them, so leaving them uninitialized was undefined behavior.
 */
Database::Iterator::Iterator(__db *database):
    m_Database(database),
    m_Cursor(nullptr),
    m_CurrentKey(new DBT()),
    m_CurrentValue(new DBT()),
    m_KeyStartBufferSize(DEFAULT_READ_BUFFER_START_SIZE),
    m_ValueStartBufferSize(DEFAULT_READ_BUFFER_START_SIZE),
    m_IsValid(false),
    m_IsInitialized(false)
{
    Q_ASSERT(database != nullptr);
}
// Releases the cursor (if one was created) and the owned DBT descriptors.
Database::Iterator::~Iterator() {
    if (m_Cursor != nullptr) {
        m_Cursor->close(m_Cursor);
    }

    // deleting a null pointer is a no-op, so no checks are needed
    delete m_CurrentValue;
    delete m_CurrentKey;
}
// Advances to the next key/value pair. Returns false once the iterator is invalid
// (end reached or an error occurred); after that it stays invalid.
bool Database::Iterator::moveNext() {
    Q_ASSERT(m_IsInitialized);

    bool moved = false;
    if (m_IsValid) {
        moved = doMoveNext();
        m_IsValid = moved;
    }

    return moved;
}
void Database::Iterator::initialize() {
Q_ASSERT(!m_IsInitialized);
u_int32_t flags = 0;
// optimize for bulk operations to continue on the
// same database page as the previous operation
flags |= DB_CURSOR_BULK;
int result = m_Database->cursor(m_Database, NULL, &m_Cursor, flags);
if (result != 0) {
LOG_WARNING << "Failed to checkpoint environment:" << db_strerror(result);
m_IsValid = false;
} else {
m_IsValid = true;
}
m_IsInitialized = true;
}
/*
 * Moves the cursor to the next pair, growing the key/value buffers (about 2x per step)
 * while BerkeleyDB reports DB_BUFFER_SMALL, up to BUFFER_EXPAND_ATTEMPTS times.
 * Returns true when a pair was read. Reaching the end is logged at info level; real
 * errors, including running out of expansion attempts, are logged as warnings.
 */
bool Database::Iterator::doMoveNext() {
    int attempts = BUFFER_EXPAND_ATTEMPTS;
    int result = 0;
    int keyBufferSize = m_KeyStartBufferSize;
    int valueBufferSize = m_ValueStartBufferSize;

    while (attempts--) {
        result = moveCursor(keyBufferSize, valueBufferSize);
        if (result != DB_BUFFER_SMALL) {
            break;
        }
        keyBufferSize = 2*keyBufferSize + 1;
        valueBufferSize = 2*valueBufferSize + 1;
    }

    if (result == DB_NOTFOUND) {
        LOG_INFO << "Reached the end of the database";
    } else if (result != 0) {
        LOG_WARNING << "Error moving cursor through database:" << db_strerror(result);
    }

    // do not save this for debug in order to test code above
#ifdef QT_NO_DEBUG
    if (result == 0) {
        // if we have concurrency issue here, then anyway eventually
        // start buffer sizes will be properly adjusted
        m_KeyStartBufferSize = keyBufferSize;
        m_ValueStartBufferSize = valueBufferSize;
    }
#endif

    bool success = result == 0;
    return success;
}
int Database::Iterator::moveCursor(int keyBufferSize, int valueBufferSize) {
memset(m_CurrentKey, 0, sizeof(DBT));
memset(m_CurrentValue, 0, sizeof(DBT));
m_KeyBuffer.fill(0, keyBufferSize);
u_int32_t keySize = m_KeyBuffer.size();
char *keyData = m_KeyBuffer.data();
m_CurrentKey->data = (void*)keyData;
m_CurrentKey->size = keySize;
m_CurrentKey->flags = DB_DBT_USERMEM;
// ----
m_ValueBuffer.fill(0, valueBufferSize);
u_int32_t valueSize = m_ValueBuffer.size();
char *valueData = m_ValueBuffer.data();
m_CurrentValue->data = (void*)valueData;
m_CurrentValue->size = valueSize;
m_CurrentValue->flags = DB_DBT_USERMEM;
u_int32_t flags = 0;
// the cursor is moved to the next key/data pair of the database, and that pair is returned.
// In the presence of duplicate key values, the value of the key may not change.
flags |= DB_NEXT;
int result = m_Cursor->get(m_Cursor, m_CurrentKey, m_CurrentValue, flags);
return result;
}
}
#ifndef DATABASE_H
#define DATABASE_H
#include <QByteArray>
#include <QString>
#include <vector>
#include <memory>
#include "asynccoordinator.h"
struct __db;
struct __db_env;
struct __dbc;
struct __db_dbt;
#ifdef QT_DEBUG
// test if expanding buffer code works well in tryGetKey()
#define DEFAULT_READ_BUFFER_START_SIZE (10)
#else
#define DEFAULT_READ_BUFFER_START_SIZE (2*1024)
#endif
namespace Helpers {
class Database {
public:
Database(__db_env *environment, int maxBufferSize=DEFAULT_READ_BUFFER_START_SIZE, AsyncCoordinator *finalizeCoordinator=nullptr);
virtual ~Database();
public:
class Iterator {
friend class Database;
public:
Iterator(__db *database);
virtual ~Iterator();
// disable unwanted behavior
Iterator(const Iterator &) = delete;
Iterator &operator=(const Iterator &) = delete;
public:
bool moveNext();
const QByteArray &getCurrentKey() const { return m_KeyBuffer; }
const QByteArray &getCurrentValue() const { return m_ValueBuffer; }
bool isValid() const { return m_IsValid; }
private:
void initialize();
bool doMoveNext();
int moveCursor(int keyBufferSize, int valueBufferSize);
private:
__db *m_Database;
__dbc *m_Cursor;
__db_dbt *m_CurrentKey;
__db_dbt *m_CurrentValue;
QByteArray m_KeyBuffer;
QByteArray m_ValueBuffer;
int m_KeyStartBufferSize;
int m_ValueStartBufferSize;
volatile bool m_IsValid;
volatile bool m_IsInitialized;
};
public:
bool isOpened() const { return m_IsOpened; }
public:
bool open(const char *dbName);
void close();
void sync();
void warmupCache(int percent = 50);
public:
bool exists(const QByteArray &key);
bool tryGetKey(const QByteArray &key, QByteArray &value);
bool trySetKey(const QByteArray &key, const QByteArray &data);
bool deleteRecord(const QByteArray &key);
std::unique_ptr<Iterator> getIterator();
private:
int checkExists(const QByteArray &key);
int doGetKey(const QByteArray &key, QByteArray &value, int bufferSize);
int doSetKey(const QByteArray &key, const QByteArray &value);
int doDeleteRecord(const QByteArray &key);
int doWarmupCache(int pagesCount);
private:
AsyncCoordinator *m_FinalizeCoordinator;
__db_env *m_Environment;
__db *m_Database;
volatile int m_StartValueBufferSize;
volatile bool m_IsOpened;
};
// Owns the BerkeleyDB environment and every Database opened through openDatabase().
// Each Database gets m_FinalizeCoordinator: it locks it on construction and unlocks it in
// Database::close(). The coordinator's statusReported signal drives onReadyToFinalize(),
// which closes all databases and then the environment.
class DatabaseManager: public QObject {
Q_OBJECT
public:
DatabaseManager();
public:
// Opens the environment, falling back to recovery and then to the failover directory.
// Returns true on success.
bool initialize();
private:
void finalize();
int doInitialize(const QString &dbDirName, bool withRecovery);
int closeEnvironment();
public:
// Opens (creating if missing) a btree database in the environment; empty pointer on failure.
std::shared_ptr<Database> openDatabase(const char *dbName);
// should be run from time to time
void createCheckpoint();
// Calls allBegun() on the finalize coordinator.
void prepareToFinalize();
private slots:
void onReadyToFinalize(int status);
private:
void closeAll();
private:
AsyncCoordinator m_FinalizeCoordinator;
__db_env *m_Environment;
// directory of the successfully opened environment
QString m_DBDirPath;
std::vector<std::shared_ptr<Database> > m_DatabaseArray;
};
}
#endif // DATABASE_H
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment