Skip to content

Instantly share code, notes, and snippets.

@fernandoc1
Created November 20, 2017 17:30
Show Gist options
  • Save fernandoc1/a098e156a41933c627e101a54e65d220 to your computer and use it in GitHub Desktop.
Save fernandoc1/a098e156a41933c627e101a54e65d220 to your computer and use it in GitHub Desktop.
PostgreSQL notification with Qt5
#include "db_connector.hpp"
#include <QSqlDriver>
DBConnector::DBConnector(QString ip, int port, QString databaseName, QString username, QString password)
: db(QSqlDatabase::addDatabase("QPSQL"))
{
this->db.setHostName(ip);
this->db.setPort(port);
this->db.setDatabaseName(databaseName);
this->db.setUserName(username);
this->db.setPassword(password);
}
void DBConnector::subscribeToNotification(NotificationHandlerPtr handler)
{
QSqlDatabase::database().driver()->subscribeToNotification(handler->getName());
QObject::connect(QSqlDatabase::database().driver(), SIGNAL(notification(const QString&)), handler.get(), SLOT(handleNotification(const QString&)));
}
void DBConnector::subscribeToNotifications(QList<NotificationHandlerPtr> handlerList)
{
for(int i = 0; i < handlerList.size(); i++)
{
QSqlDatabase::database().driver()->subscribeToNotification(handlerList[i]->getName());
QObject::connect(QSqlDatabase::database().driver(), SIGNAL(notification(const QString&)), handlerList[i].get(), SLOT(handleNotification(const QString&)));
qInfo() << "Subscribed to " << handlerList[i]->getName();
}
}
bool DBConnector::connect()
{
return this->db.open();
}
QList<QVariant> DBConnector::getColumnsName(QString tableName)
{
if(!this->db.isOpen())
{
throw std::runtime_error("DBConnector::getColumnsName: database is not open");
}
QList<QVariant> columnNames;
QString command("SELECT column_name FROM information_schema.columns WHERE table_name='");
command.append(tableName);
command.append("'");
QSqlQuery query(command);
while(query.next())
{
columnNames.push_back(query.value(0));
}
return columnNames;
}
QString DBConnector::getSelectCommandForColumns(QString tableName, QList<QVariant> columnNames, QString constraintString)
{
if(!this->db.isOpen())
{
throw std::runtime_error("DBConnector::getSelectCommandForColumns: database is not open");
}
QString command("SELECT");
for(int i = 0; i < columnNames.size(); i++)
{
command.append(" ");
command.append(columnNames[i].toString());
if(i < columnNames.size() - 1)
{
command.append(",");
}
}
command.append(" FROM ");
command.append(tableName);
if(constraintString.size() > 0)
{
command.append(" WHERE ");
command.append(constraintString);
}
return command;
}
QList<QVariantMap> DBConnector::getAllRows(QString tableName)
{
if(!this->db.isOpen())
{
throw std::runtime_error("DBConnector::getAllRows: database is not open");
}
QList<QVariant> columnNames = this->getColumnsName(tableName);
QString command = this->getSelectCommandForColumns(tableName, columnNames);
std::cout << "Command: " << command.toStdString() << std::endl;
QList<QVariantMap> rows;
QSqlQuery query(command);
while(query.next())
{
QVariantMap row;
for(int i = 0; i < columnNames.size(); i++)
{
row[columnNames[i].toString()] = query.value(i);
}
rows.push_back(row);
}
return rows;
}
QJsonDocument DBConnector::getQueryJson(QString queryStr)
{
QString command("SELECT array_to_json(array_agg(row_to_json(query_table))) FROM (");
command.append(queryStr);
command.append(") query_table");
std::cout << "Command sent: " << command.toStdString() << std::endl;
if(!this->db.isOpen())
{
throw std::runtime_error("DBConnector::getQueryJson: database is not open");
}
QSqlQuery query(command);
query.next();
QString result = query.value(0).toString();
if (query.lastError().isValid())
{
qDebug() << "DBConnector::getQueryJson: " << query.lastError();
throw std::runtime_error("DBConnector::getQueryJson: error on query");
}
//std::cout << "Output: " << result.toStdString() << std::endl;
QByteArray jsonData = QByteArray::fromStdString(result.toStdString());
return QJsonDocument::fromJson(jsonData);
}
QJsonArray DBConnector::getQueryJsonArray(QString queryStr)
{
QJsonDocument document = this->getQueryJson(queryStr);
if(!document.isArray())
{
qDebug() << "DBConnector::getQueryJsonArray: WARNING!!!!! -> returned document is not an array";
//throw std::runtime_error("DBConnector::getQueryJsonArray: returned document is not an array");
}
return document.array();
}
uint32_t DBConnector::insertJsonIntoTable(QString tableName, QString tableIdColumn, QJsonObject jsonObj)
{
if(!this->db.isOpen())
{
throw std::runtime_error("DBConnector::insertJsonIntoTable: database is not open");
}
QJsonDocument doc(jsonObj);
QString command("INSERT INTO ");
command.append(tableName);
command.append("(");
QStringList keys = jsonObj.keys();
for(int i = 0; i < keys.size(); i++)
{
if(!jsonObj[keys[i]].isNull())
{
command.append(keys[i]);
if((i+1) != keys.size())
{
command.append(", ");
}
}
}
command.append(") VALUES (");
for(int i = 0; i < keys.size(); i++)
{
if(!jsonObj[keys[i]].isNull())
{
QJsonValue value = jsonObj[keys[i]];
QVariant variant = value.toVariant();
command.append("'");
command.append(variant.toString());
command.append("'");
if((i+1) != keys.size())
{
command.append(", ");
}
}
}
command.append("); ");
command.append("SELECT currval(pg_get_serial_sequence('");
command.append(tableName);
command.append("', '");
command.append(tableIdColumn);
command.append("'))");
std::cout << "Command sent: " << command.toStdString() << std::endl;
QSqlQuery query(command);
query.next();
QString result = query.value(0).toString();
return (uint32_t)result.toInt();
}
uint32_t DBConnector::insertJsonIntoSequentialTable(QString tableName, QString tableIdColumn, QString sequenceIdName, QJsonObject jsonObj)
{
if(!this->db.isOpen())
{
throw std::runtime_error("DBConnector::insertJsonIntoSequentialTable: database is not open");
}
QJsonDocument doc(jsonObj);
QString command("INSERT INTO ");
command.append(tableName);
command.append("(");
QStringList keys = jsonObj.keys();
for(int i = 0; i < keys.size(); i++)
{
command.append(keys[i]);
if((i+1) != keys.size())
{
command.append(", ");
}
}
command.append(") VALUES (");
for(int i = 0; i < keys.size(); i++)
{
if(!jsonObj[keys[i]].isNull())
{
QJsonValue value = jsonObj[keys[i]];
QVariant variant = value.toVariant();
command.append("'");
command.append(variant.toString());
command.append("'");
}
else
{
if(keys[i] == tableIdColumn)
{
command.append("nextval('");
command.append(sequenceIdName);
command.append("')");
}
else
{
command.append("null");
}
}
if((i+1) != keys.size())
{
command.append(", ");
}
}
command.append("); ");
command.append("SELECT currval('");
command.append(sequenceIdName);
command.append("');");
std::cout << "Command sent: " << command.toStdString() << std::endl;
QSqlQuery query(command);
query.next();
QString result = query.value(0).toString();
return (uint32_t)result.toInt();
}
void DBConnector::executeRawQuery(QString command)
{
QSqlQuery query(command);
}
#ifndef __DB_CONNECTOR_HPP__
#define __DB_CONNECTOR_HPP__
#include <iostream>
#include <QCoreApplication>
#include <QSqlDatabase>
#include <QSqlQuery>
#include <QDebug>
#include <QSqlError>
#include <QJsonObject>
#include <QJsonDocument>
#include <QJsonArray>
#include <QList>
#include <memory>
#include "notification_handler.hpp"
class DBConnector;
typedef std::shared_ptr<DBConnector> DBConnectorPtr;
class DBConnector
{
QSqlDatabase db;
public:
DBConnector(QString ip, int port, QString databaseName, QString username, QString password);
void subscribeToNotification(NotificationHandlerPtr handler);
void subscribeToNotifications(QList<NotificationHandlerPtr> handlerList);
bool connect();
QList<QVariant> getColumnsName(QString tableName);
QString getSelectCommandForColumns(QString tableName, QList<QVariant> columnNames, QString constraintString = QString());
QList<QVariantMap> getAllRows(QString tableName);
QJsonDocument getQueryJson(QString queryStr);
QJsonArray getQueryJsonArray(QString queryStr);
uint32_t insertJsonIntoTable(QString tableName, QString tableIdColumn, QJsonObject jsonObj);
uint32_t insertJsonIntoSequentialTable(QString tableName, QString tableIdColumn, QString sequenceIdName, QJsonObject jsonObj);
void executeRawQuery(QString command);
};
#endif
#include "db_connector.hpp"
#include <iostream>
int main(int argc, char** argv)
{
QCoreApplication app(argc, argv);
DBConnector connector("127.0.0.1", 5432, "postgres", "postgres", "12345");
if(!connector.connect())
{
std::cout << "Error connecting to database" << std::endl;
return 1;
}
NotificationHandler notification("watcher");
return app.exec();
}
#include "notification_handler.hpp"
#include <iostream>
#include <QDebug>
NotificationHandler::NotificationHandler(QString notificationName)
: name(notificationName)
{
QSqlDatabase::database().driver()->subscribeToNotification(notificationName);
QObject::connect(QSqlDatabase::database().driver(),
SIGNAL(notification(QString, QSqlDriver::NotificationSource, QVariant)),
this,
SLOT(handleNotification(QString, QSqlDriver::NotificationSource, QVariant)));
}
QString NotificationHandler::getName()
{
return this->name;
}
void NotificationHandler::handleNotification(const QString& data)
{
if(this->name == data)
{
std::cout << "NotificationHandler::handleNotification: " << data.toStdString() << std::endl;
emit notify();
}
}
void NotificationHandler::handleNotification(const QString& name, QSqlDriver::NotificationSource source, const QVariant& payload)
{
qInfo() << "Name: " << name << " Payload:" << payload.toString();
}
QList<NotificationHandlerPtr> NotificationHandler::getNotificationHandlerList(QList<QString> stringList)
{
QList<NotificationHandlerPtr> notificationHandlerList;
for(int i = 0; i < stringList.size(); i++)
{
NotificationHandlerPtr notificationHandler(new NotificationHandler(stringList[i]));
notificationHandlerList.append(notificationHandler);
}
return notificationHandlerList;
}
#ifndef __NOTIFICATION_HANDLER_HPP__
#define __NOTIFICATION_HANDLER_HPP__
#include <QObject>
#include <QString>
#include <QSqlDriver>
#include <QSqlDatabase>
#include <memory>
class NotificationHandler;
typedef std::shared_ptr<NotificationHandler> NotificationHandlerPtr;
class NotificationHandler: public QObject
{
Q_OBJECT
QString name;
public:
NotificationHandler(QString notificationName);
QString getName();
static QList<NotificationHandlerPtr> getNotificationHandlerList(QList<QString> stringList);
public slots:
void handleNotification(const QString& data);
void handleNotification(const QString& name, QSqlDriver::NotificationSource source, const QVariant& payload);
signals:
void notify();
};
#endif
CREATE OR REPLACE FUNCTION notify_trigger() RETURNS trigger AS $$
DECLARE
dataRecord RECORD;
BEGIN
CASE TG_OP
WHEN 'INSERT', 'UPDATE' THEN
dataRecord := NEW;
WHEN 'DELETE' THEN
dataRecord := OLD;
END CASE;
PERFORM pg_notify(CAST('watcher' AS text),
CONCAT('{"TableName":"',
TG_TABLE_NAME,
'", "Operation": "',
TG_OP,
'", "Data": ',
to_json(dataRecord),
'}')
);
RETURN dataRecord;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS tableName_trigger on tableName;
CREATE TRIGGER tableName_trigger
AFTER INSERT OR UPDATE OR DELETE
ON tableName
FOR EACH ROW EXECUTE PROCEDURE notify_trigger();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment