Skip to content

Instantly share code, notes, and snippets.

@keithf4
Created April 8, 2015 21:28
Show Gist options
  • Save keithf4/0047eae0b3a22829d527 to your computer and use it in GitHub Desktop.
Save keithf4/0047eae0b3a22829d527 to your computer and use it in GitHub Desktop.
pg_partman_bgw.c
/*
* pg_partman_bgw.c
*
* A background worker process for the pg_partman extension to allow
* partition maintenance to be scheduled and run within the database
* itself without requiring a third-party scheduler (e.g. cron)
*
*/
#include "postgres.h"
#include <limits.h>
/* These are always necessary for a bgworker */
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/shmem.h"
/* these headers are used by this particular worker's code */
#include "access/xact.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
#include "pgstat.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(pg_partman_bgw_launch);
void _PG_init(void);
void pg_partman_bgw_main(Datum);
/* flags set by signal handlers */
static volatile sig_atomic_t got_sighup = false;
static volatile sig_atomic_t got_sigterm = false;
/* GUC variables */
static int pg_partman_bgw_interval = 3600; // Default hourly
static char *pg_partman_bgw_role = "postgres"; // Default to postgres role
static char *pg_partman_bgw_database = "postgres"; // Default to postgres database
static char *pg_partman_bgw_analyze = "on";
static char *pg_partman_bgw_jobmon = "on";
/*
* Signal handler for SIGTERM
* Set a flag to let the main loop to terminate, and set our latch to wake
* it up.
*/
static void
pg_partman_bgw_sigterm(SIGNAL_ARGS)
{
int save_errno = errno;
got_sigterm = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
}
/*
* Signal handler for SIGHUP
* Set a flag to tell the main loop to reread the config file, and set
* our latch to wake it up.
*/
static void
pg_partman_bgw_sighup(SIGNAL_ARGS)
{
int save_errno = errno;
got_sighup = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
}
void
pg_partman_bgw_main(Datum main_arg)
{
StringInfoData buf;
/* Establish signal handlers before unblocking signals. */
pqsignal(SIGHUP, pg_partman_bgw_sighup);
pqsignal(SIGTERM, pg_partman_bgw_sigterm);
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
/* Connect to our database */
// TODO pg_partman.bgw_database - make this a csv list of databases that pg_partman is running on)
BackgroundWorkerInitializeConnection(pg_partman_bgw_database, pg_partman_bgw_role);
elog(LOG, "%s initialized with role %s on database %s"
, MyBgworkerEntry->bgw_name
, pg_partman_bgw_role
, pg_partman_bgw_database);
initStringInfo(&buf);
/*
* Main loop: do this until the SIGTERM handler tells us to terminate
*/
while (!got_sigterm)
{
int ret;
int rc;
char *partman_schema;
char *analyze;
char *jobmon;
resetStringInfo(&buf);
/*
* Background workers mustn't call usleep() or any direct equivalent:
* instead, they may wait on their process latch, which sleeps as
* necessary, but is awakened if postmaster dies. That way the
* background process goes away immediately in an emergency.
*/
rc = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
pg_partman_bgw_interval * 1000L);
ResetLatch(&MyProc->procLatch);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
/*
* In case of a SIGHUP, just reload the configuration.
*/
if (got_sighup)
{
got_sighup = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
* Start a transaction on which we can run queries. Note that each
* StartTransactionCommand() call should be preceded by a
* SetCurrentStatementStartTimestamp() call, which sets both the time
* for the statement we're about the run, and also the transaction
* start time. Also, each other query sent to SPI should probably be
* preceded by SetCurrentStatementStartTimestamp(), so that statement
* start time is always up to date.
*
* The SPI_connect() call lets us run queries through the SPI manager,
* and the PushActiveSnapshot() call creates an "active" snapshot
* which is necessary for queries to have MVCC data to work on.
*
* The pgstat_report_activity() call makes our activity visible
* through the pgstat views.
*/
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
pgstat_report_appname("pg_partman background worker");
appendStringInfo(&buf, "SELECT n.nspname FROM pg_catalog.pg_extension e JOIN pg_catalog.pg_namespace n ON e.extnamespace = n.oid WHERE extname = 'pg_partman'");
pgstat_report_activity(STATE_RUNNING, buf.data);
ret = SPI_execute(buf.data, true, 1);
if (ret != SPI_OK_SELECT)
elog(FATAL, "Cannot determine which schema pg_partman has been installed to: error code %d", ret);
if (SPI_processed > 0) {
bool isnull;
partman_schema = DatumGetCString(SPI_getbinval(SPI_tuptable->vals[0]
, SPI_tuptable->tupdesc
, 1
, &isnull));
if (isnull)
elog(FATAL, "Query to determine pg_partman schema returned NULL.");
} else {
elog(FATAL, "Query to determine pg_partman schema returned zero rows.");
}
resetStringInfo(&buf);
if (strcmp(pg_partman_bgw_analyze, "on") == 0) {
analyze = "true";
} else {
analyze = "false";
}
if (strcmp(pg_partman_bgw_jobmon, "on") == 0) {
jobmon = "true";
} else {
jobmon = "false";
}
appendStringInfo(&buf, "SELECT %s.run_maintenance(p_analyze := %s, p_jobmon := %s)", partman_schema, analyze, jobmon);
pgstat_report_activity(STATE_RUNNING, buf.data);
ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_SELECT)
elog(FATAL, "Cannot call pg_partman run_maintenance() function: error code %d", ret);
elog(LOG, "%s: %s called by role %s on database %s"
, MyBgworkerEntry->bgw_name
, buf.data
, pg_partman_bgw_role
, pg_partman_bgw_database);
/*
* And finish our transaction.
*/
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
pgstat_report_activity(STATE_IDLE, NULL);
}
proc_exit(1);
}
/*
* Entrypoint of this module.
*
*/
void
_PG_init(void)
{
BackgroundWorker worker;
/* get the configuration */
DefineCustomIntVariable("pg_partman_bgw.interval",
"How often run_maintenance() is called (in seconds).",
NULL,
&pg_partman_bgw_interval,
3600,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomStringVariable("pg_partman_bgw.role",
"Role to be used by BGW. Must have execute permissions on run_maintenance()",
NULL,
&pg_partman_bgw_role,
"postgres",
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomStringVariable("pg_partman_bgw.database",
"Database to run BGW on. Currently only supports a single database in a cluster.",
NULL,
&pg_partman_bgw_database,
"postgres",
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomStringVariable("pg_partman_bgw.analyze",
"Whether to run an analyze on a partition set whenever a new partition is created during run_maintenance(). Set to 'on' to send TRUE (default). Set to 'off' to send FALSE.",
NULL,
&pg_partman_bgw_analyze,
"on",
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomStringVariable("pg_partman_bgw.jobmon",
"Whether to log run_maintenance() calls to pg_jobmon if it is installed. Set to 'on' to send TRUE (default). Set to 'off' to send FALSE.",
NULL,
&pg_partman_bgw_jobmon,
"on",
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
if (!process_shared_preload_libraries_in_progress)
return;
// Start BGW when database starts
sprintf(worker.bgw_name, "pg_partman background worker");
worker.bgw_main_arg = 1; // Not sure if this is needed. Docs say bgw_main must take a value?
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_main = pg_partman_bgw_main;
worker.bgw_notify_pid = 0;
RegisterBackgroundWorker(&worker);
}
/*
* Dynamically launch pg_partman BGW
*/
Datum
pg_partman_bgw_launch(PG_FUNCTION_ARGS)
{
// int32 i = PG_GETARG_INT32(0);
BackgroundWorker worker;
BackgroundWorkerHandle *handle;
BgwHandleStatus status;
pid_t pid;
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_main = NULL;
sprintf(worker.bgw_library_name, "pg_partman_bgw");
sprintf(worker.bgw_function_name, "pg_partman_bgw_main");
sprintf(worker.bgw_name, "pg_partman background worker");
worker.bgw_main_arg = Int32GetDatum(1); // Not sure if this is needed. Any way to not send an arg?
/* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
PG_RETURN_NULL();
status = WaitForBackgroundWorkerStartup(handle, &pid);
if (status == BGWH_STOPPED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("Could not start pg_partman background process"),
errhint("More details may be available in the server log.")));
if (status == BGWH_POSTMASTER_DIED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("Cannot start pg_partman background processes without postmaster"),
errhint("Kill all remaining database processes and restart the database.")));
Assert(status == BGWH_STARTED);
PG_RETURN_INT32(pid);
}
@ringerc
Copy link

ringerc commented Apr 10, 2015

// Not sure if this is needed. Any way to not send an arg?

If unused, you may leave it uninitialized, but it'd be better to just initialize it as you do there. If you want you can make it obvious that it's unused by passing

(Datum)0;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment