Skip to content

Instantly share code, notes, and snippets.

@revmischa
Last active February 23, 2023 19:52
Show Gist options
  • Save revmischa/5384678 to your computer and use it in GitHub Desktop.
Save revmischa/5384678 to your computer and use it in GitHub Desktop.
Example PostgreSQL asynchronous connection with a LISTEN handler
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <libpq-fe.h>
// channel to LISTEN on; initListen() quotes this name before putting it in
// the LISTEN command
const char *listenChannel = "foo";
// select()-based loop: finishes the asynchronous connect, then dispatches
// readable-socket events to handlePgRead()
void mainLoop(PGconn *conn);
// PQfinish() the connection and terminate the process with exit status 1
void exitClean(PGconn *conn);
// consume pending input from the server and report command results and
// NOTIFY messages
void handlePgRead(PGconn *conn);
// send the LISTEN command for listenChannel on an established connection
void initListen(PGconn *conn);
/*
 * Entry point: begin a non-blocking connection to the server on
 * "localhost" and hand the connection to mainLoop().
 *
 * The key/value arrays are parallel, NULL-terminated lists as expected by
 * PQconnectStartParams(). A connection that is already CONNECTION_BAD
 * right after the start call is reported and the process exits via
 * exitClean(). When mainLoop() returns, the connection is released and
 * control falls off the end of main().
 */
int main() {
    const char *connInfoKeys[] = { "host", NULL };
    const char *connInfoValues[] = { "localhost", NULL };

    PGconn *conn = PQconnectStartParams(connInfoKeys, connInfoValues, 0);

    switch (PQstatus(conn)) {
    case CONNECTION_BAD:
        fprintf(stderr, "Connection to database failed: %s",
                PQerrorMessage(conn));
        exitClean(conn); /* does not return */
        break;
    case CONNECTION_STARTED:
        printf("Connection started...\n");
        break;
    default:
        /* any other state: nothing to report, just enter the loop */
        break;
    }

    mainLoop(conn);
    PQfinish(conn);
}
/*
 * Fatal-error exit path: hand the connection back to libpq with
 * PQfinish() and terminate the process with exit status 1.
 *
 * conn: the connection to release. This function never returns to the
 *       caller.
 */
void exitClean(PGconn *conn) {
    PQfinish(conn);

    exit(1);
}
/*
 * Event loop for an asynchronous libpq connection.
 *
 * Phase 1 (connecting): wait with select() for the socket to become
 * readable or writable, as requested by the previous PQconnectPoll()
 * result, then call PQconnectPoll() again. Once it reports
 * PGRES_POLLING_OK the LISTEN command is sent via initListen().
 *
 * Phase 2 (connected): wait for the socket to become readable and pass
 * each readiness event to handlePgRead().
 *
 * conn: connection previously started with PQconnectStartParams().
 *
 * Returns when the connection attempt fails or select() fails with an
 * error other than EINTR. Calls exitClean() (process exit) if libpq no
 * longer has a usable socket.
 */
void mainLoop(PGconn *conn) {
    fd_set rfds, wfds;
    struct timeval tv;
    int connected = 0;
    /*
     * Per the libpq non-blocking connection protocol, before the first
     * PQconnectPoll() call we behave as if it had last returned
     * PGRES_POLLING_WRITING, i.e. wait for the socket to become writable.
     */
    PostgresPollingStatusType connStatus = PGRES_POLLING_WRITING;

    for (;;) {
        /* Re-fetch each pass: the descriptor can change while connecting. */
        int sock = PQsocket(conn);
        if (sock < 0) {
            printf("Postgres socket is gone\n");
            exitClean(conn);
        }
        if (sock >= FD_SETSIZE) {
            /* FD_SET() on a descriptor this large is undefined behaviour. */
            fprintf(stderr, "Postgres socket %d does not fit in an fd_set\n",
                    sock);
            exitClean(conn);
        }

        FD_ZERO(&rfds);
        FD_ZERO(&wfds);
        if (connected || connStatus == PGRES_POLLING_READING) {
            FD_SET(sock, &rfds);
        } else {
            FD_SET(sock, &wfds);
        }

        /* select() may modify the timeout, so reset it every iteration. */
        tv.tv_sec = 2;
        tv.tv_usec = 0;

        int retval = select(sock + 1, &rfds, &wfds, NULL, &tv);
        if (retval < 0) {
            if (errno == EINTR) {
                /* interrupted by a signal: not an error, just retry */
                continue;
            }
            perror("select() failed");
            return;
        }
        if (retval == 0) {
            /* timeout: nothing ready, wait again */
            continue;
        }

        if (!connected) {
            /* Socket is ready in the requested direction: advance connect. */
            connStatus = PQconnectPoll(conn);
            switch (connStatus) {
            case PGRES_POLLING_FAILED:
                fprintf(stderr, "Pg connection failed: %s",
                        PQerrorMessage(conn));
                return;
            case PGRES_POLLING_OK:
                connected = 1;
                initListen(conn);
                break;
            default:
                /* READING / WRITING: the next pass waits accordingly */
                break;
            }
            continue;
        }

        if (FD_ISSET(sock, &rfds)) {
            /* ready to read from pg */
            handlePgRead(conn);
        }
    }
}
/*
 * Queue a "LISTEN <channel>" command on the connection without waiting
 * for its result.
 *
 * The channel name (listenChannel) is quoted with PQescapeIdentifier()
 * so it is safe to embed in the SQL text. The query string is built with
 * malloc()/snprintf() rather than the non-standard asprintf().
 *
 * conn: an established connection.
 *
 * Failures (quoting, allocation, or sending) are reported on stderr and
 * the function returns without sending anything further.
 */
void initListen(PGconn *conn) {
    static const char prefix[] = "LISTEN ";

    // quote channel identifier
    char *quotedChannel = PQescapeIdentifier(conn, listenChannel,
                                             strlen(listenChannel));
    if (quotedChannel == NULL) {
        fprintf(stderr, "Failed to quote channel name: %s\n",
                PQerrorMessage(conn));
        return;
    }

    /* sizeof(prefix) already counts the terminating NUL byte */
    size_t queryLen = sizeof(prefix) + strlen(quotedChannel);
    char *query = malloc(queryLen);
    if (query == NULL) {
        fprintf(stderr, "Out of memory building LISTEN query\n");
        PQfreemem(quotedChannel);
        return;
    }
    snprintf(query, queryLen, "%s%s", prefix, quotedChannel);

    int qs = PQsendQuery(conn, query);

    free(query);
    PQfreemem(quotedChannel);

    if (!qs) {
        fprintf(stderr, "Failed to send query %s\n", PQerrorMessage(conn));
    }
}
/*
 * Handle a "socket readable" event for the connection.
 *
 * Steps:
 *   1. PQconsumeInput() pulls whatever the server sent into libpq's
 *      buffers.
 *   2. Every command result that is completely available is fetched,
 *      reported, and released with PQclear(). PQisBusy() is checked first
 *      so that PQgetResult() is never called when it would block waiting
 *      for more data.
 *   3. Every pending NOTIFY message is printed and freed.
 *
 * conn: an established connection.
 *
 * Errors are reported on stderr. A failed result no longer aborts the
 * function: the remaining results and notifications are still processed.
 */
void handlePgRead(PGconn *conn) {
    // read data waiting in buffer
    if (!PQconsumeInput(conn)) {
        fprintf(stderr, "Failed to consume pg input: %s\n",
                PQerrorMessage(conn));
        return;
    }

    // got query results?
    while (!PQisBusy(conn)) {
        PGresult *res = PQgetResult(conn);
        if (res == NULL) {
            /* no command in progress / all results consumed */
            break;
        }

        if (PQresultStatus(res) != PGRES_COMMAND_OK) {
            fprintf(stderr, "Result error: %s", PQerrorMessage(conn));
        } else {
            PQprintOpt opt;
            memset(&opt, '\0', sizeof(opt));
            PQprint(stdout, res, &opt);
            printf("Got result\n");
        }

        /* every PGresult must be released, on success and on error */
        PQclear(res);
    }

    // check for async notifs
    PGnotify *notify;
    while ((notify = PQnotifies(conn)) != NULL) {
        fprintf(stderr,
                "NOTIFY of '%s' received from backend PID %d\n",
                notify->relname, notify->be_pid);
        PQfreemem(notify);
    }
}
@Globik
Copy link

Globik commented Jul 17, 2018

What's this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment