Skip to content

Instantly share code, notes, and snippets.

@ergo70
Created February 26, 2015 21:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ergo70/cb9d6675dabbf8b7a01a to your computer and use it in GitHub Desktop.
Save ergo70/cb9d6675dabbf8b7a01a to your computer and use it in GitHub Desktop.
worker_ltt
/* -------------------------------------------------------------------------
*
* worker_spi.c
* Sample background worker code that demonstrates various coding
* patterns: establishing a database connection; starting and committing
* transactions; using GUC variables, and heeding SIGHUP to reread
* the configuration file; reporting to pg_stat_activity; using the
* process latch to sleep and exit in case of postmaster death.
*
* This code connects to a database, creates a schema and table, and summarizes
* the numbers contained therein. To see it working, insert an initial value
* with "total" type and some initial value; then insert some other rows with
* "delta" type. Delta rows will be deleted by this worker and their values
* aggregated into the total.
*
* Copyright (C) 2013, PostgreSQL Global Development Group
*
* IDENTIFICATION
* contrib/worker_spi/worker_spi.c
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
/* These are always necessary for a bgworker */
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/shmem.h"
/* these headers are used by this particular worker's code */
#include "access/xact.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
#include "pgstat.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
#include "tcop/utility.h"
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(worker_ltt_launch);
void _PG_init(void);
void worker_ltt_main(Datum);
/* flags set by signal handlers */
static volatile sig_atomic_t got_sighup = false;
static volatile sig_atomic_t got_sigterm = false;
typedef struct workerfunc
{
const char *schema;
const char *name;
} workerfunc;
/* GUC variables */
static int worker_ltt_naptime = 60;
static char* worker_ltt_db = NULL;
static char* worker_ltt_user = NULL;
static char* worker_ltt_schema = NULL;
static char* worker_ltt_funcname = NULL;
/*
* Signal handler for SIGTERM
* Set a flag to let the main loop to terminate, and set our latch to wake
* it up.
*/
static void
worker_ltt_sigterm(SIGNAL_ARGS)
{
int save_errno = errno;
got_sigterm = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
}
/*
* Signal handler for SIGHUP
* Set a flag to tell the main loop to reread the config file, and set
* our latch to wake it up.
*/
static void
worker_ltt_sighup(SIGNAL_ARGS)
{
int save_errno = errno;
got_sighup = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
}
/*
* Initialize workspace for a worker process: create the schema if it doesn't
* already exist.
*/
static void
initialize_worker_ltt(workerfunc *func)
{
int ret;
int ntup;
bool isnull;
StringInfoData buf;
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
pgstat_report_activity(STATE_RUNNING, "initializing ltt_worker");
/* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
initStringInfo(&buf);
appendStringInfo(&buf, "SELECT count(*) FROM pg_proc p INNER JOIN pg_namespace n ON (p.pronamespace = n.oid) WHERE n.nspname='%s' AND p.proname='%s'",
func->schema, func->name);
ret = SPI_execute(buf.data, true, 0);
if (ret != SPI_OK_SELECT)
elog(FATAL, "SPI_execute failed: error code %d", ret);
if (SPI_processed != 1)
elog(FATAL, "not a singleton result");
ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
if (isnull || (ntup != 1))
{
elog(FATAL, "Function %s.%s() not found!",func->schema, func->name);
}
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
pgstat_report_activity(STATE_IDLE, NULL);
}
void
worker_ltt_main(Datum main_arg)
{
//int index = DatumGetInt32(main_arg);
StringInfoData buf;
workerfunc *func;
func = palloc(sizeof(workerfunc));
func->schema = pstrdup(worker_ltt_schema);
func->name = pstrdup(worker_ltt_funcname);
//elog(LOG, "2: %s.%s %s.%s",func->schema, func->name,worker_ltt_schema, worker_ltt_funcname);
/* Establish signal handlers before unblocking signals. */
pqsignal(SIGHUP, worker_ltt_sighup);
pqsignal(SIGTERM, worker_ltt_sigterm);
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
/* Connect to our database */
BackgroundWorkerInitializeConnection(worker_ltt_db, worker_ltt_user);
elog(LOG, "%s initialized with %s.%s()",
MyBgworkerEntry->bgw_name, worker_ltt_schema, worker_ltt_funcname);
initialize_worker_ltt(func);
/*
* Quote identifiers passed to us. Note that this must be done after
* initialize_worker_ltt, because that routine assumes the names are not
* quoted.
*
* Note some memory might be leaked here.
*/
func->schema = quote_identifier(func->schema);
func->name = quote_identifier(func->name);
//elog(LOG, "3: %s.%s %s.%s",func->schema, func->name,worker_ltt_schema, worker_ltt_funcname);
initStringInfo(&buf);
appendStringInfo(&buf,
"SELECT * FROM %s.%s()",
func->schema, func->name);
/*
* Main loop: do this until the SIGTERM handler tells us to terminate
*/
while (!got_sigterm)
{
int ret;
int rc;
/*
* Background workers mustn't call usleep() or any direct equivalent:
* instead, they may wait on their process latch, which sleeps as
* necessary, but is awakened if postmaster dies. That way the
* background process goes away immediately in an emergency.
*/
rc = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
worker_ltt_naptime * 1000L);
ResetLatch(&MyProc->procLatch);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
/*
* In case of a SIGHUP, just reload the configuration.
*/
if (got_sighup)
{
got_sighup = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
* Start a transaction on which we can run queries. Note that each
* StartTransactionCommand() call should be preceded by a
* SetCurrentStatementStartTimestamp() call, which sets both the time
* for the statement we're about the run, and also the transaction
* start time. Also, each other query sent to SPI should probably be
* preceded by SetCurrentStatementStartTimestamp(), so that statement
* start time is always up to date.
*
* The SPI_connect() call lets us run queries through the SPI manager,
* and the PushActiveSnapshot() call creates an "active" snapshot
* which is necessary for queries to have MVCC data to work on.
*
* The pgstat_report_activity() call makes our activity visible
* through the pgstat views.
*/
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
pgstat_report_activity(STATE_RUNNING, buf.data);
/* We can now execute queries via SPI */
ret = SPI_execute(buf.data, true, 1);
if (ret != SPI_OK_SELECT)
elog(FATAL, "cannot execute %s: error code %d",
buf.data, ret);
if (SPI_processed > 0)
{
bool isnull;
bool val;
val = DatumGetBool(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
if (!isnull)
elog(LOG, "%s: %s.%s returned %s",
MyBgworkerEntry->bgw_name,
func->schema, func->name, val ? "true" : "false");
}
/*
* And finish our transaction.
*/
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
pgstat_report_activity(STATE_IDLE, NULL);
}
proc_exit(1);
}
/*
* Entrypoint of this module.
*
* We register more than one worker process here, to demonstrate how that can
* be done.
*/
void
_PG_init(void)
{
BackgroundWorker worker;
/* get the configuration */
DefineCustomIntVariable("worker_ltt.naptime",
"Duration between each check (in seconds).",
NULL,
&worker_ltt_naptime,
60,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomStringVariable("worker_ltt.database",
"Database.",
NULL,
&worker_ltt_db,
NULL,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomStringVariable("worker_ltt.user",
"User.",
NULL,
&worker_ltt_user,
NULL,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomStringVariable("worker_ltt.schema",
"Schema.",
NULL,
&worker_ltt_schema,
NULL,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomStringVariable("worker_ltt.function",
"Function.",
NULL,
&worker_ltt_funcname,
NULL,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
//elog(LOG, "1: %s.%s %s.%s",schema, funcname,worker_ltt_schema, worker_ltt_funcname);
if (!process_shared_preload_libraries_in_progress)
return;
/* set up common data for all our workers */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_main = worker_ltt_main;
worker.bgw_notify_pid = 0;
/*
* Now fill in worker-specific data, and do the actual registrations.
*/
worker.bgw_main_arg = Int32GetDatum(1);
RegisterBackgroundWorker(&worker);
}
/*
* Dynamically launch an SPI worker.
*/
Datum
worker_ltt_launch(PG_FUNCTION_ARGS)
{
BackgroundWorker worker;
BackgroundWorkerHandle *handle;
BgwHandleStatus status;
pid_t pid;
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_main = NULL; /* new worker might not have library loaded */
sprintf(worker.bgw_library_name, "worker_ltt");
sprintf(worker.bgw_function_name, "worker_ltt_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "worker ltt");
worker.bgw_main_arg = Int32GetDatum(1);
/* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
PG_RETURN_NULL();
status = WaitForBackgroundWorkerStartup(handle, &pid);
if (status == BGWH_STOPPED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("could not start background process"),
errhint("More details may be available in the server log.")));
if (status == BGWH_POSTMASTER_DIED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("cannot start background processes without postmaster"),
errhint("Kill all remaining database processes and restart the database.")));
Assert(status == BGWH_STARTED);
PG_RETURN_INT32(pid);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment