Created
February 26, 2015 21:43
-
-
Save ergo70/cb9d6675dabbf8b7a01a to your computer and use it in GitHub Desktop.
worker_ltt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* ------------------------------------------------------------------------- | |
* | |
* worker_spi.c | |
* Sample background worker code that demonstrates various coding | |
* patterns: establishing a database connection; starting and committing | |
* transactions; using GUC variables, and heeding SIGHUP to reread | |
* the configuration file; reporting to pg_stat_activity; using the | |
* process latch to sleep and exit in case of postmaster death. | |
* | |
* This code connects to a database, creates a schema and table, and summarizes | |
* the numbers contained therein. To see it working, insert an initial value | |
* with "total" type and some initial value; then insert some other rows with | |
* "delta" type. Delta rows will be deleted by this worker and their values | |
* aggregated into the total. | |
* | |
* Copyright (C) 2013, PostgreSQL Global Development Group | |
* | |
* IDENTIFICATION | |
* contrib/worker_spi/worker_spi.c | |
* | |
* ------------------------------------------------------------------------- | |
*/ | |
#include "postgres.h" | |
/* These are always necessary for a bgworker */ | |
#include "miscadmin.h" | |
#include "postmaster/bgworker.h" | |
#include "storage/ipc.h" | |
#include "storage/latch.h" | |
#include "storage/lwlock.h" | |
#include "storage/proc.h" | |
#include "storage/shmem.h" | |
/* these headers are used by this particular worker's code */ | |
#include "access/xact.h" | |
#include "executor/spi.h" | |
#include "fmgr.h" | |
#include "lib/stringinfo.h" | |
#include "pgstat.h" | |
#include "utils/builtins.h" | |
#include "utils/snapmgr.h" | |
#include "tcop/utility.h" | |
PG_MODULE_MAGIC; | |
PG_FUNCTION_INFO_V1(worker_ltt_launch); | |
void _PG_init(void); | |
void worker_ltt_main(Datum); | |
/* flags set by signal handlers */ | |
static volatile sig_atomic_t got_sighup = false; | |
static volatile sig_atomic_t got_sigterm = false; | |
typedef struct workerfunc | |
{ | |
const char *schema; | |
const char *name; | |
} workerfunc; | |
/* GUC variables */ | |
static int worker_ltt_naptime = 60; | |
static char* worker_ltt_db = NULL; | |
static char* worker_ltt_user = NULL; | |
static char* worker_ltt_schema = NULL; | |
static char* worker_ltt_funcname = NULL; | |
/* | |
* Signal handler for SIGTERM | |
* Set a flag to let the main loop to terminate, and set our latch to wake | |
* it up. | |
*/ | |
static void | |
worker_ltt_sigterm(SIGNAL_ARGS) | |
{ | |
int save_errno = errno; | |
got_sigterm = true; | |
if (MyProc) | |
SetLatch(&MyProc->procLatch); | |
errno = save_errno; | |
} | |
/* | |
* Signal handler for SIGHUP | |
* Set a flag to tell the main loop to reread the config file, and set | |
* our latch to wake it up. | |
*/ | |
static void | |
worker_ltt_sighup(SIGNAL_ARGS) | |
{ | |
int save_errno = errno; | |
got_sighup = true; | |
if (MyProc) | |
SetLatch(&MyProc->procLatch); | |
errno = save_errno; | |
} | |
/* | |
* Initialize workspace for a worker process: create the schema if it doesn't | |
* already exist. | |
*/ | |
static void | |
initialize_worker_ltt(workerfunc *func) | |
{ | |
int ret; | |
int ntup; | |
bool isnull; | |
StringInfoData buf; | |
SetCurrentStatementStartTimestamp(); | |
StartTransactionCommand(); | |
SPI_connect(); | |
PushActiveSnapshot(GetTransactionSnapshot()); | |
pgstat_report_activity(STATE_RUNNING, "initializing ltt_worker"); | |
/* XXX could we use CREATE SCHEMA IF NOT EXISTS? */ | |
initStringInfo(&buf); | |
appendStringInfo(&buf, "SELECT count(*) FROM pg_proc p INNER JOIN pg_namespace n ON (p.pronamespace = n.oid) WHERE n.nspname='%s' AND p.proname='%s'", | |
func->schema, func->name); | |
ret = SPI_execute(buf.data, true, 0); | |
if (ret != SPI_OK_SELECT) | |
elog(FATAL, "SPI_execute failed: error code %d", ret); | |
if (SPI_processed != 1) | |
elog(FATAL, "not a singleton result"); | |
ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0], | |
SPI_tuptable->tupdesc, | |
1, &isnull)); | |
if (isnull || (ntup != 1)) | |
{ | |
elog(FATAL, "Function %s.%s() not found!",func->schema, func->name); | |
} | |
SPI_finish(); | |
PopActiveSnapshot(); | |
CommitTransactionCommand(); | |
pgstat_report_activity(STATE_IDLE, NULL); | |
} | |
void | |
worker_ltt_main(Datum main_arg) | |
{ | |
//int index = DatumGetInt32(main_arg); | |
StringInfoData buf; | |
workerfunc *func; | |
func = palloc(sizeof(workerfunc)); | |
func->schema = pstrdup(worker_ltt_schema); | |
func->name = pstrdup(worker_ltt_funcname); | |
//elog(LOG, "2: %s.%s %s.%s",func->schema, func->name,worker_ltt_schema, worker_ltt_funcname); | |
/* Establish signal handlers before unblocking signals. */ | |
pqsignal(SIGHUP, worker_ltt_sighup); | |
pqsignal(SIGTERM, worker_ltt_sigterm); | |
/* We're now ready to receive signals */ | |
BackgroundWorkerUnblockSignals(); | |
/* Connect to our database */ | |
BackgroundWorkerInitializeConnection(worker_ltt_db, worker_ltt_user); | |
elog(LOG, "%s initialized with %s.%s()", | |
MyBgworkerEntry->bgw_name, worker_ltt_schema, worker_ltt_funcname); | |
initialize_worker_ltt(func); | |
/* | |
* Quote identifiers passed to us. Note that this must be done after | |
* initialize_worker_ltt, because that routine assumes the names are not | |
* quoted. | |
* | |
* Note some memory might be leaked here. | |
*/ | |
func->schema = quote_identifier(func->schema); | |
func->name = quote_identifier(func->name); | |
//elog(LOG, "3: %s.%s %s.%s",func->schema, func->name,worker_ltt_schema, worker_ltt_funcname); | |
initStringInfo(&buf); | |
appendStringInfo(&buf, | |
"SELECT * FROM %s.%s()", | |
func->schema, func->name); | |
/* | |
* Main loop: do this until the SIGTERM handler tells us to terminate | |
*/ | |
while (!got_sigterm) | |
{ | |
int ret; | |
int rc; | |
/* | |
* Background workers mustn't call usleep() or any direct equivalent: | |
* instead, they may wait on their process latch, which sleeps as | |
* necessary, but is awakened if postmaster dies. That way the | |
* background process goes away immediately in an emergency. | |
*/ | |
rc = WaitLatch(&MyProc->procLatch, | |
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, | |
worker_ltt_naptime * 1000L); | |
ResetLatch(&MyProc->procLatch); | |
/* emergency bailout if postmaster has died */ | |
if (rc & WL_POSTMASTER_DEATH) | |
proc_exit(1); | |
/* | |
* In case of a SIGHUP, just reload the configuration. | |
*/ | |
if (got_sighup) | |
{ | |
got_sighup = false; | |
ProcessConfigFile(PGC_SIGHUP); | |
} | |
/* | |
* Start a transaction on which we can run queries. Note that each | |
* StartTransactionCommand() call should be preceded by a | |
* SetCurrentStatementStartTimestamp() call, which sets both the time | |
* for the statement we're about the run, and also the transaction | |
* start time. Also, each other query sent to SPI should probably be | |
* preceded by SetCurrentStatementStartTimestamp(), so that statement | |
* start time is always up to date. | |
* | |
* The SPI_connect() call lets us run queries through the SPI manager, | |
* and the PushActiveSnapshot() call creates an "active" snapshot | |
* which is necessary for queries to have MVCC data to work on. | |
* | |
* The pgstat_report_activity() call makes our activity visible | |
* through the pgstat views. | |
*/ | |
SetCurrentStatementStartTimestamp(); | |
StartTransactionCommand(); | |
SPI_connect(); | |
PushActiveSnapshot(GetTransactionSnapshot()); | |
pgstat_report_activity(STATE_RUNNING, buf.data); | |
/* We can now execute queries via SPI */ | |
ret = SPI_execute(buf.data, true, 1); | |
if (ret != SPI_OK_SELECT) | |
elog(FATAL, "cannot execute %s: error code %d", | |
buf.data, ret); | |
if (SPI_processed > 0) | |
{ | |
bool isnull; | |
bool val; | |
val = DatumGetBool(SPI_getbinval(SPI_tuptable->vals[0], | |
SPI_tuptable->tupdesc, | |
1, &isnull)); | |
if (!isnull) | |
elog(LOG, "%s: %s.%s returned %s", | |
MyBgworkerEntry->bgw_name, | |
func->schema, func->name, val ? "true" : "false"); | |
} | |
/* | |
* And finish our transaction. | |
*/ | |
SPI_finish(); | |
PopActiveSnapshot(); | |
CommitTransactionCommand(); | |
pgstat_report_activity(STATE_IDLE, NULL); | |
} | |
proc_exit(1); | |
} | |
/* | |
* Entrypoint of this module. | |
* | |
* We register more than one worker process here, to demonstrate how that can | |
* be done. | |
*/ | |
void | |
_PG_init(void) | |
{ | |
BackgroundWorker worker; | |
/* get the configuration */ | |
DefineCustomIntVariable("worker_ltt.naptime", | |
"Duration between each check (in seconds).", | |
NULL, | |
&worker_ltt_naptime, | |
60, | |
1, | |
INT_MAX, | |
PGC_SIGHUP, | |
0, | |
NULL, | |
NULL, | |
NULL); | |
DefineCustomStringVariable("worker_ltt.database", | |
"Database.", | |
NULL, | |
&worker_ltt_db, | |
NULL, | |
PGC_SIGHUP, | |
0, | |
NULL, | |
NULL, | |
NULL); | |
DefineCustomStringVariable("worker_ltt.user", | |
"User.", | |
NULL, | |
&worker_ltt_user, | |
NULL, | |
PGC_SIGHUP, | |
0, | |
NULL, | |
NULL, | |
NULL); | |
DefineCustomStringVariable("worker_ltt.schema", | |
"Schema.", | |
NULL, | |
&worker_ltt_schema, | |
NULL, | |
PGC_SIGHUP, | |
0, | |
NULL, | |
NULL, | |
NULL); | |
DefineCustomStringVariable("worker_ltt.function", | |
"Function.", | |
NULL, | |
&worker_ltt_funcname, | |
NULL, | |
PGC_SIGHUP, | |
0, | |
NULL, | |
NULL, | |
NULL); | |
//elog(LOG, "1: %s.%s %s.%s",schema, funcname,worker_ltt_schema, worker_ltt_funcname); | |
if (!process_shared_preload_libraries_in_progress) | |
return; | |
/* set up common data for all our workers */ | |
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | | |
BGWORKER_BACKEND_DATABASE_CONNECTION; | |
worker.bgw_start_time = BgWorkerStart_RecoveryFinished; | |
worker.bgw_restart_time = BGW_NEVER_RESTART; | |
worker.bgw_main = worker_ltt_main; | |
worker.bgw_notify_pid = 0; | |
/* | |
* Now fill in worker-specific data, and do the actual registrations. | |
*/ | |
worker.bgw_main_arg = Int32GetDatum(1); | |
RegisterBackgroundWorker(&worker); | |
} | |
/* | |
* Dynamically launch an SPI worker. | |
*/ | |
Datum | |
worker_ltt_launch(PG_FUNCTION_ARGS) | |
{ | |
BackgroundWorker worker; | |
BackgroundWorkerHandle *handle; | |
BgwHandleStatus status; | |
pid_t pid; | |
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | | |
BGWORKER_BACKEND_DATABASE_CONNECTION; | |
worker.bgw_start_time = BgWorkerStart_RecoveryFinished; | |
worker.bgw_restart_time = BGW_NEVER_RESTART; | |
worker.bgw_main = NULL; /* new worker might not have library loaded */ | |
sprintf(worker.bgw_library_name, "worker_ltt"); | |
sprintf(worker.bgw_function_name, "worker_ltt_main"); | |
snprintf(worker.bgw_name, BGW_MAXLEN, "worker ltt"); | |
worker.bgw_main_arg = Int32GetDatum(1); | |
/* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */ | |
worker.bgw_notify_pid = MyProcPid; | |
if (!RegisterDynamicBackgroundWorker(&worker, &handle)) | |
PG_RETURN_NULL(); | |
status = WaitForBackgroundWorkerStartup(handle, &pid); | |
if (status == BGWH_STOPPED) | |
ereport(ERROR, | |
(errcode(ERRCODE_INSUFFICIENT_RESOURCES), | |
errmsg("could not start background process"), | |
errhint("More details may be available in the server log."))); | |
if (status == BGWH_POSTMASTER_DIED) | |
ereport(ERROR, | |
(errcode(ERRCODE_INSUFFICIENT_RESOURCES), | |
errmsg("cannot start background processes without postmaster"), | |
errhint("Kill all remaining database processes and restart the database."))); | |
Assert(status == BGWH_STARTED); | |
PG_RETURN_INT32(pid); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment