Skip to content

Instantly share code, notes, and snippets.

@arajkumar
Created February 26, 2024 18:05
Show Gist options
  • Save arajkumar/638e78002b65f14b82957c21f0ce8124 to your computer and use it in GitHub Desktop.
Save arajkumar/638e78002b65f14b82957c21f0ce8124 to your computer and use it in GitHub Desktop.
pipeline benchmarking
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#include <libpq-fe.h>
/*
 * Queues a pipeline synchronization point and reads results from the
 * connection until the matching PGRES_PIPELINE_SYNC result arrives.
 *
 * Note that, despite its name, this function does not call
 * PQexitPipelineMode(): the connection is left in pipeline mode so the caller
 * can keep queueing statements.
 *
 * Every result is traced to stderr with its status code and a running count.
 * When a result reports a failure the remaining results of the batch are
 * still read and cleared up to the sync marker, so that the connection is
 * left in a usable state for the next batch.
 *
 * Returns true when the connection is not in pipeline mode (nothing to do) or
 * when every result up to the sync marker had one of the statuses
 * PGRES_SINGLE_TUPLE, PGRES_TUPLES_OK, PGRES_COPY_BOTH or PGRES_COMMAND_OK.
 * Returns false when the sync point could not be queued, when reading from
 * the connection failed, or when any result had another status.
 */
bool exit_pipe(PGconn *connection )
{
    /*
     * Only PQ_PIPELINE_OFF means there is nothing to synchronize; an aborted
     * pipeline still needs a sync point to get back to a working state.
     */
    if (PQpipelineStatus(connection) == PQ_PIPELINE_OFF)
    {
        return true;
    }

    if (!PQpipelineSync(connection))
    {
        return false;
    }

    bool success = true;
    int results = 0;

    for (;;)
    {
        /* a zero return signals trouble with the connection itself */
        if (PQconsumeInput(connection) == 0)
        {
            return false;
        }

        PGresult *res = PQgetResult(connection);

        /* in pipeline mode a NULL only separates two queries' results */
        if (res == NULL)
        {
            continue;
        }

        results++;

        ExecStatusType resultStatus = PQresultStatus(res);

        fprintf(stderr, "resultStatus: %d count %d\n", resultStatus, results);
        PQclear(res);

        if (resultStatus == PGRES_PIPELINE_SYNC)
        {
            break;
        }

        switch (resultStatus)
        {
            case PGRES_SINGLE_TUPLE:
            case PGRES_TUPLES_OK:
            case PGRES_COPY_BOTH:
            case PGRES_COMMAND_OK:
            {
                break;
            }

            default:
            {
                /* remember the failure, keep draining up to the sync marker */
                success = false;
                break;
            }
        }
    }

    return success;
}
/*
 * PostgreSQL ("Grand Unified Configuration") setting: a setting name and the
 * value to give it, both spelled the way they appear in a SET statement.
 *
 * The members are only ever pointed at string literals and read back, so they
 * are declared as pointers to const.
 */
typedef struct GUC
{
    const char *name;           /* setting name, NULL in a list terminator */
    const char *value;          /* value text, including any SQL quoting */
} GUC;
/*
 * Settings shared by every settings list in this file. The macro expands to a
 * comma-separated run of initializers (no trailing comma) and is meant to be
 * used inside the initializer of an array of name/value entries.
 */
#define COMMON_GUC_SETTINGS \
    { .name = "client_encoding", .value = "'UTF-8'" }, \
    { .name = "extra_float_digits", .value = "3" }, \
    { .name = "statement_timeout", .value = "0" }, \
    { .name = "default_transaction_read_only", .value = "off" }
GUC applySettingsSync[] = {
COMMON_GUC_SETTINGS,
{ "synchronous_commit", "on" },
{ "session_replication_role", "'replica'" },
{ NULL, NULL },
};
GUC applySettings[] = {
COMMON_GUC_SETTINGS,
{ "synchronous_commit", "off" },
{ "session_replication_role", "'replica'" },
{ NULL, NULL },
};
int main() {
const char *conninfo = getenv("PGCOPYDB_TARGET_PGURI");
PGconn *conn;
PGresult *res;
/* Connect to the database */
conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK) {
fprintf(stderr, "Connection to database failed: %s", PQerrorMessage(conn));
PQfinish(conn);
exit(1);
}
int status = PQsetnonblocking(conn, 1 /* 1-non blocking, 0-blocking */);
if (status != 0)
{
fprintf(stderr, "PQsetnonblocking failed: %s", PQerrorMessage(conn));
return false;
}
{
const char* paramValues[1] = {"pgcopydb"};
if (!PQsendQueryParams(conn, "select pg_replication_origin_session_setup($1)", 1, NULL, paramValues, NULL, NULL, 0)) {
fprintf(stderr, "replication origin session setup: %s", PQerrorMessage(conn));
}
res = PQgetResult(conn);
PQclear(res);
res = PQgetResult(conn);
PQclear(res);
}
/* Start pipeline mode */
if (!PQenterPipelineMode(conn)) {
fprintf(stderr, "Entering pipeline mode failed: %s", PQerrorMessage(conn));
PQfinish(conn);
exit(1);
}
int i = 0;
int syncBatch = 100;
for (i = 0; i < 50000; i++)
{
// apply applySettingsSync
//
GUC *guc;
bool sync = i % syncBatch == 0;
if (sync) {
guc = applySettingsSync;
} else {
guc = applySettings;
}
for (int j = 0; guc[j].name != NULL; j++)
{
char sql[1024] = { 0 };
snprintf(sql, sizeof(sql), "SET %s TO %s",
guc[j].name, guc[j].value);
/* Begin transaction */
if (!PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 0)) {
fprintf(stderr, "Sending BEGIN failed: %s", PQerrorMessage(conn));
}
}
/* Begin transaction */
if (!PQsendQueryParams(conn, "BEGIN", 0, NULL, NULL, NULL, NULL, 0)) {
fprintf(stderr, "Sending BEGIN failed: %s", PQerrorMessage(conn));
}
/* Insert statement */
const char* paramValues[3] = {"now()", "hello", "123"};
if (!PQsendQueryParams(conn, "INSERT INTO metrics(\"time\", name, value) VALUES ($1, $2, $3)", 3, NULL, paramValues, NULL, NULL, 0)) {
fprintf(stderr, "Sending INSERT failed: %s", PQerrorMessage(conn));
}
if (!PQsendQueryParams(conn, "select pg_replication_origin_xact_setup('0/0', now())", 0, NULL, NULL, NULL, NULL, 0)) {
fprintf(stderr, "Sending COMMIT failed: %s", PQerrorMessage(conn));
}
/* Commit transaction */
if (!PQsendQueryParams(conn, "COMMIT", 0, NULL, NULL, NULL, NULL, 0)) {
fprintf(stderr, "Sending COMMIT failed: %s", PQerrorMessage(conn));
}
// PQflush(conn);
if (sync) {
/* Process pipeline results */
uint32_t now = time(NULL);
if (!exit_pipe(conn)) {
fprintf(stderr, "Exiting pipeline mode failed: %s", PQerrorMessage(conn));
}
fprintf(stderr, "Sync took %d sec\n", time(NULL) - now);
}
}
if (!exit_pipe(conn)) {
fprintf(stderr, "Exiting pipeline mode failed: %s", PQerrorMessage(conn));
}
fprintf(stderr, "Exiting pipeline mode success\n");
return 0;
}
@arajkumar
Copy link
Author

To compile,

gcc pq.c -o pq -I$(pg_config --includedir) -lpq

To run (the connection URI is read from the PGCOPYDB_TARGET_PGURI environment variable):

time ./pq

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment