@ololobus
Last active August 25, 2017 15:08
From 22045780b95db74a99dc6e13e57413401da92c81 Mon Sep 17 00:00:00 2001
From: Alex K <alex.lumir@gmail.com>
Date: Mon, 10 Jul 2017 17:48:26 +0300
Subject: [PATCH 01/13] Dummy COPY FROM BGWorker v0.1
---
src/backend/commands/copy.c | 42 +++++++++++++++++++++++++++++++++++++++
src/backend/postmaster/bgworker.c | 4 ++++
src/include/commands/copy.h | 1 +
3 files changed, 47 insertions(+)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 29c5376b2d..964469d01e 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -38,8 +38,10 @@
#include "optimizer/planner.h"
#include "nodes/makefuncs.h"
#include "parser/parse_relation.h"
+#include "postmaster/bgworker.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
+#include "storage/lwlock.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@@ -2336,6 +2338,20 @@ limit_printout_length(const char *str)
}
/*
+ * Copy FROM Background Worker main loop.
+ */
+void
+CopyFromBgwMainLoop(Datum main_arg)
+{
+ int i = DatumGetInt32(main_arg);
+ int j;
+ j = i + 1;
+
+ elog(LOG, "BGWorker process: Result: %d", j);
+}
+
+
+/*
* Copy FROM file to relation.
*/
uint64
@@ -2367,6 +2383,32 @@ CopyFrom(CopyState cstate)
Size bufferedTuplesSize = 0;
int firstBufferedLineNo = 0;
+ // BG Worker setup start
+ BackgroundWorker worker;
+ BackgroundWorkerHandle *bgwhandle = NULL;
+ BgwHandleStatus bgwstatus;
+ pid_t bgwpid;
+
+ int bgwarg = 3;
+
+ MemSet(&worker, 0, sizeof(BackgroundWorker));
+
+ worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
+ BGWORKER_BACKEND_DATABASE_CONNECTION;
+ worker.bgw_start_time = BgWorkerStart_ConsistentState;
+ worker.bgw_restart_time = BGW_NEVER_RESTART;
+ worker.bgw_notify_pid = MyProcPid;
+ sprintf(worker.bgw_library_name, "postgres");
+ sprintf(worker.bgw_function_name, "CopyFromBgwMainLoop");
+
+ snprintf(worker.bgw_name, BGW_MAXLEN, "copy_from_bgw_pool_worker_%d", 1);
+ worker.bgw_main_arg = Int32GetDatum(bgwarg);
+ RegisterDynamicBackgroundWorker(&worker, &bgwhandle);
+ // BG Worker setup end
+
+ bgwstatus = WaitForBackgroundWorkerStartup(bgwhandle, &bgwpid);
+ elog(LOG, "Main COPY process (pid %d): BGWorker started (pid %d).", MyProcPid, bgwpid);
+
Assert(cstate->rel);
/*
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 28af6f0f07..b41db252ce 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -16,6 +16,7 @@
#include "libpq/pqsignal.h"
#include "access/parallel.h"
+#include "commands/copy.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/atomics.h"
@@ -129,6 +130,9 @@ static const struct
},
{
"ApplyWorkerMain", ApplyWorkerMain
+ },
+ {
+ "CopyFromBgwMainLoop", CopyFromBgwMainLoop
}
};
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index bba64fc3d7..0b690b39ca 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -37,6 +37,7 @@ extern bool NextCopyFromRawFields(CopyState cstate,
char ***fields, int *nfields);
extern void CopyFromErrorCallback(void *arg);
+extern void CopyFromBgwMainLoop(Datum main_arg);
extern uint64 CopyFrom(CopyState cstate);
extern DestReceiver *CreateCopyDestReceiver(void);
--
2.11.0
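Patch 01 stores the result of WaitForBackgroundWorkerStartup() in bgwstatus but never inspects it, so a worker that fails to launch goes unnoticed. A minimal sketch (separate from the patches themselves) of checking the status before trusting bgwpid, using the same handle as above; the error wording is invented for illustration:

    BgwHandleStatus bgwstatus;
    pid_t           bgwpid;

    bgwstatus = WaitForBackgroundWorkerStartup(bgwhandle, &bgwpid);
    if (bgwstatus != BGWH_STARTED)
        ereport(ERROR,
                (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                 errmsg("could not start COPY FROM background worker")));
    elog(LOG, "Main COPY process (pid %d): BGWorker started (pid %d).",
         MyProcPid, bgwpid);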
From 7289acd9a367e134c96ed66b1b3ac0c88e22e3ad Mon Sep 17 00:00:00 2001
From: Alex K <alex.lumir@gmail.com>
Date: Fri, 14 Jul 2017 15:08:31 +0300
Subject: [PATCH 02/13] Do not wait for worker startup, to prevent a segfault during initdb
---
src/backend/commands/copy.c | 53 ++++++++++++++++++++++++---------------------
1 file changed, 28 insertions(+), 25 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 964469d01e..81af534d4d 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2383,31 +2383,34 @@ CopyFrom(CopyState cstate)
Size bufferedTuplesSize = 0;
int firstBufferedLineNo = 0;
- // BG Worker setup start
- BackgroundWorker worker;
- BackgroundWorkerHandle *bgwhandle = NULL;
- BgwHandleStatus bgwstatus;
- pid_t bgwpid;
-
- int bgwarg = 3;
-
- MemSet(&worker, 0, sizeof(BackgroundWorker));
-
- worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
- BGWORKER_BACKEND_DATABASE_CONNECTION;
- worker.bgw_start_time = BgWorkerStart_ConsistentState;
- worker.bgw_restart_time = BGW_NEVER_RESTART;
- worker.bgw_notify_pid = MyProcPid;
- sprintf(worker.bgw_library_name, "postgres");
- sprintf(worker.bgw_function_name, "CopyFromBgwMainLoop");
-
- snprintf(worker.bgw_name, BGW_MAXLEN, "copy_from_bgw_pool_worker_%d", 1);
- worker.bgw_main_arg = Int32GetDatum(bgwarg);
- RegisterDynamicBackgroundWorker(&worker, &bgwhandle);
- // BG Worker setup end
-
- bgwstatus = WaitForBackgroundWorkerStartup(bgwhandle, &bgwpid);
- elog(LOG, "Main COPY process (pid %d): BGWorker started (pid %d).", MyProcPid, bgwpid);
+ if (!IsBootstrapProcessingMode())
+ {
+ // BG Worker setup start
+ BackgroundWorker worker;
+ BackgroundWorkerHandle *bgwhandle = NULL;
+ BgwHandleStatus bgwstatus;
+ pid_t bgwpid;
+
+ int bgwarg = 3;
+
+ MemSet(&worker, 0, sizeof(BackgroundWorker));
+
+ worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
+ BGWORKER_BACKEND_DATABASE_CONNECTION;
+ worker.bgw_start_time = BgWorkerStart_ConsistentState;
+ worker.bgw_restart_time = BGW_NEVER_RESTART;
+ worker.bgw_notify_pid = MyProcPid;
+ sprintf(worker.bgw_library_name, "postgres");
+ sprintf(worker.bgw_function_name, "CopyFromBgwMainLoop");
+
+ snprintf(worker.bgw_name, BGW_MAXLEN, "copy_from_bgw_pool_worker_%d", 1);
+ worker.bgw_main_arg = Int32GetDatum(bgwarg);
+ RegisterDynamicBackgroundWorker(&worker, &bgwhandle);
+ // BG Worker setup end
+
+ // bgwstatus = WaitForBackgroundWorkerStartup(bgwhandle, &bgwpid);
+ elog(LOG, "Main COPY process (pid %d): BGWorker started (pid %d).", MyProcPid, bgwpid);
+ }
Assert(cstate->rel);
--
2.11.0
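Patch 02 comments out the startup wait, but the elog() that follows still prints bgwpid, which is now never assigned; patch 03 restores the wait. A hedged sketch of how the log line could avoid reading the uninitialized variable in the meantime (InvalidPid is PostgreSQL's pid sentinel from miscadmin.h):

    pid_t bgwpid = InvalidPid;  /* never filled in: the startup wait is skipped */

    RegisterDynamicBackgroundWorker(&worker, &bgwhandle);

    /* The worker pid is unknown at this point, so do not report one. */
    elog(LOG, "Main COPY process (pid %d): BGWorker registered.", MyProcPid);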
From d04ec3310a343436ee95ad02ff3b2ede66547193 Mon Sep 17 00:00:00 2001
From: Alex K <alex.lumir@gmail.com>
Date: Fri, 14 Jul 2017 16:57:57 +0300
Subject: [PATCH 03/13] Check IsUnderPostmaster before BGWorker setup
---
src/backend/commands/copy.c | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 81af534d4d..9fb2c0bfee 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -224,6 +224,8 @@ typedef struct CopyStateData
char *raw_buf;
int raw_buf_index; /* next byte to process */
int raw_buf_len; /* total # of bytes stored */
+
+ bool allow_parallel;
} CopyStateData;
/* DestReceiver for COPY (query) TO */
@@ -1409,6 +1411,8 @@ BeginCopy(ParseState *pstate,
/* Allocate workspace and zero all fields */
cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
+ cstate->allow_parallel = IsNormalProcessingMode() && IsUnderPostmaster;
+
/*
* We allocate everything used by a cstate in a new memory context. This
* avoids memory leaks during repeated use of COPY in a query.
@@ -2383,7 +2387,7 @@ CopyFrom(CopyState cstate)
Size bufferedTuplesSize = 0;
int firstBufferedLineNo = 0;
- if (!IsBootstrapProcessingMode())
+ if (cstate->allow_parallel)
{
// BG Worker setup start
BackgroundWorker worker;
@@ -2408,7 +2412,7 @@ CopyFrom(CopyState cstate)
RegisterDynamicBackgroundWorker(&worker, &bgwhandle);
// BG Worker setup end
- // bgwstatus = WaitForBackgroundWorkerStartup(bgwhandle, &bgwpid);
+ bgwstatus = WaitForBackgroundWorkerStartup(bgwhandle, &bgwpid);
elog(LOG, "Main COPY process (pid %d): BGWorker started (pid %d).", MyProcPid, bgwpid);
}
--
2.11.0
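Patch 03 centralizes the gating in BeginCopy(): allow_parallel is true only under a live postmaster in normal processing mode, which rules out the bootstrap and standalone backends (initdb, postgres --single) that crashed patch 01. Restated as a hedged standalone helper (the function name is invented for illustration):

    /*
     * Dynamic background workers require a running postmaster and normal
     * processing mode; bootstrap/standalone backends have neither.
     */
    static bool
    copy_from_allow_parallel(void)
    {
        return IsNormalProcessingMode() && IsUnderPostmaster;
    }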
From cdb3846ecf54c04298af937136625c0e1ceebf4a Mon Sep 17 00:00:00 2001
From: Alex K <alex.lumir@gmail.com>
Date: Fri, 21 Jul 2017 15:25:02 +0300
Subject: [PATCH 04/13] Very, very dummy workers v0.2, but with proper shared memory usage
---
src/backend/commands/copy.c | 1155 ++++++++++++++++++++++++++++--
src/backend/storage/lmgr/lwlock.c | 3 +-
src/backend/storage/lmgr/lwlocknames.txt | 1 +
src/include/commands/copy.h | 2 +
src/include/storage/lwlock.h | 1 +
5 files changed, 1117 insertions(+), 45 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 9fb2c0bfee..5a087f33c7 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -36,18 +36,29 @@
#include "miscadmin.h"
#include "optimizer/clauses.h"
#include "optimizer/planner.h"
+#include "optimizer/cost.h"
#include "nodes/makefuncs.h"
#include "parser/parse_relation.h"
+#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "rewrite/rewriteHandler.h"
+#include "storage/dsm.h"
#include "storage/fd.h"
#include "storage/lwlock.h"
+#include "storage/spin.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "storage/ipc.h"
+#include "storage/procarray.h"
+#include "storage/procsignal.h"
+#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/portal.h"
#include "utils/rel.h"
+#include "utils/resowner.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
@@ -236,6 +247,51 @@ typedef struct
uint64 processed; /* # of tuples processed */
} DR_copy;
+typedef struct CopyFromStateData
+{
+ CopyState cstate;
+ HeapTuple tuple;
+ TupleDesc tupDesc;
+ Datum *values;
+ bool *nulls;
+ ResultRelInfo *resultRelInfo;
+ ResultRelInfo *saved_resultRelInfo;
+ EState *estate;
+ ExprContext *econtext;
+ TupleTableSlot *myslot;
+ MemoryContext oldcontext;
+
+ ErrorContextCallback errcallback;
+ CommandId mycid;
+ int hi_options;
+ BulkInsertState bistate;
+ uint64 processed;
+ bool useHeapMultiInsert;
+ int nBufferedTuples;
+ int prev_leaf_part_index;
+
+#define MAX_BUFFERED_TUPLES 1000
+ HeapTuple *bufferedTuples;
+ Size bufferedTuplesSize;
+ int firstBufferedLineNo;
+} CopyFromStateData;
+
+typedef struct
+{
+ int nworkers;
+ BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER];
+} WorkerState;
+
+typedef struct
+{
+ slock_t mutex;
+ int curr_line;
+ int workers_total;
+ int workers_attached;
+ int workers_ready;
+} ParallelState;
+
+#define PG_COPY_FROM_SHM_MQ_MAGIC 0x79fb2447
/*
* These macros centralize code used to process line_buf and raw_buf buffers.
@@ -302,6 +358,7 @@ if (1) \
goto not_end_of_copy; \
} else ((void) 0)
+
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
@@ -356,6 +413,23 @@ static void CopySendInt16(CopyState cstate, int16 val);
static bool CopyGetInt16(CopyState cstate, int16 *val);
+static ParallelState* shm_mq_setup(int64 queue_size, int32 nworkers,
+ dsm_segment **segp, shm_mq_handle **output,
+ shm_mq_handle **input);
+static void setup_dynamic_shared_memory(int64 queue_size, int nworkers,
+ dsm_segment **segp,
+ ParallelState **hdrp,
+ shm_mq **outp, shm_mq **inp);
+static WorkerState *setup_background_workers(int nworkers,
+ dsm_segment *seg);
+static void cleanup_background_workers(dsm_segment *seg, Datum arg);
+static void wait_for_workers_to_become_ready(WorkerState *wstate,
+ volatile ParallelState *pst);
+static bool check_worker_status(WorkerState *wstate);
+
+static void handle_sigterm(SIGNAL_ARGS);
+
+
/*
* Send copy start/stop messages for frontend copies. These have changed
* in past protocol redesigns.
@@ -989,7 +1063,16 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
NULL, stmt->attlist, stmt->options);
- *processed = CopyFrom(cstate); /* copy from file to database */
+
+ if (cstate->allow_parallel) /* copy from file to database */
+ {
+ *processed = ParallelCopyFrom(cstate);
+ }
+ else
+ {
+ *processed = CopyFrom(cstate);
+ }
+
EndCopyFrom(cstate);
}
else
@@ -2342,20 +2425,6 @@ limit_printout_length(const char *str)
}
/*
- * Copy FROM Background Worker main loop.
- */
-void
-CopyFromBgwMainLoop(Datum main_arg)
-{
- int i = DatumGetInt32(main_arg);
- int j;
- j = i + 1;
-
- elog(LOG, "BGWorker process: Result: %d", j);
-}
-
-
-/*
* Copy FROM file to relation.
*/
uint64
@@ -2387,35 +2456,6 @@ CopyFrom(CopyState cstate)
Size bufferedTuplesSize = 0;
int firstBufferedLineNo = 0;
- if (cstate->allow_parallel)
- {
- // BG Worker setup start
- BackgroundWorker worker;
- BackgroundWorkerHandle *bgwhandle = NULL;
- BgwHandleStatus bgwstatus;
- pid_t bgwpid;
-
- int bgwarg = 3;
-
- MemSet(&worker, 0, sizeof(BackgroundWorker));
-
- worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
- BGWORKER_BACKEND_DATABASE_CONNECTION;
- worker.bgw_start_time = BgWorkerStart_ConsistentState;
- worker.bgw_restart_time = BGW_NEVER_RESTART;
- worker.bgw_notify_pid = MyProcPid;
- sprintf(worker.bgw_library_name, "postgres");
- sprintf(worker.bgw_function_name, "CopyFromBgwMainLoop");
-
- snprintf(worker.bgw_name, BGW_MAXLEN, "copy_from_bgw_pool_worker_%d", 1);
- worker.bgw_main_arg = Int32GetDatum(bgwarg);
- RegisterDynamicBackgroundWorker(&worker, &bgwhandle);
- // BG Worker setup end
-
- bgwstatus = WaitForBackgroundWorkerStartup(bgwhandle, &bgwpid);
- elog(LOG, "Main COPY process (pid %d): BGWorker started (pid %d).", MyProcPid, bgwpid);
- }
-
Assert(cstate->rel);
/*
@@ -4976,3 +5016,1030 @@ CreateCopyDestReceiver(void)
return (DestReceiver *) self;
}
+
+
+
+/*
+ * Copy FROM Background Worker main loop.
+ */
+void
+CopyFromBgwMainLoop(Datum main_arg)
+{
+ volatile ParallelState *hdr;
+ dsm_segment *seg;
+ shm_toc *toc;
+ shm_mq_handle *inqh;
+ shm_mq_handle *outqh;
+ int myworkernumber;
+ PGPROC *registrant;
+ int prev = 0;
+
+ /*
+ * Establish signal handlers.
+ *
+ * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as
+ * it would a normal user backend. To make that happen, we establish a
+ * signal handler that is a stripped-down version of die().
+ */
+ pqsignal(SIGTERM, handle_sigterm);
+ BackgroundWorkerUnblockSignals();
+
+ /*
+ * Connect to the dynamic shared memory segment.
+ *
+ * The backend that registered this worker passed us the ID of a shared
+ * memory segment to which we must attach for further instructions. In
+ * order to attach to dynamic shared memory, we need a resource owner.
+ * Once we've mapped the segment in our address space, attach to the table
+ * of contents so we can locate the various data structures we'll need to
+ * find within the segment.
+ */
+ CurrentResourceOwner = ResourceOwnerCreate(NULL, "copy_from_bgw worker");
+ seg = dsm_attach(DatumGetInt32(main_arg));
+ if (seg == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("unable to map dynamic shared memory segment")));
+ toc = shm_toc_attach(PG_COPY_FROM_SHM_MQ_MAGIC, dsm_segment_address(seg));
+ if (toc == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("bad magic number in dynamic shared memory segment")));
+
+ /*
+ * Acquire a worker number.
+ *
+ * By convention, the process registering this background worker should
+ * have stored the control structure at key 0. We look up that key to
+ * find it. Our worker number gives our identity: there may be just one
+ * worker involved in this parallel operation, or there may be many.
+ */
+ hdr = shm_toc_lookup(toc, 0, false);
+ LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
+ // SpinLockAcquire(&pst->mutex);
+ myworkernumber = ++hdr->workers_attached;
+ // SpinLockRelease(&pst->mutex);
+ LWLockRelease(CopyFromBgwLock);
+ if (myworkernumber > hdr->workers_total)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("too many message queue testing workers already")));
+
+ /*
+ * Attach to the appropriate message queues.
+ */
+ // attach_to_queues(seg, toc, myworkernumber, &inqh, &outqh);
+
+ /*
+ * Indicate that we're fully initialized and ready to begin the main part
+ * of the parallel operation.
+ *
+ * Once we signal that we're ready, the user backend is entitled to assume
+ * that our on_dsm_detach callbacks will fire before we disconnect from
+ * the shared memory segment and exit. Generally, that means we must have
+ * attached to all relevant dynamic shared memory data structures by now.
+ */
+ LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
+ // SpinLockAcquire(&pst->mutex);
+ ++hdr->workers_ready;
+ elog(LOG, "BGWorker #%d started with curr_line=%d", myworkernumber, hdr->curr_line);
+ // SpinLockRelease(&pst->mutex);
+ LWLockRelease(CopyFromBgwLock);
+ registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
+ if (registrant == NULL)
+ {
+ elog(DEBUG1, "registrant backend has exited prematurely");
+ proc_exit(1);
+ }
+ SetLatch(&registrant->procLatch);
+
+ /* Do the work. */
+ // copy_messages(inqh, outqh);
+ for (;;)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
+ // SpinLockAcquire(&hdr->mutex);
+ if (hdr->curr_line != prev)
+ {
+ elog(LOG, "BGWorker #%d dummy processing line #%d", myworkernumber, hdr->curr_line);
+ }
+ prev = hdr->curr_line;
+ // SpinLockRelease(&hdr->mutex);
+ LWLockRelease(CopyFromBgwLock);
+ }
+
+ /*
+ * We're done. Explicitly detach the shared memory segment so that we
+ * don't get a resource leak warning at commit time. This will fire any
+ * on_dsm_detach callbacks we've registered, as well. Once that's done,
+ * we can go ahead and exit.
+ */
+ dsm_detach(seg);
+ // proc_exit(1);
+}
+
+/*
+ * When we receive a SIGTERM, we set InterruptPending and ProcDiePending just
+ * like a normal backend. The next CHECK_FOR_INTERRUPTS() will do the right
+ * thing.
+ */
+static void
+handle_sigterm(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ SetLatch(MyLatch);
+
+ if (!proc_exit_inprogress)
+ {
+ InterruptPending = true;
+ ProcDiePending = true;
+ }
+
+ errno = save_errno;
+}
+
+/*
+ * Set up a dynamic shared memory segment and zero or more background workers
+ * for a parallel COPY FROM run.
+ */
+static ParallelState*
+shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
+ shm_mq_handle **output, shm_mq_handle **input)
+{
+ dsm_segment *seg;
+ ParallelState *hdr;
+ shm_mq *outq = NULL; /* placate compiler */
+ shm_mq *inq = NULL; /* placate compiler */
+ WorkerState *wstate;
+
+ /* Set up a dynamic shared memory segment. */
+ setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
+ *segp = seg;
+
+ /* Register background workers. */
+ wstate = setup_background_workers(nworkers, seg);
+
+ /* Attach the queues. */
+ *output = shm_mq_attach(outq, seg, wstate->handle[0]);
+ *input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
+
+ /* Wait for workers to become ready. */
+ wait_for_workers_to_become_ready(wstate, hdr);
+
+ /*
+ * Once we reach this point, all workers are ready. We no longer need to
+ * kill them if we die; they'll die on their own as the message queues
+ * shut down.
+ */
+ // cancel_on_dsm_detach(seg, cleanup_background_workers,
+ // PointerGetDatum(wstate));
+ // pfree(wstate);
+
+ return hdr;
+}
+
+/*
+ * Set up a dynamic shared memory segment.
+ *
+ * We set up a small control region that contains only a ParallelState,
+ * plus one region per message queue. There are as many message queues as
+ * the number of workers, plus one.
+ */
+static void
+setup_dynamic_shared_memory(int64 queue_size, int nworkers,
+ dsm_segment **segp, ParallelState **hdrp,
+ shm_mq **outp, shm_mq **inp)
+{
+ shm_toc_estimator e;
+ int i;
+ Size segsize;
+ dsm_segment *seg;
+ shm_toc *toc;
+ ParallelState *hdr;
+
+ /* Ensure a valid queue size. */
+ if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("queue size must be at least %zu bytes",
+ shm_mq_minimum_size)));
+ if (queue_size != ((Size) queue_size))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("queue size overflows size_t")));
+
+ /*
+ * Estimate how much shared memory we need.
+ *
+ * Because the TOC machinery may choose to insert padding of oddly-sized
+ * requests, we must estimate each chunk separately.
+ *
+ * We need one key to register the location of the header, and we need
+ * nworkers + 1 keys to track the locations of the message queues.
+ */
+ shm_toc_initialize_estimator(&e);
+ shm_toc_estimate_chunk(&e, sizeof(ParallelState));
+ for (i = 0; i <= nworkers; ++i)
+ shm_toc_estimate_chunk(&e, (Size) queue_size);
+ shm_toc_estimate_keys(&e, 2 + nworkers);
+ segsize = shm_toc_estimate(&e);
+
+ /* Create the shared memory segment and establish a table of contents. */
+ seg = dsm_create(shm_toc_estimate(&e), 0);
+ toc = shm_toc_create(PG_COPY_FROM_SHM_MQ_MAGIC, dsm_segment_address(seg),
+ segsize);
+
+ /* Set up the header region. */
+ hdr = shm_toc_allocate(toc, sizeof(ParallelState));
+ SpinLockInit(&hdr->mutex);
+ hdr->workers_total = nworkers;
+ hdr->workers_attached = 0;
+ hdr->workers_ready = 0;
+ shm_toc_insert(toc, 0, hdr);
+
+ /* Set up one message queue per worker, plus one. */
+ for (i = 0; i <= nworkers; ++i)
+ {
+ shm_mq *mq;
+
+ mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
+ (Size) queue_size);
+ shm_toc_insert(toc, i + 1, mq);
+
+ if (i == 0)
+ {
+ /* We send messages to the first queue. */
+ shm_mq_set_sender(mq, MyProc);
+ *outp = mq;
+ }
+ if (i == nworkers)
+ {
+ /* We receive messages from the last queue. */
+ shm_mq_set_receiver(mq, MyProc);
+ *inp = mq;
+ }
+ }
+
+ /* Return results to caller. */
+ *segp = seg;
+ *hdrp = hdr;
+}
+
+/*
+ * Register background workers.
+ */
+static WorkerState *
+setup_background_workers(int nworkers, dsm_segment *seg)
+{
+ MemoryContext oldcontext;
+ BackgroundWorker worker;
+ WorkerState *wstate;
+ int i;
+
+ /*
+ * We need the worker_state object and the background worker handles to
+ * which it points to be allocated in CurTransactionContext rather than
+ * ExprContext; otherwise, they'll be destroyed before the on_dsm_detach
+ * hooks run.
+ */
+ oldcontext = MemoryContextSwitchTo(CurTransactionContext);
+
+ /* Create worker state object. */
+ wstate = MemoryContextAlloc(TopTransactionContext,
+ offsetof(WorkerState, handle) +
+ sizeof(BackgroundWorkerHandle *) * nworkers);
+ wstate->nworkers = 0;
+
+ /*
+ * Arrange to kill all the workers if we abort before all workers are
+ * finished hooking themselves up to the dynamic shared memory segment.
+ *
+ * If we die after all the workers have finished hooking themselves up to
+ * the dynamic shared memory segment, we'll mark the two queues to which
+ * we're directly connected as detached, and the worker(s) connected to
+ * those queues will exit, marking any other queues to which they are
+ * connected as detached. This will cause any as-yet-unaware workers
+ * connected to those queues to exit in their turn, and so on, until
+ * everybody exits.
+ *
+ * But suppose the workers which are supposed to connect to the queues to
+ * which we're directly attached exit due to some error before they
+ * actually attach the queues. The remaining workers will have no way of
+ * knowing this. From their perspective, they're still waiting for those
+ * workers to start, when in fact they've already died.
+ */
+ on_dsm_detach(seg, cleanup_background_workers,
+ PointerGetDatum(wstate));
+
+ /* Configure a worker. */
+ MemSet(&worker, 0, sizeof(BackgroundWorker));
+
+ worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
+ BGWORKER_BACKEND_DATABASE_CONNECTION;
+ worker.bgw_start_time = BgWorkerStart_ConsistentState;
+ worker.bgw_restart_time = BGW_NEVER_RESTART;
+ worker.bgw_notify_pid = MyProcPid;
+ sprintf(worker.bgw_library_name, "postgres");
+ sprintf(worker.bgw_function_name, "CopyFromBgwMainLoop");
+
+ worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+
+ /* Register the workers. */
+ for (i = 0; i < nworkers; ++i)
+ {
+ snprintf(worker.bgw_name, BGW_MAXLEN, "copy_from_bgw_pool_worker_%d", i);
+ if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ errmsg("could not register background process"),
+ errhint("You may need to increase max_worker_processes.")));
+ ++wstate->nworkers;
+ }
+
+ /* All done. */
+ MemoryContextSwitchTo(oldcontext);
+ return wstate;
+}
+
+static void
+cleanup_background_workers(dsm_segment *seg, Datum arg)
+{
+ WorkerState *wstate = (WorkerState *) DatumGetPointer(arg);
+
+ while (wstate->nworkers > 0)
+ {
+ --wstate->nworkers;
+ TerminateBackgroundWorker(wstate->handle[wstate->nworkers]);
+ }
+}
+
+static void
+wait_for_workers_to_become_ready(WorkerState *wstate,
+ volatile ParallelState *pst)
+{
+ bool result = false;
+
+ for (;;)
+ {
+ int workers_ready;
+
+ /* If all the workers are ready, we have succeeded. */
+ LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
+ // SpinLockAcquire(&pst->mutex);
+ workers_ready = pst->workers_ready;
+ // SpinLockRelease(&pst->mutex);
+ LWLockRelease(CopyFromBgwLock);
+ // LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
+ // workers_ready = pst->workers_ready;
+ // LWLockRelease(CopyFromBgwLock);
+ if (workers_ready >= wstate->nworkers)
+ {
+ result = true;
+ break;
+ }
+
+ /* If any workers (or the postmaster) have died, we have failed. */
+ if (!check_worker_status(wstate))
+ {
+ result = false;
+ break;
+ }
+
+ /* Wait to be signalled. */
+ WaitLatch(MyLatch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION);
+
+ /* Reset the latch so we don't spin. */
+ ResetLatch(MyLatch);
+
+ /* An interrupt may have occurred while we were waiting. */
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ if (!result)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ errmsg("one or more background workers failed to start")));
+}
+
+static bool
+check_worker_status(WorkerState *wstate)
+{
+ int n;
+
+ /* If any workers (or the postmaster) have died, we have failed. */
+ for (n = 0; n < wstate->nworkers; ++n)
+ {
+ BgwHandleStatus status;
+ pid_t pid;
+
+ status = GetBackgroundWorkerPid(wstate->handle[n], &pid);
+ if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED)
+ return false;
+ }
+
+ /* Otherwise, things still look OK. */
+ return true;
+}
+
+
+/*
+ * Parallel Copy FROM file to relation.
+ */
+extern uint64
+ParallelCopyFrom(CopyState cstate)
+{
+ CopyFromState cfstate;
+ ParallelState *pst;
+ dsm_segment *seg;
+ shm_mq_handle *outqh;
+ shm_mq_handle *inqh;
+ shm_mq_result res;
+ int64 queue_size = 64;
+ int32 nworkers = max_parallel_workers_per_gather;
+
+ cfstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
+
+ cfstate->cstate = cstate;
+ cfstate->estate = CreateExecutorState(); /* for ExecConstraints() */
+ cfstate->oldcontext = CurrentMemoryContext;
+ cfstate->mycid = GetCurrentCommandId(true);
+
+ cfstate->saved_resultRelInfo = NULL;
+ cfstate->hi_options = 0; /* start with default heap_insert options */
+ cfstate->processed = 0;
+ cfstate->nBufferedTuples = 0;
+ cfstate->prev_leaf_part_index = -1;
+ cfstate->bufferedTuples = NULL; /* initialize to silence warning */
+ cfstate->bufferedTuplesSize = 0;
+ cfstate->firstBufferedLineNo = 0;
+
+
+ // BG Worker setup start
+ // BackgroundWorker worker;
+ // BackgroundWorkerHandle *bgwhandle = NULL;
+ // BgwHandleStatus bgwstatus;
+ // pid_t bgwpid;
+ // int bgwarg = 5;
+ //
+ // elog(LOG, "Main COPY process (pid %d): starting BGWorker", MyProcPid);
+ // MemSet(&worker, 0, sizeof(BackgroundWorker));
+ //
+ // worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
+ // BGWORKER_BACKEND_DATABASE_CONNECTION;
+ // worker.bgw_start_time = BgWorkerStart_ConsistentState;
+ // worker.bgw_restart_time = BGW_NEVER_RESTART;
+ // worker.bgw_notify_pid = MyProcPid;
+ // sprintf(worker.bgw_library_name, "postgres");
+ // sprintf(worker.bgw_function_name, "CopyFromBgwMainLoop");
+ //
+ // snprintf(worker.bgw_name, BGW_MAXLEN, "copy_from_bgw_pool_worker_%d", 1);
+ // // worker.bgw_main_arg = PointerGetDatum(cfstate);
+ // worker.bgw_main_arg = PointerGetDatum(&bgwarg);
+ // BG Worker setup end
+
+ Assert(cfstate->cstate->rel);
+
+ pst = shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
+
+ /*
+ * The target must be a plain relation or have an INSTEAD OF INSERT row
+ * trigger. (Currently, such triggers are only allowed on views, so we
+ * only hint about them in the view case.)
+ */
+ if (cfstate->cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
+ cfstate->cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
+ !(cfstate->cstate->rel->trigdesc &&
+ cfstate->cstate->rel->trigdesc->trig_insert_instead_row))
+ {
+ if (cfstate->cstate->rel->rd_rel->relkind == RELKIND_VIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to view \"%s\"",
+ RelationGetRelationName(cfstate->cstate->rel)),
+ errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
+ else if (cfstate->cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to materialized view \"%s\"",
+ RelationGetRelationName(cfstate->cstate->rel))));
+ else if (cfstate->cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to foreign table \"%s\"",
+ RelationGetRelationName(cfstate->cstate->rel))));
+ else if (cfstate->cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to sequence \"%s\"",
+ RelationGetRelationName(cfstate->cstate->rel))));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to non-table relation \"%s\"",
+ RelationGetRelationName(cfstate->cstate->rel))));
+ }
+
+ cfstate->tupDesc = RelationGetDescr(cfstate->cstate->rel);
+
+ /*----------
+ * Check to see if we can avoid writing WAL
+ *
+ * If archive logging/streaming is not enabled *and* either
+ * - table was created in same transaction as this COPY
+ * - data is being written to relfilenode created in this transaction
+ * then we can skip writing WAL. It's safe because if the transaction
+ * doesn't commit, we'll discard the table (or the new relfilenode file).
+ * If it does commit, we'll have done the heap_sync at the bottom of this
+ * routine first.
+ *
+ * As mentioned in comments in utils/rel.h, the in-same-transaction test
+ * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
+ * can be cleared before the end of the transaction. The exact case is
+ * when a relation sets a new relfilenode twice in same transaction, yet
+ * the second one fails in an aborted subtransaction, e.g.
+ *
+ * BEGIN;
+ * TRUNCATE t;
+ * SAVEPOINT save;
+ * TRUNCATE t;
+ * ROLLBACK TO save;
+ * COPY ...
+ *
+ * Also, if the target file is new-in-transaction, we assume that checking
+ * FSM for free space is a waste of time, even if we must use WAL because
+ * of archiving. This could possibly be wrong, but it's unlikely.
+ *
+ * The comments for heap_insert and RelationGetBufferForTuple specify that
+ * skipping WAL logging is only safe if we ensure that our tuples do not
+ * go into pages containing tuples from any other transactions --- but this
+ * must be the case if we have a new table or new relfilenode, so we need
+ * no additional work to enforce that.
+ *----------
+ */
+ /* createSubid is creation check, newRelfilenodeSubid is truncation check */
+ if (cfstate->cstate->rel->rd_createSubid != InvalidSubTransactionId ||
+ cfstate->cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
+ {
+ cfstate->hi_options |= HEAP_INSERT_SKIP_FSM;
+ if (!XLogIsNeeded())
+ cfstate->hi_options |= HEAP_INSERT_SKIP_WAL;
+ }
+
+ /*
+ * Optimize if new relfilenode was created in this subxact or one of its
+ * committed children and we won't see those rows later as part of an
+ * earlier scan or command. This ensures that if this subtransaction
+ * aborts then the frozen rows won't be visible after xact cleanup. Note
+ * that the stronger test of exactly which subtransaction created it is
+ * crucial for correctness of this optimization.
+ */
+ if (cfstate->cstate->freeze)
+ {
+ if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot perform FREEZE because of prior transaction activity")));
+
+ if (cfstate->cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
+ cfstate->cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
+
+ cfstate->hi_options |= HEAP_INSERT_FROZEN;
+ }
+
+ /*
+ * We need a ResultRelInfo so we can use the regular executor's
+ * index-entry-making machinery. (There used to be a huge amount of code
+ * here that basically duplicated execUtils.c ...)
+ */
+ cfstate->resultRelInfo = makeNode(ResultRelInfo);
+ InitResultRelInfo(cfstate->resultRelInfo,
+ cfstate->cstate->rel,
+ 1, /* dummy rangetable index */
+ NULL,
+ 0);
+
+ ExecOpenIndices(cfstate->resultRelInfo, false);
+
+ cfstate->estate->es_result_relations = cfstate->resultRelInfo;
+ cfstate->estate->es_num_result_relations = 1;
+ cfstate->estate->es_result_relation_info = cfstate->resultRelInfo;
+ cfstate->estate->es_range_table = cfstate->cstate->range_table;
+
+ /* Set up a tuple slot too */
+ cfstate->myslot = ExecInitExtraTupleSlot(cfstate->estate);
+ ExecSetSlotDescriptor(cfstate->myslot, cfstate->tupDesc);
+ /* Triggers might need a slot as well */
+ cfstate->estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(cfstate->estate);
+
+ /*
+ * It's more efficient to prepare a bunch of tuples for insertion, and
+ * insert them in one heap_multi_insert() call, than call heap_insert()
+ * separately for every tuple. However, we can't do that if there are
+ * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
+ * expressions. Such triggers or expressions might query the table we're
+ * inserting to, and act differently if the tuples that have already been
+ * processed and prepared for insertion are not there. We also can't do
+ * it if the table is partitioned.
+ */
+ if ((cfstate->resultRelInfo->ri_TrigDesc != NULL &&
+ (cfstate->resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+ cfstate->resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+ cfstate->cstate->partition_dispatch_info != NULL ||
+ cfstate->cstate->volatile_defexprs)
+ {
+ cfstate->useHeapMultiInsert = false;
+ }
+ else
+ {
+ cfstate->useHeapMultiInsert = true;
+ cfstate->bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+ }
+
+ /* Prepare to catch AFTER triggers. */
+ AfterTriggerBeginQuery();
+
+ /*
+ * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
+ * should do this for COPY, since it's not really an "INSERT" statement as
+ * such. However, executing these triggers maintains consistency with the
+ * EACH ROW triggers that we already fire on COPY.
+ */
+ ExecBSInsertTriggers(cfstate->estate, cfstate->resultRelInfo);
+
+ cfstate->values = (Datum *) palloc(cfstate->tupDesc->natts * sizeof(Datum));
+ cfstate->nulls = (bool *) palloc(cfstate->tupDesc->natts * sizeof(bool));
+
+ cfstate->bistate = GetBulkInsertState();
+ cfstate->econtext = GetPerTupleExprContext(cfstate->estate);
+
+ /* Set up callback to identify error line number */
+ cfstate->errcallback.callback = CopyFromErrorCallback;
+ cfstate->errcallback.arg = (void *) cfstate->cstate;
+ cfstate->errcallback.previous = error_context_stack;
+ error_context_stack = &cfstate->errcallback;
+
+ // // BG Worker startup
+ // RegisterDynamicBackgroundWorker(&worker, &bgwhandle);
+ // bgwstatus = WaitForBackgroundWorkerStartup(bgwhandle, &bgwpid);
+ // elog(LOG, "Main COPY process (pid %d): BGWorker started (pid %d)", MyProcPid, bgwpid);
+ // // BG Worker startup
+
+ for (;;)
+ {
+ TupleTableSlot *slot;
+ bool skip_tuple;
+ Oid loaded_oid = InvalidOid;
+ int next_cf_state; /* NextCopyFrom return state */
+
+ CHECK_FOR_INTERRUPTS();
+
+ if (cfstate->nBufferedTuples == 0)
+ {
+ /*
+ * Reset the per-tuple exprcontext. We can only do this if the
+ * tuple buffer is empty. (Calling the context the per-tuple
+ * memory context is a bit of a misnomer now.)
+ */
+ ResetPerTupleExprContext(cfstate->estate);
+ }
+
+ /* Switch into its memory context */
+ MemoryContextSwitchTo(GetPerTupleMemoryContext(cfstate->estate));
+
+ next_cf_state = NextCopyFrom(cfstate->cstate, cfstate->econtext, cfstate->values, cfstate->nulls, &loaded_oid);
+
+
+ LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
+ // SpinLockAcquire(&pst->mutex);
+ pst->curr_line++;
+ // SpinLockRelease(&pst->mutex);
+ LWLockRelease(CopyFromBgwLock);
+
+ if (!next_cf_state) {
+ break;
+ }
+ else if (next_cf_state == NCF_SUCCESS)
+ {
+ /* And now we can form the input tuple. */
+ cfstate->tuple = heap_form_tuple(cfstate->tupDesc, cfstate->values, cfstate->nulls);
+
+ if (loaded_oid != InvalidOid)
+ HeapTupleSetOid(cfstate->tuple, loaded_oid);
+
+ /*
+ * Constraints might reference the tableoid column, so initialize
+ * t_tableOid before evaluating them.
+ */
+ cfstate->tuple->t_tableOid = RelationGetRelid(cfstate->resultRelInfo->ri_RelationDesc);
+
+ /* Triggers and stuff need to be invoked in query context. */
+ MemoryContextSwitchTo(cfstate->oldcontext);
+
+ /* Place tuple in tuple slot --- but slot shouldn't free it */
+ slot = cfstate->myslot;
+ ExecStoreTuple(cfstate->tuple, slot, InvalidBuffer, false);
+
+ /* Determine the partition to heap_insert the tuple into */
+ if (cfstate->cstate->partition_dispatch_info)
+ {
+ int leaf_part_index;
+ TupleConversionMap *map;
+
+ /*
+ * Away we go ... If we end up not finding a partition after all,
+ * ExecFindPartition() does not return and errors out instead.
+ * Otherwise, the returned value is to be used as an index into
+ * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
+ * will get us the ResultRelInfo and TupleConversionMap for the
+ * partition, respectively.
+ */
+ leaf_part_index = ExecFindPartition(cfstate->resultRelInfo,
+ cfstate->cstate->partition_dispatch_info,
+ slot,
+ cfstate->estate);
+ Assert(leaf_part_index >= 0 &&
+ leaf_part_index < cfstate->cstate->num_partitions);
+
+ /*
+ * If this tuple is mapped to a partition that is not the same as the
+ * previous one, we'd better make sure the bulk insert mechanism gets a
+ * new buffer.
+ */
+ if (cfstate->prev_leaf_part_index != leaf_part_index)
+ {
+ ReleaseBulkInsertStatePin(cfstate->bistate);
+ cfstate->prev_leaf_part_index = leaf_part_index;
+ }
+
+ /*
+ * Save the old ResultRelInfo and switch to the one corresponding
+ * to the selected partition.
+ */
+ cfstate->saved_resultRelInfo = cfstate->resultRelInfo;
+ cfstate->resultRelInfo = cfstate->cstate->partitions + leaf_part_index;
+
+ /* We do not yet have a way to insert into a foreign partition */
+ if (cfstate->resultRelInfo->ri_FdwRoutine)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot route inserted tuples to a foreign table")));
+
+ /*
+ * For ExecInsertIndexTuples() to work on the partition's indexes
+ */
+ cfstate->estate->es_result_relation_info = cfstate->resultRelInfo;
+
+ /*
+ * If we're capturing transition tuples, we might need to convert
+ * from the partition rowtype to parent rowtype.
+ */
+ if (cfstate->cstate->transition_capture != NULL)
+ {
+ if (cfstate->resultRelInfo->ri_TrigDesc &&
+ (cfstate->resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+ cfstate->resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
+ {
+ /*
+ * If there are any BEFORE or INSTEAD triggers on the
+ * partition, we'll have to be ready to convert their
+ * result back to tuplestore format.
+ */
+ cfstate->cstate->transition_capture->tcs_original_insert_tuple = NULL;
+ cfstate->cstate->transition_capture->tcs_map =
+ cfstate->cstate->transition_tupconv_maps[leaf_part_index];
+ }
+ else
+ {
+ /*
+ * Otherwise, just remember the original unconverted
+ * tuple, to avoid a needless round trip conversion.
+ */
+ cfstate->cstate->transition_capture->tcs_original_insert_tuple = cfstate->tuple;
+ cfstate->cstate->transition_capture->tcs_map = NULL;
+ }
+ }
+ /*
+ * We might need to convert from the parent rowtype to the
+ * partition rowtype.
+ */
+ map = cfstate->cstate->partition_tupconv_maps[leaf_part_index];
+ if (map)
+ {
+ Relation partrel = cfstate->resultRelInfo->ri_RelationDesc;
+
+ cfstate->tuple = do_convert_tuple(cfstate->tuple, map);
+
+ /*
+ * We must use the partition's tuple descriptor from this
+ * point on. Use a dedicated slot from this point on until
+ * we're finished dealing with the partition.
+ */
+ slot = cfstate->cstate->partition_tuple_slot;
+ Assert(slot != NULL);
+ ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
+ ExecStoreTuple(cfstate->tuple, slot, InvalidBuffer, true);
+ }
+
+ cfstate->tuple->t_tableOid = RelationGetRelid(cfstate->resultRelInfo->ri_RelationDesc);
+ }
+
+ skip_tuple = false;
+
+ /* BEFORE ROW INSERT Triggers */
+ if (cfstate->resultRelInfo->ri_TrigDesc &&
+ cfstate->resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+ {
+ slot = ExecBRInsertTriggers(cfstate->estate, cfstate->resultRelInfo, slot);
+
+ if (slot == NULL) /* "do nothing" */
+ skip_tuple = true;
+ else /* trigger might have changed tuple */
+ cfstate->tuple = ExecMaterializeSlot(slot);
+ }
+ }
+ else
+ {
+ skip_tuple = true;
+ }
+
+ if (!skip_tuple)
+ {
+ if (cfstate->resultRelInfo->ri_TrigDesc &&
+ cfstate->resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
+ {
+ /* Pass the data to the INSTEAD ROW INSERT trigger */
+ ExecIRInsertTriggers(cfstate->estate, cfstate->resultRelInfo, slot);
+ }
+ else
+ {
+ /*
+ * We always check the partition constraint, including when
+ * the tuple got here via tuple-routing. However we don't
+ * need to in the latter case if no BR trigger is defined on
+ * the partition. Note that a BR trigger might modify the
+ * tuple such that the partition constraint is no longer
+ * satisfied, so we need to check in that case.
+ */
+ bool check_partition_constr =
+ (cfstate->resultRelInfo->ri_PartitionCheck != NIL);
+
+ if (cfstate->saved_resultRelInfo != NULL &&
+ !(cfstate->resultRelInfo->ri_TrigDesc &&
+ cfstate->resultRelInfo->ri_TrigDesc->trig_insert_before_row))
+ check_partition_constr = false;
+
+ /* Check the constraints of the tuple */
+ if (cfstate->cstate->rel->rd_att->constr || check_partition_constr)
+ ExecConstraints(cfstate->resultRelInfo, slot, cfstate->estate);
+
+ if (cfstate->useHeapMultiInsert)
+ {
+ /* Add this tuple to the tuple buffer */
+ if (cfstate->nBufferedTuples == 0)
+ cfstate->firstBufferedLineNo = cfstate->cstate->cur_lineno;
+ cfstate->bufferedTuples[cfstate->nBufferedTuples++] = cfstate->tuple;
+ cfstate->bufferedTuplesSize += cfstate->tuple->t_len;
+
+ /*
+ * If the buffer filled up, flush it. Also flush if the
+ * total size of all the tuples in the buffer becomes
+ * large, to avoid using large amounts of memory for the
+ * buffer when the tuples are exceptionally wide.
+ */
+ if (cfstate->nBufferedTuples == MAX_BUFFERED_TUPLES ||
+ cfstate->bufferedTuplesSize > 65535)
+ {
+ CopyFromInsertBatch(cfstate->cstate, cfstate->estate, cfstate->mycid, cfstate->hi_options,
+ cfstate->resultRelInfo, cfstate->myslot, cfstate->bistate,
+ cfstate->nBufferedTuples, cfstate->bufferedTuples,
+ cfstate->firstBufferedLineNo);
+ cfstate->nBufferedTuples = 0;
+ cfstate->bufferedTuplesSize = 0;
+ }
+ }
+ else
+ {
+ List *recheckIndexes = NIL;
+
+ /* OK, store the tuple and create index entries for it */
+ heap_insert(cfstate->resultRelInfo->ri_RelationDesc, cfstate->tuple, cfstate->mycid,
+ cfstate->hi_options, cfstate->bistate);
+
+ if (cfstate->resultRelInfo->ri_NumIndices > 0)
+ recheckIndexes = ExecInsertIndexTuples(slot,
+ &(cfstate->tuple->t_self),
+ cfstate->estate,
+ false,
+ NULL,
+ NIL);
+
+ /* AFTER ROW INSERT Triggers */
+ ExecARInsertTriggers(cfstate->estate, cfstate->resultRelInfo, cfstate->tuple,
+ recheckIndexes, cfstate->cstate->transition_capture);
+
+ list_free(recheckIndexes);
+ }
+ }
+
+ /*
+ * We count only tuples not suppressed by a BEFORE INSERT trigger;
+ * this is the same definition used by execMain.c for counting
+ * tuples inserted by an INSERT command.
+ */
+ cfstate->processed++;
+
+ if (cfstate->saved_resultRelInfo)
+ {
+ cfstate->resultRelInfo = cfstate->saved_resultRelInfo;
+ cfstate->estate->es_result_relation_info = cfstate->resultRelInfo;
+ }
+ }
+ }
+
+ /* Flush any remaining buffered tuples */
+ if (cfstate->nBufferedTuples > 0)
+ CopyFromInsertBatch(cfstate->cstate, cfstate->estate, cfstate->mycid, cfstate->hi_options,
+ cfstate->resultRelInfo, cfstate->myslot, cfstate->bistate,
+ cfstate->nBufferedTuples, cfstate->bufferedTuples,
+ cfstate->firstBufferedLineNo);
+
+ /* Done, clean up */
+ error_context_stack = cfstate->errcallback.previous;
+
+ FreeBulkInsertState(cfstate->bistate);
+
+ MemoryContextSwitchTo(cfstate->oldcontext);
+
+ /*
+ * In the old protocol, tell pqcomm that we can process normal protocol
+ * messages again.
+ */
+ if (cfstate->cstate->copy_dest == COPY_OLD_FE)
+ pq_endmsgread();
+
+ /* Execute AFTER STATEMENT insertion triggers */
+ ExecASInsertTriggers(cfstate->estate, cfstate->resultRelInfo, cfstate->cstate->transition_capture);
+
+ /* Handle queued AFTER triggers */
+ AfterTriggerEndQuery(cfstate->estate);
+
+ pfree(cfstate->values);
+ pfree(cfstate->nulls);
+
+ ExecResetTupleTable(cfstate->estate->es_tupleTable, false);
+
+ ExecCloseIndices(cfstate->resultRelInfo);
+
+ /* Close all the partitioned tables, leaf partitions, and their indices */
+ if (cfstate->cstate->partition_dispatch_info)
+ {
+ int i;
+
+ /*
+ * Remember cfstate->cstate->partition_dispatch_info[0] corresponds to the root
+ * partitioned table, which we must not try to close, because it is
+ * the main target table of COPY that will be closed eventually by
+ * DoCopy(). Also, tupslot is NULL for the root partitioned table.
+ */
+ for (i = 1; i < cfstate->cstate->num_dispatch; i++)
+ {
+ PartitionDispatch pd = cfstate->cstate->partition_dispatch_info[i];
+
+ heap_close(pd->reldesc, NoLock);
+ ExecDropSingleTupleTableSlot(pd->tupslot);
+ }
+ for (i = 0; i < cfstate->cstate->num_partitions; i++)
+ {
+ ResultRelInfo *resultRelInfo = cfstate->cstate->partitions + i;
+
+ ExecCloseIndices(resultRelInfo);
+ heap_close(resultRelInfo->ri_RelationDesc, NoLock);
+ }
+
+ /* Release the standalone partition tuple descriptor */
+ ExecDropSingleTupleTableSlot(cfstate->cstate->partition_tuple_slot);
+ }
+
+ /* Close any trigger target relations */
+ ExecCleanUpTriggerState(cfstate->estate);
+
+ FreeExecutorState(cfstate->estate);
+
+ /*
+ * If we skipped writing WAL, then we need to sync the heap (but not
+ * indexes since those use WAL anyway)
+ */
+ if (cfstate->hi_options & HEAP_INSERT_SKIP_WAL)
+ heap_sync(cfstate->cstate->rel);
+
+ /* Clean up. */
+ dsm_detach(seg);
+
+ return cfstate->processed;
+}
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 82a1cf5150..b89e17698b 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -494,7 +494,7 @@ RegisterLWLockTranches(void)
if (LWLockTrancheArray == NULL)
{
- LWLockTranchesAllocated = 64;
+ LWLockTranchesAllocated = 128;
LWLockTrancheArray = (char **)
MemoryContextAllocZero(TopMemoryContext,
LWLockTranchesAllocated * sizeof(char *));
@@ -511,6 +511,7 @@ RegisterLWLockTranches(void)
LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
"parallel_query_dsa");
LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
+ LWLockRegisterTranche(LWTRANCHE_COPY_FROM, "copy_from");
/* Register named tranches. */
for (i = 0; i < NamedLWLockTrancheRequests; i++)
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index e6025ecedb..91ab6fd711 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -50,3 +50,4 @@ OldSnapshotTimeMapLock 42
BackendRandomLock 43
LogicalRepWorkerLock 44
CLogTruncationLock 45
+CopyFromBgwLock 46
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 0b690b39ca..2935e567ae 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -21,6 +21,7 @@
/* CopyStateData is private in commands/copy.c */
typedef struct CopyStateData *CopyState;
+typedef struct CopyFromStateData *CopyFromState;
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
extern void DoCopy(ParseState *state, const CopyStmt *stmt,
@@ -39,6 +40,7 @@ extern void CopyFromErrorCallback(void *arg);
extern void CopyFromBgwMainLoop(Datum main_arg);
extern uint64 CopyFrom(CopyState cstate);
+extern uint64 ParallelCopyFrom(CopyState cstate);
extern DestReceiver *CreateCopyDestReceiver(void);
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 3d16132c88..beb636936f 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -213,6 +213,7 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_PREDICATE_LOCK_MANAGER,
LWTRANCHE_PARALLEL_QUERY_DSA,
LWTRANCHE_TBM,
+ LWTRANCHE_COPY_FROM,
LWTRANCHE_FIRST_USER_DEFINED
} BuiltinTrancheIds;
--
2.11.0
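Patch 04's leader/worker handshake hinges on the dsm + shm_toc round trip: the leader estimates and creates a segment, publishes structures under TOC keys, and hands the segment handle to each worker via bgw_main_arg; the worker attaches under the same magic number and looks the keys back up. A condensed sketch of the two sides (error handling elided; names as in the patch):

    /* Leader: create the segment and publish the control struct at key 0. */
    shm_toc_estimator e;
    dsm_segment   *seg;
    shm_toc       *toc;
    ParallelState *pst;

    shm_toc_initialize_estimator(&e);
    shm_toc_estimate_chunk(&e, sizeof(ParallelState));
    shm_toc_estimate_keys(&e, 1);

    seg = dsm_create(shm_toc_estimate(&e), 0);
    toc = shm_toc_create(PG_COPY_FROM_SHM_MQ_MAGIC,
                         dsm_segment_address(seg), shm_toc_estimate(&e));
    pst = shm_toc_allocate(toc, sizeof(ParallelState));
    shm_toc_insert(toc, 0, pst);
    worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));

    /* Worker (in CopyFromBgwMainLoop): attach and look the key back up. */
    seg = dsm_attach(DatumGetUInt32(main_arg));
    toc = shm_toc_attach(PG_COPY_FROM_SHM_MQ_MAGIC, dsm_segment_address(seg));
    pst = shm_toc_lookup(toc, 0, false);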
From bbb5d120c9164876ac9c30db801c334390920920 Mon Sep 17 00:00:00 2001
From: Alex K <alex.lumir@gmail.com>
Date: Mon, 24 Jul 2017 15:18:21 +0300
Subject: [PATCH 05/13] Pool of dummy BGWorkers v1, each receiving line numbers via its own shm_mq and printing them out
---
src/backend/commands/copy.c | 157 +++++++++++++++++++++++++-------------------
1 file changed, 88 insertions(+), 69 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 5a087f33c7..7ce94a12db 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -414,12 +414,11 @@ static bool CopyGetInt16(CopyState cstate, int16 *val);
static ParallelState* shm_mq_setup(int64 queue_size, int32 nworkers,
- dsm_segment **segp, shm_mq_handle **output,
- shm_mq_handle **input);
+ dsm_segment **segp, shm_mq_handle **mq_handles[]);
static void setup_dynamic_shared_memory(int64 queue_size, int nworkers,
dsm_segment **segp,
- ParallelState **hdrp,
- shm_mq **outp, shm_mq **inp);
+ ParallelState **pstp,
+ shm_mq **mqs[]);
static WorkerState *setup_background_workers(int nworkers,
dsm_segment *seg);
static void cleanup_background_workers(dsm_segment *seg, Datum arg);
@@ -5025,14 +5024,17 @@ CreateCopyDestReceiver(void)
void
CopyFromBgwMainLoop(Datum main_arg)
{
- volatile ParallelState *hdr;
+ volatile ParallelState *pst;
dsm_segment *seg;
shm_toc *toc;
- shm_mq_handle *inqh;
- shm_mq_handle *outqh;
int myworkernumber;
PGPROC *registrant;
- int prev = 0;
+ // int prev = 0;
+ shm_mq *mq;
+ shm_mq_handle *mqh;
+ shm_mq_result shmq_res;
+ Size len;
+ void *data;
/*
* Establish signal handlers.
@@ -5074,13 +5076,13 @@ CopyFromBgwMainLoop(Datum main_arg)
* find it. Our worker number gives our identity: there may be just one
* worker involved in this parallel operation, or there may be many.
*/
- hdr = shm_toc_lookup(toc, 0, false);
+ pst = shm_toc_lookup(toc, 0, false);
LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
// SpinLockAcquire(&pst->mutex);
- myworkernumber = ++hdr->workers_attached;
+ myworkernumber = ++pst->workers_attached;
// SpinLockRelease(&pst->mutex);
LWLockRelease(CopyFromBgwLock);
- if (myworkernumber > hdr->workers_total)
+ if (myworkernumber > pst->workers_total)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("too many message queue testing workers already")));
@@ -5088,7 +5090,9 @@ CopyFromBgwMainLoop(Datum main_arg)
/*
* Attach to the appropriate message queues.
*/
- // attach_to_queues(seg, toc, myworkernumber, &inqh, &outqh);
+ mq = shm_toc_lookup(toc, myworkernumber, false);
+ shm_mq_set_receiver(mq, MyProc);
+ mqh = shm_mq_attach(mq, seg, NULL);
/*
* Indicate that we're fully initialized and ready to begin the main part
@@ -5101,8 +5105,8 @@ CopyFromBgwMainLoop(Datum main_arg)
*/
LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
// SpinLockAcquire(&pst->mutex);
- ++hdr->workers_ready;
- elog(LOG, "BGWorker #%d started with curr_line=%d", myworkernumber, hdr->curr_line);
+ ++pst->workers_ready;
+ elog(LOG, "BGWorker #%d started", myworkernumber);
// SpinLockRelease(&pst->mutex);
LWLockRelease(CopyFromBgwLock);
registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
@@ -5119,15 +5123,20 @@ CopyFromBgwMainLoop(Datum main_arg)
{
CHECK_FOR_INTERRUPTS();
- LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
- // SpinLockAcquire(&hdr->mutex);
- if (hdr->curr_line != prev)
- {
- elog(LOG, "BGWorker #%d dummy processing line #%d", myworkernumber, hdr->curr_line);
- }
- prev = hdr->curr_line;
- // SpinLockRelease(&hdr->mutex);
- LWLockRelease(CopyFromBgwLock);
+ // LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
+ // // SpinLockAcquire(&pst->mutex);
+ // if (pst->curr_line != prev)
+ // {
+ // elog(LOG, "BGWorker #%d dummy processing line #%d", myworkernumber, pst->curr_line);
+ // }
+ // prev = pst->curr_line;
+ // // SpinLockRelease(&pst->mutex);
+ // LWLockRelease(CopyFromBgwLock);
+
+ shmq_res = shm_mq_receive(mqh, &len, &data, false);
+ if (shmq_res != SHM_MQ_SUCCESS)
+ break;
+ elog(LOG, "BGWorker #%d dummy processing line #%ld", myworkernumber, *(int64 *) data);
}
/*
@@ -5167,27 +5176,31 @@ handle_sigterm(SIGNAL_ARGS)
*/
static ParallelState*
shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
- shm_mq_handle **output, shm_mq_handle **input)
+ shm_mq_handle ***mq_handles)
{
- dsm_segment *seg;
- ParallelState *hdr;
- shm_mq *outq = NULL; /* placate compiler */
- shm_mq *inq = NULL; /* placate compiler */
- WorkerState *wstate;
+ int i;
+ dsm_segment *seg;
+ ParallelState *pst;
+ shm_mq **mqs;
+ WorkerState *wstate;
+
+ mqs = palloc0(sizeof(shm_mq *) * nworkers);
/* Set up a dynamic shared memory segment. */
- setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
+ setup_dynamic_shared_memory(queue_size, nworkers, &seg, &pst, &mqs);
*segp = seg;
/* Register background workers. */
wstate = setup_background_workers(nworkers, seg);
/* Attach the queues. */
- *output = shm_mq_attach(outq, seg, wstate->handle[0]);
- *input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
+ for (i = 0; i < nworkers; ++i)
+ {
+ (*mq_handles)[i] = shm_mq_attach(mqs[i], seg, wstate->handle[i]);
+ }
/* Wait for workers to become ready. */
- wait_for_workers_to_become_ready(wstate, hdr);
+ wait_for_workers_to_become_ready(wstate, pst);
/*
* Once we reach this point, all workers are ready. We no longer need to
@@ -5198,7 +5211,7 @@ shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
// PointerGetDatum(wstate));
// pfree(wstate);
- return hdr;
+ return pst;
}
/*
@@ -5210,15 +5223,15 @@ shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
*/
static void
setup_dynamic_shared_memory(int64 queue_size, int nworkers,
- dsm_segment **segp, ParallelState **hdrp,
- shm_mq **outp, shm_mq **inp)
+ dsm_segment **segp, ParallelState **pstp,
+ shm_mq ***mqs)
{
shm_toc_estimator e;
- int i;
- Size segsize;
- dsm_segment *seg;
- shm_toc *toc;
- ParallelState *hdr;
+ int i;
+ Size segsize;
+ dsm_segment *seg;
+ shm_toc *toc;
+ ParallelState *pst;
/* Ensure a valid queue size. */
if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
@@ -5244,7 +5257,7 @@ setup_dynamic_shared_memory(int64 queue_size, int nworkers,
shm_toc_estimate_chunk(&e, sizeof(ParallelState));
for (i = 0; i <= nworkers; ++i)
shm_toc_estimate_chunk(&e, (Size) queue_size);
- shm_toc_estimate_keys(&e, 2 + nworkers);
+ shm_toc_estimate_keys(&e, 1 + nworkers);
segsize = shm_toc_estimate(&e);
/* Create the shared memory segment and establish a table of contents. */
@@ -5253,39 +5266,28 @@ setup_dynamic_shared_memory(int64 queue_size, int nworkers,
segsize);
/* Set up the header region. */
- hdr = shm_toc_allocate(toc, sizeof(ParallelState));
- SpinLockInit(&hdr->mutex);
- hdr->workers_total = nworkers;
- hdr->workers_attached = 0;
- hdr->workers_ready = 0;
- shm_toc_insert(toc, 0, hdr);
+ pst = shm_toc_allocate(toc, sizeof(ParallelState));
+ SpinLockInit(&pst->mutex);
+ pst->workers_total = nworkers;
+ pst->workers_attached = 0;
+ pst->workers_ready = 0;
+ shm_toc_insert(toc, 0, pst);
/* Set up one message queue per worker, plus one. */
- for (i = 0; i <= nworkers; ++i)
+ for (i = 0; i < nworkers; ++i)
{
- shm_mq *mq;
+ shm_mq *mq;
mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
(Size) queue_size);
shm_toc_insert(toc, i + 1, mq);
-
- if (i == 0)
- {
- /* We send messages to the first queue. */
- shm_mq_set_sender(mq, MyProc);
- *outp = mq;
- }
- if (i == nworkers)
- {
- /* We receive messages from the last queue. */
- shm_mq_set_receiver(mq, MyProc);
- *inp = mq;
- }
+ shm_mq_set_sender(mq, MyProc);
+ (*mqs)[i] = mq;
}
/* Return results to caller. */
*segp = seg;
- *hdrp = hdr;
+ *pstp = pst;
}
/*
@@ -5454,11 +5456,17 @@ ParallelCopyFrom(CopyState cstate)
CopyFromState cfstate;
ParallelState *pst;
dsm_segment *seg;
- shm_mq_handle *outqh;
- shm_mq_handle *inqh;
- shm_mq_result res;
- int64 queue_size = 64;
int32 nworkers = max_parallel_workers_per_gather;
+ int64 queue_size = 8;
+ shm_mq_handle **mq_handles;
+ shm_mq_result shmq_res;
+ int last_worker_used = 0;
+ int64 message = 0;
+ // char *message_contents = VARDATA_ANY(message);
+ // int message_size = VARSIZE_ANY_EXHDR(message);
+ int message_size = sizeof(message);
+
+ mq_handles = palloc0(sizeof(shm_mq_handle *) * nworkers);
cfstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
@@ -5502,7 +5510,7 @@ ParallelCopyFrom(CopyState cstate)
Assert(cfstate->cstate->rel);
- pst = shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
+ pst = shm_mq_setup(queue_size * message_size, nworkers, &seg, &mq_handles);
/*
* The target must be a plain relation or have an INSTEAD OF INSERT row
@@ -5721,6 +5729,17 @@ ParallelCopyFrom(CopyState cstate)
// SpinLockRelease(&pst->mutex);
LWLockRelease(CopyFromBgwLock);
+ if (next_cf_state) {
+ message = pst->curr_line;
+ shmq_res = shm_mq_send(mq_handles[++last_worker_used - 1], message_size, &message, false);
+ if (shmq_res != SHM_MQ_SUCCESS)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not send message")));
+ if (last_worker_used == nworkers)
+ last_worker_used = 0;
+ }
+
if (!next_cf_state) {
break;
}
--
2.11.0
From 34d1f670fa0876fd33b5a5595eb52d98b7a386e1 Mon Sep 17 00:00:00 2001
From: Alex K <alex.lumir@gmail.com>
Date: Mon, 31 Jul 2017 21:54:27 +0300
Subject: [PATCH 06/13] Sending raw line to BGWorkers
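Instead of a dummy line counter, the leader now copies the raw bytes of
cstate->line_buf for each input line and ships them to one of the workers
over its shared memory queue, round-robin. A minimal sketch of the
dispatch this patch implements (dispatch_line and next_worker are
illustrative names only; the patch inlines the same logic in
ParallelCopyFrom() using last_worker_used, and error paths are shortened):

    /* Send the current line to the next worker in turn. */
    static void
    dispatch_line(CopyState cstate, shm_mq_handle **mq_handles,
                  int nworkers, int *next_worker)
    {
        shm_mq_result res;

        res = shm_mq_send(mq_handles[*next_worker],
                          cstate->line_buf.len,   /* payload length */
                          cstate->line_buf.data,  /* raw line bytes */
                          false);                 /* block if the queue is full */
        if (res != SHM_MQ_SUCCESS)
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     errmsg("could not send message")));
        if (++(*next_worker) == nworkers)
            *next_worker = 0;
    }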
---
src/backend/commands/copy.c | 470 +++++++++++++++++++++-----------------------
1 file changed, 221 insertions(+), 249 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 7ce94a12db..b03fbacfa1 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -247,35 +247,6 @@ typedef struct
uint64 processed; /* # of tuples processed */
} DR_copy;
-typedef struct CopyFromStateData
-{
- CopyState cstate;
- HeapTuple tuple;
- TupleDesc tupDesc;
- Datum *values;
- bool *nulls;
- ResultRelInfo *resultRelInfo;
- ResultRelInfo *saved_resultRelInfo;
- EState *estate;
- ExprContext *econtext;
- TupleTableSlot *myslot;
- MemoryContext oldcontext;
-
- ErrorContextCallback errcallback;
- CommandId mycid;
- int hi_options;
- BulkInsertState bistate;
- uint64 processed;
- bool useHeapMultiInsert;
- int nBufferedTuples;
- int prev_leaf_part_index;
-
-#define MAX_BUFFERED_TUPLES 1000
- HeapTuple *bufferedTuples;
- Size bufferedTuplesSize;
- int firstBufferedLineNo;
-} CopyFromStateData;
-
typedef struct
{
int nworkers;
@@ -289,8 +260,11 @@ typedef struct
int workers_total;
int workers_attached;
int workers_ready;
+ Oid database_id;
+ Oid authenticated_user_id;
} ParallelState;
+// TODO: choose a dedicated magic number for this use
#define PG_COPY_FROM_SHM_MQ_MAGIC 0x79fb2447
/*
@@ -413,7 +387,7 @@ static void CopySendInt16(CopyState cstate, int16 val);
static bool CopyGetInt16(CopyState cstate, int16 *val);
-static ParallelState* shm_mq_setup(int64 queue_size, int32 nworkers,
+static ParallelState* shm_mq_setup(int64 queue_size, int32 nworkers,
dsm_segment **segp, shm_mq_handle **mq_handles[]);
static void setup_dynamic_shared_memory(int64 queue_size, int nworkers,
dsm_segment **segp,
@@ -3407,7 +3381,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
num_phys_attrs = tupDesc->natts;
attr_count = list_length(cstate->attnumlist);
nfields = file_has_oids ? (attr_count + 1) : attr_count;
-
+
/* Set error level to WARNING, if errors handling is turned on */
if (cstate->ignore_errors)
{
@@ -5029,7 +5003,6 @@ CopyFromBgwMainLoop(Datum main_arg)
shm_toc *toc;
int myworkernumber;
PGPROC *registrant;
- // int prev = 0;
shm_mq *mq;
shm_mq_handle *mqh;
shm_mq_result shmq_res;
@@ -5094,6 +5067,16 @@ CopyFromBgwMainLoop(Datum main_arg)
shm_mq_set_receiver(mq, MyProc);
mqh = shm_mq_attach(mq, seg, NULL);
+ /* Restore database connection. */
+ BackgroundWorkerInitializeConnectionByOid(pst->database_id,
+ pst->authenticated_user_id);
+
+ /*
+ * Set the client encoding to the database encoding, since that is what
+ * the leader will expect.
+ */
+ SetClientEncoding(GetDatabaseEncoding());
+
/*
* Indicate that we're fully initialized and ready to begin the main part
* of the parallel operation.
@@ -5121,22 +5104,17 @@ CopyFromBgwMainLoop(Datum main_arg)
// copy_messages(inqh, outqh);
for (;;)
{
- CHECK_FOR_INTERRUPTS();
+ char *msg;
- // LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
- // // SpinLockAcquire(&pst->mutex);
- // if (pst->curr_line != prev)
- // {
- // elog(LOG, "BGWorker #%d dummy processing line #%d", myworkernumber, pst->curr_line);
- // }
- // prev = pst->curr_line;
- // // SpinLockRelease(&pst->mutex);
- // LWLockRelease(CopyFromBgwLock);
+ CHECK_FOR_INTERRUPTS();
shmq_res = shm_mq_receive(mqh, &len, &data, false);
if (shmq_res != SHM_MQ_SUCCESS)
break;
+ msg = (char *) palloc(len + 1);
+ memcpy(msg, data, len);
+ msg[len] = '\0'; /* queue payloads are not NUL-terminated */
- elog(LOG, "BGWorker #%d dummy processing line #%ld", myworkernumber, *(int64 *) data);
+ // elog(LOG, "BGWorker #%d dummy processing line #%ld", myworkernumber, *(int64 *) data);
+ elog(LOG, "BGWorker #%d dummy processing line: %s", myworkernumber, msg);
}
/*
@@ -5271,6 +5249,8 @@ setup_dynamic_shared_memory(int64 queue_size, int nworkers,
pst->workers_total = nworkers;
pst->workers_attached = 0;
pst->workers_ready = 0;
+ pst->database_id = MyDatabaseId;
+ pst->authenticated_user_id = GetAuthenticatedUserId();
shm_toc_insert(toc, 0, pst);
/* Set up one message queue per worker. */
@@ -5453,104 +5433,90 @@ check_worker_status(WorkerState *wstate)
extern uint64
ParallelCopyFrom(CopyState cstate)
{
- CopyFromState cfstate;
ParallelState *pst;
dsm_segment *seg;
int32 nworkers = max_parallel_workers_per_gather;
- int64 queue_size = 8;
+ int64 queue_size = 100;
shm_mq_handle **mq_handles;
shm_mq_result shmq_res;
int last_worker_used = 0;
- int64 message = 0;
+ // int64 message = 0;
+ char *message;
// char *message_contents = VARDATA_ANY(message);
// int message_size = VARSIZE_ANY_EXHDR(message);
- int message_size = sizeof(message);
+ int message_size = sizeof(char);
mq_handles = palloc0(sizeof(shm_mq_handle *) * nworkers);
- cfstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
-
- cfstate->cstate = cstate;
- cfstate->estate = CreateExecutorState(); /* for ExecConstraints() */
- cfstate->oldcontext = CurrentMemoryContext;
- cfstate->mycid = GetCurrentCommandId(true);
-
- cfstate->saved_resultRelInfo = NULL;
- cfstate->hi_options = 0; /* start with default heap_insert options */
- cfstate->processed = 0;
- cfstate->nBufferedTuples = 0;
- cfstate->prev_leaf_part_index = -1;
- cfstate->bufferedTuples = NULL; /* initialize to silence warning */
- cfstate->bufferedTuplesSize = 0;
- cfstate->firstBufferedLineNo = 0;
-
-
- // BG Worker setup start
- // BackgroundWorker worker;
- // BackgroundWorkerHandle *bgwhandle = NULL;
- // BgwHandleStatus bgwstatus;
- // pid_t bgwpid;
- // int bgwarg = 5;
- //
- // elog(LOG, "Main COPY process (pid %d): starting BGWorker", MyProcPid);
- // MemSet(&worker, 0, sizeof(BackgroundWorker));
- //
- // worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
- // BGWORKER_BACKEND_DATABASE_CONNECTION;
- // worker.bgw_start_time = BgWorkerStart_ConsistentState;
- // worker.bgw_restart_time = BGW_NEVER_RESTART;
- // worker.bgw_notify_pid = MyProcPid;
- // sprintf(worker.bgw_library_name, "postgres");
- // sprintf(worker.bgw_function_name, "CopyFromBgwMainLoop");
- //
- // snprintf(worker.bgw_name, BGW_MAXLEN, "copy_from_bgw_pool_worker_%d", 1);
- // // worker.bgw_main_arg = PointerGetDatum(cfstate);
- // worker.bgw_main_arg = PointerGetDatum(&bgwarg);
- // BG Worker setup end
-
- Assert(cfstate->cstate->rel);
-
- pst = shm_mq_setup(queue_size * message_size, nworkers, &seg, &mq_handles);
+ HeapTuple tuple;
+ TupleDesc tupDesc;
+ Datum *values;
+ bool *nulls;
+ ResultRelInfo *resultRelInfo;
+ ResultRelInfo *saved_resultRelInfo = NULL;
+ EState *estate = CreateExecutorState(); /* for ExecConstraints() */
+ ExprContext *econtext;
+ TupleTableSlot *myslot;
+ MemoryContext oldcontext = CurrentMemoryContext;
+
+ ErrorContextCallback errcallback;
+ CommandId mycid = GetCurrentCommandId(true);
+ int hi_options = 0; /* start with default heap_insert options */
+ BulkInsertState bistate;
+ uint64 processed = 0;
+ bool useHeapMultiInsert;
+ int nBufferedTuples = 0;
+ int prev_leaf_part_index = -1;
+
+#define MAX_BUFFERED_TUPLES 1000
+ HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
+ Size bufferedTuplesSize = 0;
+ int firstBufferedLineNo = 0;
+
+ Assert(cstate->rel);
+
+ // MQ size = 100 messages x 80 chars each
+ pst = shm_mq_setup(message_size * 80 * queue_size, nworkers, &seg, &mq_handles);
/*
* The target must be a plain relation or have an INSTEAD OF INSERT row
* trigger. (Currently, such triggers are only allowed on views, so we
* only hint about them in the view case.)
*/
- if (cfstate->cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
- cfstate->cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
- !(cfstate->cstate->rel->trigdesc &&
- cfstate->cstate->rel->trigdesc->trig_insert_instead_row))
+ if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
+ cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
+ !(cstate->rel->trigdesc &&
+ cstate->rel->trigdesc->trig_insert_instead_row))
{
- if (cfstate->cstate->rel->rd_rel->relkind == RELKIND_VIEW)
+ if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to view \"%s\"",
- RelationGetRelationName(cfstate->cstate->rel)),
+ RelationGetRelationName(cstate->rel)),
errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
- else if (cfstate->cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
+ else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to materialized view \"%s\"",
- RelationGetRelationName(cfstate->cstate->rel))));
- else if (cfstate->cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ RelationGetRelationName(cstate->rel))));
+ else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to foreign table \"%s\"",
- RelationGetRelationName(cfstate->cstate->rel))));
- else if (cfstate->cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
+ RelationGetRelationName(cstate->rel))));
+ else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to sequence \"%s\"",
- RelationGetRelationName(cfstate->cstate->rel))));
+ RelationGetRelationName(cstate->rel))));
else
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to non-table relation \"%s\"",
- RelationGetRelationName(cfstate->cstate->rel))));
+ RelationGetRelationName(cstate->rel))));
}
- cfstate->tupDesc = RelationGetDescr(cfstate->cstate->rel);
+ tupDesc = RelationGetDescr(cstate->rel);
/*----------
* Check to see if we can avoid writing WAL
@@ -5588,12 +5554,12 @@ ParallelCopyFrom(CopyState cstate)
*----------
*/
/* createSubid is creation check, newRelfilenodeSubid is truncation check */
- if (cfstate->cstate->rel->rd_createSubid != InvalidSubTransactionId ||
- cfstate->cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
+ if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
+ cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
{
- cfstate->hi_options |= HEAP_INSERT_SKIP_FSM;
+ hi_options |= HEAP_INSERT_SKIP_FSM;
if (!XLogIsNeeded())
- cfstate->hi_options |= HEAP_INSERT_SKIP_WAL;
+ hi_options |= HEAP_INSERT_SKIP_WAL;
}
/*
@@ -5604,20 +5570,20 @@ ParallelCopyFrom(CopyState cstate)
* that the stronger test of exactly which subtransaction created it is
* crucial for correctness of this optimization.
*/
- if (cfstate->cstate->freeze)
+ if (cstate->freeze)
{
if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot perform FREEZE because of prior transaction activity")));
- if (cfstate->cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
- cfstate->cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
+ if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
+ cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
- cfstate->hi_options |= HEAP_INSERT_FROZEN;
+ hi_options |= HEAP_INSERT_FROZEN;
}
/*
@@ -5625,25 +5591,25 @@ ParallelCopyFrom(CopyState cstate)
* index-entry-making machinery. (There used to be a huge amount of code
* here that basically duplicated execUtils.c ...)
*/
- cfstate->resultRelInfo = makeNode(ResultRelInfo);
- InitResultRelInfo(cfstate->resultRelInfo,
- cfstate->cstate->rel,
+ resultRelInfo = makeNode(ResultRelInfo);
+ InitResultRelInfo(resultRelInfo,
+ cstate->rel,
1, /* dummy rangetable index */
NULL,
0);
- ExecOpenIndices(cfstate->resultRelInfo, false);
+ ExecOpenIndices(resultRelInfo, false);
- cfstate->estate->es_result_relations = cfstate->resultRelInfo;
- cfstate->estate->es_num_result_relations = 1;
- cfstate->estate->es_result_relation_info = cfstate->resultRelInfo;
- cfstate->estate->es_range_table = cfstate->cstate->range_table;
+ estate->es_result_relations = resultRelInfo;
+ estate->es_num_result_relations = 1;
+ estate->es_result_relation_info = resultRelInfo;
+ estate->es_range_table = cstate->range_table;
/* Set up a tuple slot too */
- cfstate->myslot = ExecInitExtraTupleSlot(cfstate->estate);
- ExecSetSlotDescriptor(cfstate->myslot, cfstate->tupDesc);
+ myslot = ExecInitExtraTupleSlot(estate);
+ ExecSetSlotDescriptor(myslot, tupDesc);
/* Triggers might need a slot as well */
- cfstate->estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(cfstate->estate);
+ estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
/*
* It's more efficient to prepare a bunch of tuples for insertion, and
@@ -5652,21 +5618,21 @@ ParallelCopyFrom(CopyState cstate)
* BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
* expressions. Such triggers or expressions might query the table we're
* inserting to, and act differently if the tuples that have already been
- * cfstate->processed and prepared for insertion are not there. We also can't do
+ * processed and prepared for insertion are not there. We also can't do
* it if the table is partitioned.
*/
- if ((cfstate->resultRelInfo->ri_TrigDesc != NULL &&
- (cfstate->resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
- cfstate->resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
- cfstate->cstate->partition_dispatch_info != NULL ||
- cfstate->cstate->volatile_defexprs)
+ if ((resultRelInfo->ri_TrigDesc != NULL &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+ cstate->partition_dispatch_info != NULL ||
+ cstate->volatile_defexprs)
{
- cfstate->useHeapMultiInsert = false;
+ useHeapMultiInsert = false;
}
else
{
- cfstate->useHeapMultiInsert = true;
- cfstate->bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+ useHeapMultiInsert = true;
+ bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
}
/* Prepare to catch AFTER triggers. */
@@ -5678,19 +5644,19 @@ ParallelCopyFrom(CopyState cstate)
* such. However, executing these triggers maintains consistency with the
* EACH ROW triggers that we already fire on COPY.
*/
- ExecBSInsertTriggers(cfstate->estate, cfstate->resultRelInfo);
+ ExecBSInsertTriggers(estate, resultRelInfo);
- cfstate->values = (Datum *) palloc(cfstate->tupDesc->natts * sizeof(Datum));
- cfstate->nulls = (bool *) palloc(cfstate->tupDesc->natts * sizeof(bool));
+ values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
+ nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
- cfstate->bistate = GetBulkInsertState();
- cfstate->econtext = GetPerTupleExprContext(cfstate->estate);
+ bistate = GetBulkInsertState();
+ econtext = GetPerTupleExprContext(estate);
/* Set up callback to identify error line number */
- cfstate->errcallback.callback = CopyFromErrorCallback;
- cfstate->errcallback.arg = (void *) cfstate->cstate;
- cfstate->errcallback.previous = error_context_stack;
- error_context_stack = &cfstate->errcallback;
+ errcallback.callback = CopyFromErrorCallback;
+ errcallback.arg = (void *) cstate;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
// // BG Worker startup
// RegisterDynamicBackgroundWorker(&worker, &bgwhandle);
@@ -5707,31 +5673,37 @@ ParallelCopyFrom(CopyState cstate)
CHECK_FOR_INTERRUPTS();
- if (cfstate->nBufferedTuples == 0)
+ if (nBufferedTuples == 0)
{
/*
* Reset the per-tuple exprcontext. We can only do this if the
* tuple buffer is empty. (Calling the context the per-tuple
* memory context is a bit of a misnomer now.)
*/
- ResetPerTupleExprContext(cfstate->estate);
+ ResetPerTupleExprContext(estate);
}
/* Switch into its memory context */
- MemoryContextSwitchTo(GetPerTupleMemoryContext(cfstate->estate));
+ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- next_cf_state = NextCopyFrom(cfstate->cstate, cfstate->econtext, cfstate->values, cfstate->nulls, &loaded_oid);
+ next_cf_state = NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid);
- LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
- // SpinLockAcquire(&pst->mutex);
- pst->curr_line++;
- // SpinLockRelease(&pst->mutex);
- LWLockRelease(CopyFromBgwLock);
+ // LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
+ // // SpinLockAcquire(&pst->mutex);
+ // pst->curr_line++;
+ // // SpinLockRelease(&pst->mutex);
+ // LWLockRelease(CopyFromBgwLock);
if (next_cf_state) {
- message = pst->curr_line;
- shmq_res = shm_mq_send(mq_handles[++last_worker_used - 1], message_size, &message, false);
+ // message = pst->curr_line;
+ // cur_ptr = cstate->line_buf.data;
+ // line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
+ message = (char *) palloc(cstate->line_buf.len + 1);
+ memcpy(message, cstate->line_buf.data, cstate->line_buf.len);
+ message[cstate->line_buf.len] = '\0'; /* so it is safe to print with %s */
+ // shmq_res = shm_mq_send(mq_handles[++last_worker_used - 1], message_size, &message, false);
+ elog(LOG, "Sending line #%d '%s' to BGWorker #%d", cstate->cur_lineno, message, last_worker_used + 1);
+ shmq_res = shm_mq_send(mq_handles[++last_worker_used - 1], cstate->line_buf.len, message, false);
if (shmq_res != SHM_MQ_SUCCESS)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -5746,26 +5718,26 @@ ParallelCopyFrom(CopyState cstate)
else if (next_cf_state == NCF_SUCCESS)
{
/* And now we can form the input tuple. */
- cfstate->tuple = heap_form_tuple(cfstate->tupDesc, cfstate->values, cfstate->nulls);
+ tuple = heap_form_tuple(tupDesc, values, nulls);
if (loaded_oid != InvalidOid)
- HeapTupleSetOid(cfstate->tuple, loaded_oid);
+ HeapTupleSetOid(tuple, loaded_oid);
/*
* Constraints might reference the tableoid column, so initialize
* t_tableOid before evaluating them.
*/
- cfstate->tuple->t_tableOid = RelationGetRelid(cfstate->resultRelInfo->ri_RelationDesc);
+ tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
/* Triggers and stuff need to be invoked in query context. */
- MemoryContextSwitchTo(cfstate->oldcontext);
+ MemoryContextSwitchTo(oldcontext);
/* Place tuple in tuple slot --- but slot shouldn't free it */
- slot = cfstate->myslot;
- ExecStoreTuple(cfstate->tuple, slot, InvalidBuffer, false);
+ slot = myslot;
+ ExecStoreTuple(tuple, slot, InvalidBuffer, false);
/* Determine the partition to heap_insert the tuple into */
- if (cfstate->cstate->partition_dispatch_info)
+ if (cstate->partition_dispatch_info)
{
int leaf_part_index;
TupleConversionMap *map;
@@ -5778,33 +5750,33 @@ ParallelCopyFrom(CopyState cstate)
* will get us the ResultRelInfo and TupleConversionMap for the
* partition, respectively.
*/
- leaf_part_index = ExecFindPartition(cfstate->resultRelInfo,
- cfstate->cstate->partition_dispatch_info,
+ leaf_part_index = ExecFindPartition(resultRelInfo,
+ cstate->partition_dispatch_info,
slot,
- cfstate->estate);
+ estate);
Assert(leaf_part_index >= 0 &&
- leaf_part_index < cfstate->cstate->num_partitions);
+ leaf_part_index < cstate->num_partitions);
/*
* If this tuple is mapped to a partition that is not same as the
* previous one, we'd better make the bulk insert mechanism gets a
* new buffer.
*/
- if (cfstate->prev_leaf_part_index != leaf_part_index)
+ if (prev_leaf_part_index != leaf_part_index)
{
- ReleaseBulkInsertStatePin(cfstate->bistate);
- cfstate->prev_leaf_part_index = leaf_part_index;
+ ReleaseBulkInsertStatePin(bistate);
+ prev_leaf_part_index = leaf_part_index;
}
/*
* Save the old ResultRelInfo and switch to the one corresponding
* to the selected partition.
*/
- cfstate->saved_resultRelInfo = cfstate->resultRelInfo;
- cfstate->resultRelInfo = cfstate->cstate->partitions + leaf_part_index;
+ saved_resultRelInfo = resultRelInfo;
+ resultRelInfo = cstate->partitions + leaf_part_index;
/* We do not yet have a way to insert into a foreign partition */
- if (cfstate->resultRelInfo->ri_FdwRoutine)
+ if (resultRelInfo->ri_FdwRoutine)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot route inserted tuples to a foreign table")));
@@ -5812,26 +5784,26 @@ ParallelCopyFrom(CopyState cstate)
/*
* For ExecInsertIndexTuples() to work on the partition's indexes
*/
- cfstate->estate->es_result_relation_info = cfstate->resultRelInfo;
+ estate->es_result_relation_info = resultRelInfo;
/*
* If we're capturing transition tuples, we might need to convert
* from the partition rowtype to parent rowtype.
*/
- if (cfstate->cstate->transition_capture != NULL)
+ if (cstate->transition_capture != NULL)
{
- if (cfstate->resultRelInfo->ri_TrigDesc &&
- (cfstate->resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
- cfstate->resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
+ if (resultRelInfo->ri_TrigDesc &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
{
/*
* If there are any BEFORE or INSTEAD triggers on the
* partition, we'll have to be ready to convert their
* result back to tuplestore format.
*/
- cfstate->cstate->transition_capture->tcs_original_insert_tuple = NULL;
- cfstate->cstate->transition_capture->tcs_map =
- cfstate->cstate->transition_tupconv_maps[leaf_part_index];
+ cstate->transition_capture->tcs_original_insert_tuple = NULL;
+ cstate->transition_capture->tcs_map =
+ cstate->transition_tupconv_maps[leaf_part_index];
}
else
{
@@ -5839,47 +5811,47 @@ ParallelCopyFrom(CopyState cstate)
* Otherwise, just remember the original unconverted
* tuple, to avoid a needless round trip conversion.
*/
- cfstate->cstate->transition_capture->tcs_original_insert_tuple = cfstate->tuple;
- cfstate->cstate->transition_capture->tcs_map = NULL;
+ cstate->transition_capture->tcs_original_insert_tuple = tuple;
+ cstate->transition_capture->tcs_map = NULL;
}
}
/*
* We might need to convert from the parent rowtype to the
* partition rowtype.
*/
- map = cfstate->cstate->partition_tupconv_maps[leaf_part_index];
+ map = cstate->partition_tupconv_maps[leaf_part_index];
if (map)
{
- Relation partrel = cfstate->resultRelInfo->ri_RelationDesc;
+ Relation partrel = resultRelInfo->ri_RelationDesc;
- cfstate->tuple = do_convert_tuple(cfstate->tuple, map);
+ tuple = do_convert_tuple(tuple, map);
/*
* We must use the partition's tuple descriptor from this
* point on. Use a dedicated slot from this point on until
* we're finished dealing with the partition.
*/
- slot = cfstate->cstate->partition_tuple_slot;
+ slot = cstate->partition_tuple_slot;
Assert(slot != NULL);
ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
- ExecStoreTuple(cfstate->tuple, slot, InvalidBuffer, true);
+ ExecStoreTuple(tuple, slot, InvalidBuffer, true);
}
- cfstate->tuple->t_tableOid = RelationGetRelid(cfstate->resultRelInfo->ri_RelationDesc);
+ tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
}
skip_tuple = false;
/* BEFORE ROW INSERT Triggers */
- if (cfstate->resultRelInfo->ri_TrigDesc &&
- cfstate->resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+ if (resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_insert_before_row)
{
- slot = ExecBRInsertTriggers(cfstate->estate, cfstate->resultRelInfo, slot);
+ slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
if (slot == NULL) /* "do nothing" */
skip_tuple = true;
else /* trigger might have changed tuple */
- cfstate->tuple = ExecMaterializeSlot(slot);
+ tuple = ExecMaterializeSlot(slot);
}
}
else
@@ -5889,11 +5861,11 @@ ParallelCopyFrom(CopyState cstate)
if (!skip_tuple)
{
- if (cfstate->resultRelInfo->ri_TrigDesc &&
- cfstate->resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
+ if (resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
{
/* Pass the data to the INSTEAD ROW INSERT trigger */
- ExecIRInsertTriggers(cfstate->estate, cfstate->resultRelInfo, slot);
+ ExecIRInsertTriggers(estate, resultRelInfo, slot);
}
else
{
@@ -5906,24 +5878,24 @@ ParallelCopyFrom(CopyState cstate)
* satisfied, so we need to check in that case.
*/
bool check_partition_constr =
- (cfstate->resultRelInfo->ri_PartitionCheck != NIL);
+ (resultRelInfo->ri_PartitionCheck != NIL);
- if (cfstate->saved_resultRelInfo != NULL &&
- !(cfstate->resultRelInfo->ri_TrigDesc &&
- cfstate->resultRelInfo->ri_TrigDesc->trig_insert_before_row))
+ if (saved_resultRelInfo != NULL &&
+ !(resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_insert_before_row))
check_partition_constr = false;
/* Check the constraints of the tuple */
- if (cfstate->cstate->rel->rd_att->constr || check_partition_constr)
- ExecConstraints(cfstate->resultRelInfo, slot, cfstate->estate);
+ if (cstate->rel->rd_att->constr || check_partition_constr)
+ ExecConstraints(resultRelInfo, slot, estate);
- if (cfstate->useHeapMultiInsert)
+ if (useHeapMultiInsert)
{
/* Add this tuple to the tuple buffer */
- if (cfstate->nBufferedTuples == 0)
- cfstate->firstBufferedLineNo = cfstate->cstate->cur_lineno;
- cfstate->bufferedTuples[cfstate->nBufferedTuples++] = cfstate->tuple;
- cfstate->bufferedTuplesSize += cfstate->tuple->t_len;
+ if (nBufferedTuples == 0)
+ firstBufferedLineNo = cstate->cur_lineno;
+ bufferedTuples[nBufferedTuples++] = tuple;
+ bufferedTuplesSize += tuple->t_len;
/*
* If the buffer filled up, flush it. Also flush if the
@@ -5931,15 +5903,15 @@ ParallelCopyFrom(CopyState cstate)
* large, to avoid using large amounts of memory for the
* buffer when the tuples are exceptionally wide.
*/
- if (cfstate->nBufferedTuples == MAX_BUFFERED_TUPLES ||
- cfstate->bufferedTuplesSize > 65535)
+ if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
+ bufferedTuplesSize > 65535)
{
- CopyFromInsertBatch(cfstate->cstate, cfstate->estate, cfstate->mycid, cfstate->hi_options,
- cfstate->resultRelInfo, cfstate->myslot, cfstate->bistate,
- cfstate->nBufferedTuples, cfstate->bufferedTuples,
- cfstate->firstBufferedLineNo);
- cfstate->nBufferedTuples = 0;
- cfstate->bufferedTuplesSize = 0;
+ CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+ resultRelInfo, myslot, bistate,
+ nBufferedTuples, bufferedTuples,
+ firstBufferedLineNo);
+ nBufferedTuples = 0;
+ bufferedTuplesSize = 0;
}
}
else
@@ -5947,20 +5919,20 @@ ParallelCopyFrom(CopyState cstate)
List *recheckIndexes = NIL;
/* OK, store the tuple and create index entries for it */
- heap_insert(cfstate->resultRelInfo->ri_RelationDesc, cfstate->tuple, cfstate->mycid,
- cfstate->hi_options, cfstate->bistate);
+ heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
+ hi_options, bistate);
- if (cfstate->resultRelInfo->ri_NumIndices > 0)
+ if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(slot,
- &(cfstate->tuple->t_self),
- cfstate->estate,
+ &(tuple->t_self),
+ estate,
false,
NULL,
NIL);
/* AFTER ROW INSERT Triggers */
- ExecARInsertTriggers(cfstate->estate, cfstate->resultRelInfo, cfstate->tuple,
- recheckIndexes, cfstate->cstate->transition_capture);
+ ExecARInsertTriggers(estate, resultRelInfo, tuple,
+ recheckIndexes, cstate->transition_capture);
list_free(recheckIndexes);
}
@@ -5971,94 +5943,94 @@ ParallelCopyFrom(CopyState cstate)
* this is the same definition used by execMain.c for counting
* tuples inserted by an INSERT command.
*/
- cfstate->processed++;
+ processed++;
- if (cfstate->saved_resultRelInfo)
+ if (saved_resultRelInfo)
{
- cfstate->resultRelInfo = cfstate->saved_resultRelInfo;
- cfstate->estate->es_result_relation_info = cfstate->resultRelInfo;
+ resultRelInfo = saved_resultRelInfo;
+ estate->es_result_relation_info = resultRelInfo;
}
}
}
/* Flush any remaining buffered tuples */
- if (cfstate->nBufferedTuples > 0)
- CopyFromInsertBatch(cfstate->cstate, cfstate->estate, cfstate->mycid, cfstate->hi_options,
- cfstate->resultRelInfo, cfstate->myslot, cfstate->bistate,
- cfstate->nBufferedTuples, cfstate->bufferedTuples,
- cfstate->firstBufferedLineNo);
+ if (nBufferedTuples > 0)
+ CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+ resultRelInfo, myslot, bistate,
+ nBufferedTuples, bufferedTuples,
+ firstBufferedLineNo);
/* Done, clean up */
- error_context_stack = cfstate->errcallback.previous;
+ error_context_stack = errcallback.previous;
- FreeBulkInsertState(cfstate->bistate);
+ FreeBulkInsertState(bistate);
- MemoryContextSwitchTo(cfstate->oldcontext);
+ MemoryContextSwitchTo(oldcontext);
/*
* In the old protocol, tell pqcomm that we can process normal protocol
* messages again.
*/
- if (cfstate->cstate->copy_dest == COPY_OLD_FE)
+ if (cstate->copy_dest == COPY_OLD_FE)
pq_endmsgread();
/* Execute AFTER STATEMENT insertion triggers */
- ExecASInsertTriggers(cfstate->estate, cfstate->resultRelInfo, cfstate->cstate->transition_capture);
+ ExecASInsertTriggers(estate, resultRelInfo, cstate->transition_capture);
/* Handle queued AFTER triggers */
- AfterTriggerEndQuery(cfstate->estate);
+ AfterTriggerEndQuery(estate);
- pfree(cfstate->values);
- pfree(cfstate->nulls);
+ pfree(values);
+ pfree(nulls);
- ExecResetTupleTable(cfstate->estate->es_tupleTable, false);
+ ExecResetTupleTable(estate->es_tupleTable, false);
- ExecCloseIndices(cfstate->resultRelInfo);
+ ExecCloseIndices(resultRelInfo);
/* Close all the partitioned tables, leaf partitions, and their indices */
- if (cfstate->cstate->partition_dispatch_info)
+ if (cstate->partition_dispatch_info)
{
int i;
/*
- * Remember cfstate->cstate->partition_dispatch_info[0] corresponds to the root
+ * Remember cstate->partition_dispatch_info[0] corresponds to the root
* partitioned table, which we must not try to close, because it is
* the main target table of COPY that will be closed eventually by
* DoCopy(). Also, tupslot is NULL for the root partitioned table.
*/
- for (i = 1; i < cfstate->cstate->num_dispatch; i++)
+ for (i = 1; i < cstate->num_dispatch; i++)
{
- PartitionDispatch pd = cfstate->cstate->partition_dispatch_info[i];
+ PartitionDispatch pd = cstate->partition_dispatch_info[i];
heap_close(pd->reldesc, NoLock);
ExecDropSingleTupleTableSlot(pd->tupslot);
}
- for (i = 0; i < cfstate->cstate->num_partitions; i++)
+ for (i = 0; i < cstate->num_partitions; i++)
{
- ResultRelInfo *resultRelInfo = cfstate->cstate->partitions + i;
+ ResultRelInfo *resultRelInfo = cstate->partitions + i;
ExecCloseIndices(resultRelInfo);
heap_close(resultRelInfo->ri_RelationDesc, NoLock);
}
/* Release the standalone partition tuple descriptor */
- ExecDropSingleTupleTableSlot(cfstate->cstate->partition_tuple_slot);
+ ExecDropSingleTupleTableSlot(cstate->partition_tuple_slot);
}
/* Close any trigger target relations */
- ExecCleanUpTriggerState(cfstate->estate);
+ ExecCleanUpTriggerState(estate);
- FreeExecutorState(cfstate->estate);
+ FreeExecutorState(estate);
/*
* If we skipped writing WAL, then we need to sync the heap (but not
* indexes since those use WAL anyway)
*/
- if (cfstate->hi_options & HEAP_INSERT_SKIP_WAL)
- heap_sync(cfstate->cstate->rel);
+ if (hi_options & HEAP_INSERT_SKIP_WAL)
+ heap_sync(cstate->rel);
/* Clean up. */
dsm_detach(seg);
- return cfstate->processed;
+ return processed;
}
--
2.11.0
From fbd7dcfd7e46e0f58f2b197d3b5f9e89a5cd449f Mon Sep 17 00:00:00 2001
From: Alex K <alex.lumir@gmail.com>
Date: Tue, 1 Aug 2017 15:09:37 +0300
Subject: [PATCH 07/13] Stop workers on EOF instead of on_dsm_detach callback
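Rather than relying on the on_dsm_detach callback to clean workers up,
the leader now drives the read loop itself with CopyReadLine() and tells
every worker explicitly when the input is exhausted. The end-of-stream
marker is simply a zero-length message. A sketch of both sides of the
protocol as implemented here (illustrative, error handling elided):

    /* Leader, once CopyReadLine() reports EOF with an empty line_buf: */
    for (i = 0; i < nworkers; i++)
        shm_mq_send(mq_handles[i], 0, cstate->line_buf.data, false);

    /* Worker receive loop: a zero-length payload means "no more lines". */
    for (;;)
    {
        CHECK_FOR_INTERRUPTS();
        res = shm_mq_receive(mqh, &len, &data, false);
        if (res != SHM_MQ_SUCCESS || len == 0)
            break;
        /* ... process one raw line ... */
    }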
---
src/backend/commands/copy.c | 595 ++++++++++++++++++++++++--------------------
1 file changed, 319 insertions(+), 276 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index b03fbacfa1..22446d75e8 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -226,7 +226,7 @@ typedef struct CopyStateData
/*
* Finally, raw_buf holds raw data read from the data source (file or
- * client connection). CopyReadLine parses this data sufficiently to
+ * client connection). CopyReadLine parses this data sufficiently to
* locate line boundaries, then transfers the data to line_buf and
* converts it. Note: we guarantee that there is a \0 at
* raw_buf[raw_buf_len].
@@ -5113,6 +5113,11 @@ CopyFromBgwMainLoop(Datum main_arg)
if (shmq_res != SHM_MQ_SUCCESS)
break;
+ if (len == 0)
+ {
+ elog(LOG, "BGWorker #%d: got zero-length message, stopping", myworkernumber);
+ break;
+ }
// elog(LOG, "BGWorker #%d dummy processing line #%ld", myworkernumber, *(int64 *) data);
elog(LOG, "BGWorker #%d dummy processing line: %s", myworkernumber, msg);
}
@@ -5185,9 +5190,9 @@ shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
* kill them if we die; they'll die on their own as the message queues
* shut down.
*/
- // cancel_on_dsm_detach(seg, cleanup_background_workers,
- // PointerGetDatum(wstate));
- // pfree(wstate);
+ cancel_on_dsm_detach(seg, cleanup_background_workers,
+ PointerGetDatum(wstate));
+ pfree(wstate);
return pst;
}
@@ -5666,27 +5671,28 @@ ParallelCopyFrom(CopyState cstate)
for (;;)
{
- TupleTableSlot *slot;
- bool skip_tuple;
- Oid loaded_oid = InvalidOid;
- int next_cf_state; /* NextCopyFrom return state */
+ // TupleTableSlot *slot;
+ // bool skip_tuple;
+ // Oid loaded_oid = InvalidOid;
+ // int next_cf_state; /* NextCopyFrom return state */
+ bool done;
CHECK_FOR_INTERRUPTS();
- if (nBufferedTuples == 0)
- {
- /*
- * Reset the per-tuple exprcontext. We can only do this if the
- * tuple buffer is empty. (Calling the context the per-tuple
- * memory context is a bit of a misnomer now.)
- */
- ResetPerTupleExprContext(estate);
- }
+ // if (nBufferedTuples == 0)
+ // {
+ // /*
+ // * Reset the per-tuple exprcontext. We can only do this if the
+ // * tuple buffer is empty. (Calling the context the per-tuple
+ // * memory context is a bit of a misnomer now.)
+ // */
+ // ResetPerTupleExprContext(estate);
+ // }
/* Switch into its memory context */
- MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ // MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- next_cf_state = NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid);
+ // next_cf_state = NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid);
// LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
@@ -5695,15 +5701,52 @@ ParallelCopyFrom(CopyState cstate)
// // SpinLockRelease(&pst->mutex);
// LWLockRelease(CopyFromBgwLock);
- if (next_cf_state) {
+ /* on input just throw the header line away */
+ if (cstate->cur_lineno == 0 && cstate->header_line)
+ {
+ cstate->cur_lineno++;
+ if (CopyReadLine(cstate))
+ continue; /* EOF; the next iteration will see it and finish */
+ }
+
+ cstate->cur_lineno++;
+
+ /* Actually read the line into memory here */
+ done = CopyReadLine(cstate);
+ // elog(LOG, "Reading line #%d with status %d", cstate->cur_lineno, done);
+
+ /*
+ * EOF at start of line means we're done. If we see EOF after some
+ * characters, we act as though it was newline followed by EOF, ie,
+ * process the line and then exit loop on next iteration.
+ */
+ if (done && cstate->line_buf.len == 0)
+ {
+ int i;
+
+ elog(LOG, "EOF reached, ending up queries");
+ for (i = 0; i < nworkers; i++)
+ {
+ /*
+ * Sending zero-length data to workers in order to stop them.
+ */
+ shm_mq_send(mq_handles[i], 0, cstate->line_buf.data, false);
+ }
+
+ break;
+ }
+ else
+ {
// message = pst->curr_line;
// cur_ptr = cstate->line_buf.data;
// line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
- message = (char *) palloc(cstate->line_buf.len + 1);
- memcpy(message, cstate->line_buf.data, cstate->line_buf.len);
- message[cstate->line_buf.len] = '\0'; /* so it is safe to print with %s */
+ // message = (char *) palloc(cstate->line_buf.len + 1);
+ // memcpy(message, cstate->line_buf.data, cstate->line_buf.len);
+ // message[cstate->line_buf.len] = '\0';
// shmq_res = shm_mq_send(mq_handles[++last_worker_used - 1], message_size, &message, false);
- elog(LOG, "Sending line #%d '%s' to BGWorker #%d", cstate->cur_lineno, message, last_worker_used + 1);
- shmq_res = shm_mq_send(mq_handles[++last_worker_used - 1], cstate->line_buf.len, message, false);
+ // elog(LOG, "Sending line #%d '%s' to BGWorker #%d", cstate->cur_lineno, message, last_worker_used + 1);
+ elog(LOG, "Sending line #%d to BGWorker #%d", cstate->cur_lineno, last_worker_used + 1);
+ // shmq_res = shm_mq_send(mq_handles[++last_worker_used - 1], cstate->line_buf.len, message, false);
+ shmq_res = shm_mq_send(mq_handles[++last_worker_used - 1], cstate->line_buf.len, cstate->line_buf.data, false);
if (shmq_res != SHM_MQ_SUCCESS)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -5712,258 +5755,258 @@ ParallelCopyFrom(CopyState cstate)
last_worker_used = 0;
}
- if (!next_cf_state) {
- break;
- }
- else if (next_cf_state == NCF_SUCCESS)
- {
- /* And now we can form the input tuple. */
- tuple = heap_form_tuple(tupDesc, values, nulls);
-
- if (loaded_oid != InvalidOid)
- HeapTupleSetOid(tuple, loaded_oid);
-
- /*
- * Constraints might reference the tableoid column, so initialize
- * t_tableOid before evaluating them.
- */
- tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
-
- /* Triggers and stuff need to be invoked in query context. */
- MemoryContextSwitchTo(oldcontext);
-
- /* Place tuple in tuple slot --- but slot shouldn't free it */
- slot = myslot;
- ExecStoreTuple(tuple, slot, InvalidBuffer, false);
-
- /* Determine the partition to heap_insert the tuple into */
- if (cstate->partition_dispatch_info)
- {
- int leaf_part_index;
- TupleConversionMap *map;
-
- /*
- * Away we go ... If we end up not finding a partition after all,
- * ExecFindPartition() does not return and errors out instead.
- * Otherwise, the returned value is to be used as an index into
- * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
- * will get us the ResultRelInfo and TupleConversionMap for the
- * partition, respectively.
- */
- leaf_part_index = ExecFindPartition(resultRelInfo,
- cstate->partition_dispatch_info,
- slot,
- estate);
- Assert(leaf_part_index >= 0 &&
- leaf_part_index < cstate->num_partitions);
-
- /*
- * If this tuple is mapped to a partition that is not same as the
- * previous one, we'd better make the bulk insert mechanism gets a
- * new buffer.
- */
- if (prev_leaf_part_index != leaf_part_index)
- {
- ReleaseBulkInsertStatePin(bistate);
- prev_leaf_part_index = leaf_part_index;
- }
-
- /*
- * Save the old ResultRelInfo and switch to the one corresponding
- * to the selected partition.
- */
- saved_resultRelInfo = resultRelInfo;
- resultRelInfo = cstate->partitions + leaf_part_index;
-
- /* We do not yet have a way to insert into a foreign partition */
- if (resultRelInfo->ri_FdwRoutine)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot route inserted tuples to a foreign table")));
-
- /*
- * For ExecInsertIndexTuples() to work on the partition's indexes
- */
- estate->es_result_relation_info = resultRelInfo;
-
- /*
- * If we're capturing transition tuples, we might need to convert
- * from the partition rowtype to parent rowtype.
- */
- if (cstate->transition_capture != NULL)
- {
- if (resultRelInfo->ri_TrigDesc &&
- (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
- resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
- {
- /*
- * If there are any BEFORE or INSTEAD triggers on the
- * partition, we'll have to be ready to convert their
- * result back to tuplestore format.
- */
- cstate->transition_capture->tcs_original_insert_tuple = NULL;
- cstate->transition_capture->tcs_map =
- cstate->transition_tupconv_maps[leaf_part_index];
- }
- else
- {
- /*
- * Otherwise, just remember the original unconverted
- * tuple, to avoid a needless round trip conversion.
- */
- cstate->transition_capture->tcs_original_insert_tuple = tuple;
- cstate->transition_capture->tcs_map = NULL;
- }
- }
- /*
- * We might need to convert from the parent rowtype to the
- * partition rowtype.
- */
- map = cstate->partition_tupconv_maps[leaf_part_index];
- if (map)
- {
- Relation partrel = resultRelInfo->ri_RelationDesc;
-
- tuple = do_convert_tuple(tuple, map);
-
- /*
- * We must use the partition's tuple descriptor from this
- * point on. Use a dedicated slot from this point on until
- * we're finished dealing with the partition.
- */
- slot = cstate->partition_tuple_slot;
- Assert(slot != NULL);
- ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
- ExecStoreTuple(tuple, slot, InvalidBuffer, true);
- }
-
- tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
- }
-
- skip_tuple = false;
-
- /* BEFORE ROW INSERT Triggers */
- if (resultRelInfo->ri_TrigDesc &&
- resultRelInfo->ri_TrigDesc->trig_insert_before_row)
- {
- slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
-
- if (slot == NULL) /* "do nothing" */
- skip_tuple = true;
- else /* trigger might have changed tuple */
- tuple = ExecMaterializeSlot(slot);
- }
- }
- else
- {
- skip_tuple = true;
- }
-
- if (!skip_tuple)
- {
- if (resultRelInfo->ri_TrigDesc &&
- resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
- {
- /* Pass the data to the INSTEAD ROW INSERT trigger */
- ExecIRInsertTriggers(estate, resultRelInfo, slot);
- }
- else
- {
- /*
- * We always check the partition constraint, including when
- * the tuple got here via tuple-routing. However we don't
- * need to in the latter case if no BR trigger is defined on
- * the partition. Note that a BR trigger might modify the
- * tuple such that the partition constraint is no longer
- * satisfied, so we need to check in that case.
- */
- bool check_partition_constr =
- (resultRelInfo->ri_PartitionCheck != NIL);
-
- if (saved_resultRelInfo != NULL &&
- !(resultRelInfo->ri_TrigDesc &&
- resultRelInfo->ri_TrigDesc->trig_insert_before_row))
- check_partition_constr = false;
-
- /* Check the constraints of the tuple */
- if (cstate->rel->rd_att->constr || check_partition_constr)
- ExecConstraints(resultRelInfo, slot, estate);
-
- if (useHeapMultiInsert)
- {
- /* Add this tuple to the tuple buffer */
- if (nBufferedTuples == 0)
- firstBufferedLineNo = cstate->cur_lineno;
- bufferedTuples[nBufferedTuples++] = tuple;
- bufferedTuplesSize += tuple->t_len;
-
- /*
- * If the buffer filled up, flush it. Also flush if the
- * total size of all the tuples in the buffer becomes
- * large, to avoid using large amounts of memory for the
- * buffer when the tuples are exceptionally wide.
- */
- if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
- bufferedTuplesSize > 65535)
- {
- CopyFromInsertBatch(cstate, estate, mycid, hi_options,
- resultRelInfo, myslot, bistate,
- nBufferedTuples, bufferedTuples,
- firstBufferedLineNo);
- nBufferedTuples = 0;
- bufferedTuplesSize = 0;
- }
- }
- else
- {
- List *recheckIndexes = NIL;
-
- /* OK, store the tuple and create index entries for it */
- heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
- hi_options, bistate);
-
- if (resultRelInfo->ri_NumIndices > 0)
- recheckIndexes = ExecInsertIndexTuples(slot,
- &(tuple->t_self),
- estate,
- false,
- NULL,
- NIL);
-
- /* AFTER ROW INSERT Triggers */
- ExecARInsertTriggers(estate, resultRelInfo, tuple,
- recheckIndexes, cstate->transition_capture);
-
- list_free(recheckIndexes);
- }
- }
-
- /*
- * We count only tuples not suppressed by a BEFORE INSERT trigger;
- * this is the same definition used by execMain.c for counting
- * tuples inserted by an INSERT command.
- */
- processed++;
-
- if (saved_resultRelInfo)
- {
- resultRelInfo = saved_resultRelInfo;
- estate->es_result_relation_info = resultRelInfo;
- }
- }
- }
-
- /* Flush any remaining buffered tuples */
- if (nBufferedTuples > 0)
- CopyFromInsertBatch(cstate, estate, mycid, hi_options,
- resultRelInfo, myslot, bistate,
- nBufferedTuples, bufferedTuples,
- firstBufferedLineNo);
-
- /* Done, clean up */
- error_context_stack = errcallback.previous;
-
- FreeBulkInsertState(bistate);
+ // if (!next_cf_state) {
+ // break;
+ // }
+ // else if (next_cf_state == NCF_SUCCESS)
+ // {
+ // /* And now we can form the input tuple. */
+ // tuple = heap_form_tuple(tupDesc, values, nulls);
+ //
+ // if (loaded_oid != InvalidOid)
+ // HeapTupleSetOid(tuple, loaded_oid);
+ //
+ // /*
+ // * Constraints might reference the tableoid column, so initialize
+ // * t_tableOid before evaluating them.
+ // */
+ // tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+ //
+ // /* Triggers and stuff need to be invoked in query context. */
+ // MemoryContextSwitchTo(oldcontext);
+ //
+ // /* Place tuple in tuple slot --- but slot shouldn't free it */
+ // slot = myslot;
+ // ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+ //
+ // /* Determine the partition to heap_insert the tuple into */
+ // if (cstate->partition_dispatch_info)
+ // {
+ // int leaf_part_index;
+ // TupleConversionMap *map;
+ //
+ // /*
+ // * Away we go ... If we end up not finding a partition after all,
+ // * ExecFindPartition() does not return and errors out instead.
+ // * Otherwise, the returned value is to be used as an index into
+ // * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
+ // * will get us the ResultRelInfo and TupleConversionMap for the
+ // * partition, respectively.
+ // */
+ // leaf_part_index = ExecFindPartition(resultRelInfo,
+ // cstate->partition_dispatch_info,
+ // slot,
+ // estate);
+ // Assert(leaf_part_index >= 0 &&
+ // leaf_part_index < cstate->num_partitions);
+ //
+ // /*
+ // * If this tuple is mapped to a partition that is not same as the
+ // * previous one, we'd better make the bulk insert mechanism gets a
+ // * new buffer.
+ // */
+ // if (prev_leaf_part_index != leaf_part_index)
+ // {
+ // ReleaseBulkInsertStatePin(bistate);
+ // prev_leaf_part_index = leaf_part_index;
+ // }
+ //
+ // /*
+ // * Save the old ResultRelInfo and switch to the one corresponding
+ // * to the selected partition.
+ // */
+ // saved_resultRelInfo = resultRelInfo;
+ // resultRelInfo = cstate->partitions + leaf_part_index;
+ //
+ // /* We do not yet have a way to insert into a foreign partition */
+ // if (resultRelInfo->ri_FdwRoutine)
+ // ereport(ERROR,
+ // (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ // errmsg("cannot route inserted tuples to a foreign table")));
+ //
+ // /*
+ // * For ExecInsertIndexTuples() to work on the partition's indexes
+ // */
+ // estate->es_result_relation_info = resultRelInfo;
+ //
+ // /*
+ // * If we're capturing transition tuples, we might need to convert
+ // * from the partition rowtype to parent rowtype.
+ // */
+ // if (cstate->transition_capture != NULL)
+ // {
+ // if (resultRelInfo->ri_TrigDesc &&
+ // (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+ // resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
+ // {
+ // /*
+ // * If there are any BEFORE or INSTEAD triggers on the
+ // * partition, we'll have to be ready to convert their
+ // * result back to tuplestore format.
+ // */
+ // cstate->transition_capture->tcs_original_insert_tuple = NULL;
+ // cstate->transition_capture->tcs_map =
+ // cstate->transition_tupconv_maps[leaf_part_index];
+ // }
+ // else
+ // {
+ // /*
+ // * Otherwise, just remember the original unconverted
+ // * tuple, to avoid a needless round trip conversion.
+ // */
+ // cstate->transition_capture->tcs_original_insert_tuple = tuple;
+ // cstate->transition_capture->tcs_map = NULL;
+ // }
+ // }
+ // /*
+ // * We might need to convert from the parent rowtype to the
+ // * partition rowtype.
+ // */
+ // map = cstate->partition_tupconv_maps[leaf_part_index];
+ // if (map)
+ // {
+ // Relation partrel = resultRelInfo->ri_RelationDesc;
+ //
+ // tuple = do_convert_tuple(tuple, map);
+ //
+ // /*
+ // * We must use the partition's tuple descriptor from this
+ // * point on. Use a dedicated slot from this point on until
+ // * we're finished dealing with the partition.
+ // */
+ // slot = cstate->partition_tuple_slot;
+ // Assert(slot != NULL);
+ // ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
+ // ExecStoreTuple(tuple, slot, InvalidBuffer, true);
+ // }
+ //
+ // tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+ // }
+ //
+ // skip_tuple = false;
+ //
+ // /* BEFORE ROW INSERT Triggers */
+ // if (resultRelInfo->ri_TrigDesc &&
+ // resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+ // {
+ // slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
+ //
+ // if (slot == NULL) /* "do nothing" */
+ // skip_tuple = true;
+ // else /* trigger might have changed tuple */
+ // tuple = ExecMaterializeSlot(slot);
+ // }
+ // }
+ // else
+ // {
+ // skip_tuple = true;
+ // }
+ //
+ // if (!skip_tuple)
+ // {
+ // if (resultRelInfo->ri_TrigDesc &&
+ // resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
+ // {
+ // /* Pass the data to the INSTEAD ROW INSERT trigger */
+ // ExecIRInsertTriggers(estate, resultRelInfo, slot);
+ // }
+ // else
+ // {
+ // /*
+ // * We always check the partition constraint, including when
+ // * the tuple got here via tuple-routing. However we don't
+ // * need to in the latter case if no BR trigger is defined on
+ // * the partition. Note that a BR trigger might modify the
+ // * tuple such that the partition constraint is no longer
+ // * satisfied, so we need to check in that case.
+ // */
+ // bool check_partition_constr =
+ // (resultRelInfo->ri_PartitionCheck != NIL);
+ //
+ // if (saved_resultRelInfo != NULL &&
+ // !(resultRelInfo->ri_TrigDesc &&
+ // resultRelInfo->ri_TrigDesc->trig_insert_before_row))
+ // check_partition_constr = false;
+ //
+ // /* Check the constraints of the tuple */
+ // if (cstate->rel->rd_att->constr || check_partition_constr)
+ // ExecConstraints(resultRelInfo, slot, estate);
+ //
+ // if (useHeapMultiInsert)
+ // {
+ // /* Add this tuple to the tuple buffer */
+ // if (nBufferedTuples == 0)
+ // firstBufferedLineNo = cstate->cur_lineno;
+ // bufferedTuples[nBufferedTuples++] = tuple;
+ // bufferedTuplesSize += tuple->t_len;
+ //
+ // /*
+ // * If the buffer filled up, flush it. Also flush if the
+ // * total size of all the tuples in the buffer becomes
+ // * large, to avoid using large amounts of memory for the
+ // * buffer when the tuples are exceptionally wide.
+ // */
+ // if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
+ // bufferedTuplesSize > 65535)
+ // {
+ // CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+ // resultRelInfo, myslot, bistate,
+ // nBufferedTuples, bufferedTuples,
+ // firstBufferedLineNo);
+ // nBufferedTuples = 0;
+ // bufferedTuplesSize = 0;
+ // }
+ // }
+ // else
+ // {
+ // List *recheckIndexes = NIL;
+ //
+ // /* OK, store the tuple and create index entries for it */
+ // heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
+ // hi_options, bistate);
+ //
+ // if (resultRelInfo->ri_NumIndices > 0)
+ // recheckIndexes = ExecInsertIndexTuples(slot,
+ // &(tuple->t_self),
+ // estate,
+ // false,
+ // NULL,
+ // NIL);
+ //
+ // /* AFTER ROW INSERT Triggers */
+ // ExecARInsertTriggers(estate, resultRelInfo, tuple,
+ // recheckIndexes, cstate->transition_capture);
+ //
+ // list_free(recheckIndexes);
+ // }
+ // }
+ //
+ // /*
+ // * We count only tuples not suppressed by a BEFORE INSERT trigger;
+ // * this is the same definition used by execMain.c for counting
+ // * tuples inserted by an INSERT command.
+ // */
+ // processed++;
+ //
+ // if (saved_resultRelInfo)
+ // {
+ // resultRelInfo = saved_resultRelInfo;
+ // estate->es_result_relation_info = resultRelInfo;
+ // }
+ // }
+ }
+ //
+ // /* Flush any remaining buffered tuples */
+ // if (nBufferedTuples > 0)
+ // CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+ // resultRelInfo, myslot, bistate,
+ // nBufferedTuples, bufferedTuples,
+ // firstBufferedLineNo);
+ //
+ // /* Done, clean up */
+ // error_context_stack = errcallback.previous;
+ //
+ // FreeBulkInsertState(bistate);
MemoryContextSwitchTo(oldcontext);
--
2.11.0
From 0ba0d2802955fc3bc25304682cdfb42a1ba365de Mon Sep 17 00:00:00 2001
From: Alex K <alex.lumir@gmail.com>
Date: Fri, 4 Aug 2017 20:13:47 +0300
Subject: [PATCH 08/13] Use ConditionVariable to wait until BGWorkers
complete their work
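The leader must not proceed to cleanup while workers are still draining
their queues, so each worker bumps a workers_finished counter under
CopyFromBgwLock and the leader sleeps on a ConditionVariable kept in the
shared ParallelState. The standard pattern, as a sketch (note the
ConditionVariable is used in place in shared memory, never copied by
value):

    /* Worker, after its receive loop ends: */
    LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
    ++pst->workers_finished;
    LWLockRelease(CopyFromBgwLock);
    ConditionVariableBroadcast(&pst->cv);

    /* Leader, waiting for all workers: */
    for (;;)
    {
        LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
        done = (pst->workers_finished == pst->workers_total);
        LWLockRelease(CopyFromBgwLock);
        if (done)
            break;
        ConditionVariableSleep(&pst->cv,
                               WAIT_EVENT_COPY_FROM_BGWORKERS_FINISHED);
        CHECK_FOR_INTERRUPTS();
    }
    ConditionVariableCancelSleep();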
---
src/backend/commands/copy.c | 50 ++++++++++++++++++++++++++++++++++++++++-
src/backend/postmaster/pgstat.c | 3 +++
src/include/pgstat.h | 3 ++-
3 files changed, 54 insertions(+), 2 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 22446d75e8..122822e6bf 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -260,8 +260,10 @@ typedef struct
int workers_total;
int workers_attached;
int workers_ready;
+ int workers_finished;
Oid database_id;
Oid authenticated_user_id;
+ ConditionVariable cv;
} ParallelState;
// TODO Consider change
@@ -5008,6 +5010,7 @@ CopyFromBgwMainLoop(Datum main_arg)
shm_mq_result shmq_res;
Size len;
void *data;
/*
* Establish signal handlers.
@@ -5089,6 +5092,7 @@ CopyFromBgwMainLoop(Datum main_arg)
LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
// SpinLockAcquire(&pst->mutex);
++pst->workers_ready;
elog(LOG, "BGWorker #%d started", myworkernumber);
// SpinLockRelease(&pst->mutex);
LWLockRelease(CopyFromBgwLock);
@@ -5122,6 +5126,13 @@ CopyFromBgwMainLoop(Datum main_arg)
elog(LOG, "BGWorker #%d dummy processing line: %s", myworkernumber, msg);
}
+ LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
+ ++pst->workers_finished;
+ LWLockRelease(CopyFromBgwLock);
+
+ /* Signal main process that we are done. */
+ ConditionVariableBroadcast(&pst->cv);
+
/*
* We're done. Explicitly detach the shared memory segment so that we
* don't get a resource leak warning at commit time. This will fire any
@@ -5250,12 +5261,14 @@ setup_dynamic_shared_memory(int64 queue_size, int nworkers,
/* Set up the header region. */
pst = shm_toc_allocate(toc, sizeof(ParallelState));
- SpinLockInit(&pst->mutex);
+ // SpinLockInit(&pst->mutex);
pst->workers_total = nworkers;
pst->workers_attached = 0;
pst->workers_ready = 0;
+ pst->workers_finished = 0;
pst->database_id = MyDatabaseId;
pst->authenticated_user_id = GetAuthenticatedUserId();
+ ConditionVariableInit(&pst->cv);
shm_toc_insert(toc, 0, pst);
/* Set up one message queue per worker. */
@@ -5431,6 +5444,38 @@ check_worker_status(WorkerState *wstate)
return true;
}
+static void
+wait_for_workers_to_finish(volatile ParallelState *pst)
+{
+ elog(LOG, "Waiting for BGWorkers to finish");
+
+ for (;;)
+ {
+ int workers_finished;
+ int workers_total;
+ LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
+ workers_finished = pst->workers_finished;
+ workers_total = pst->workers_total;
+ LWLockRelease(CopyFromBgwLock);
+
+ elog(LOG, "Checking BGWorkers workers_finished: %d, workers_total: %d", workers_finished, workers_total);
+ if (workers_finished == workers_total)
+ {
+ break;
+ }
+
+ elog(LOG, "Going to sleep again");
+ /* Wait for the workers to wake us up. */
+ ConditionVariableSleep((ConditionVariable *) &pst->cv, WAIT_EVENT_COPY_FROM_BGWORKERS_FINISHED);
+
+ /* An interrupt may have occurred while we were waiting. */
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ ConditionVariableCancelSleep();
+}
/*
* Parallel Copy FROM file to relation.
@@ -6008,6 +6053,9 @@ ParallelCopyFrom(CopyState cstate)
//
// FreeBulkInsertState(bistate);
+ /* Wait for all workers to complete their work. */
+ wait_for_workers_to_finish(pst);
+
MemoryContextSwitchTo(oldcontext);
/*
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index a0b0eecbd5..ba1c8eeed1 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3612,6 +3612,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE:
event_name = "LogicalSyncStateChange";
break;
+ case WAIT_EVENT_COPY_FROM_BGWORKERS_FINISHED:
+ event_name = "CopyFromBGWorkersFinished";
+ break;
/* no default case, so that compiler will warn */
}
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 6bffe63ad6..66f6b2ff2f 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -812,7 +812,8 @@ typedef enum
WAIT_EVENT_SAFE_SNAPSHOT,
WAIT_EVENT_SYNC_REP,
WAIT_EVENT_LOGICAL_SYNC_DATA,
- WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE
+ WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE,
+ WAIT_EVENT_COPY_FROM_BGWORKERS_FINISHED
} WaitEventIPC;
/* ----------
--
2.11.0
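The handshake the patch above adds is the textbook condition-variable pattern: each worker bumps a shared counter under a lock and broadcasts, while the leader re-checks the predicate in a loop before going back to sleep. The following standalone sketch restates that pattern with plain POSIX threads rather than server infrastructure; pthread_mutex_t/pthread_cond_t stand in for CopyFromBgwLock and the ConditionVariable kept in ParallelState, and N_WORKERS plus every other name below is illustrative, not taken from the patch.

/*
 * Standalone sketch -- NOT server code. Workers increment a shared
 * "finished" counter under a lock and broadcast; the leader sleeps on
 * the condition variable until the counter reaches the total, exactly
 * the exit condition of wait_for_workers_to_finish() above.
 */
#include <pthread.h>
#include <stdio.h>

#define N_WORKERS 4

typedef struct
{
	pthread_mutex_t lock;
	pthread_cond_t	cv;
	int				workers_finished;
	int				workers_total;
} ParallelStateSketch;

static ParallelStateSketch pst = {
	PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, N_WORKERS
};

static void *
worker_main(void *arg)
{
	(void) arg;

	/* ... a real worker would drain its shm_mq here ... */

	/* Like: LWLockAcquire(); ++pst->workers_finished; LWLockRelease(); */
	pthread_mutex_lock(&pst.lock);
	++pst.workers_finished;
	pthread_mutex_unlock(&pst.lock);

	/* Like ConditionVariableBroadcast(&pst->cv): wake the waiting leader. */
	pthread_cond_broadcast(&pst.cv);
	return NULL;
}

int
main(void)
{
	pthread_t	tid[N_WORKERS];
	int			i;

	for (i = 0; i < N_WORKERS; ++i)
		pthread_create(&tid[i], NULL, worker_main, NULL);

	/*
	 * Like wait_for_workers_to_finish(): re-check the predicate after every
	 * wakeup. pthread_cond_wait atomically releases the mutex while asleep,
	 * roughly what ConditionVariableSleep does by parking on the latch.
	 */
	pthread_mutex_lock(&pst.lock);
	while (pst.workers_finished < pst.workers_total)
		pthread_cond_wait(&pst.cv, &pst.lock);
	pthread_mutex_unlock(&pst.lock);

	printf("all %d workers finished\n", pst.workers_total);

	for (i = 0; i < N_WORKERS; ++i)
		pthread_join(tid[i], NULL);
	return 0;
}

Built with cc -pthread, the sketch prints "all 4 workers finished" once the counter reaches workers_total, mirroring the loop-and-recheck structure of wait_for_workers_to_finish().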
From 353f08dfd225f75099c66dd48109ea31c55adec9 Mon Sep 17 00:00:00 2001
From: Alex K <alex.lumir@gmail.com>
Date: Tue, 15 Aug 2017 12:18:11 +0300
Subject: [PATCH 09/13] Very, very dirty, but working trick to obtain CopyState
in the parallel worker
---
src/backend/commands/copy.c | 1168 ++++++++++++++++++++++++++++++++-----------
src/include/commands/copy.h | 7 +-
2 files changed, 884 insertions(+), 291 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 122822e6bf..7f38e70bf3 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -38,8 +38,10 @@
#include "optimizer/planner.h"
#include "optimizer/cost.h"
#include "nodes/makefuncs.h"
+#include "parser/parser.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
+#include "tcop/pquery.h"
#include "postmaster/bgworker.h"
#include "rewrite/rewriteHandler.h"
#include "storage/dsm.h"
@@ -264,6 +266,21 @@ typedef struct
Oid database_id;
Oid authenticated_user_id;
ConditionVariable cv;
+
+ bool is_program;
+ Relation rel;
+
+ TupleDesc tupDesc;
+ ResultRelInfo *resultRelInfo;
+ EState *estate;
+ ExprContext *econtext;
+ TupleTableSlot *myslot;
+
+ ErrorContextCallback errcallback;
+ CommandId mycid;
+ int hi_options;
+ BulkInsertState bistate;
+ bool useHeapMultiInsert;
} ParallelState;
// TODO Consider change
@@ -389,12 +406,19 @@ static void CopySendInt16(CopyState cstate, int16 val);
static bool CopyGetInt16(CopyState cstate, int16 *val);
-static ParallelState* shm_mq_setup(int64 queue_size, int32 nworkers,
- dsm_segment **segp, shm_mq_handle **mq_handles[]);
-static void setup_dynamic_shared_memory(int64 queue_size, int nworkers,
+static ParallelState* setup_parallel_copy_from(int64 queue_size, int32 nworkers,
+ dsm_segment **segp, shm_mq_handle **mq_handles[], CopyState cstate,
+ ParseState *pstate,
+ List *attnamelist,
+ List *options);
+static void setup_dsm(int64 queue_size, int nworkers,
dsm_segment **segp,
ParallelState **pstp,
- shm_mq **mqs[]);
+ shm_mq **mqs[],
+ CopyState cstate,
+ ParseState *pstate,
+ List *attnamelist,
+ List *options);
static WorkerState *setup_background_workers(int nworkers,
dsm_segment *seg);
static void cleanup_background_workers(dsm_segment *seg, Datum arg);
@@ -1041,7 +1065,8 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
if (cstate->allow_parallel) /* copy from file to database */
{
- *processed = ParallelCopyFrom(cstate);
+ *processed = ParallelCopyFrom(cstate, pstate, rel, stmt->filename, stmt->is_program,
+ stmt->attlist, stmt->options);
}
else
{
@@ -3169,63 +3194,66 @@ BeginCopyFrom(ParseState *pstate,
cstate->num_defaults = num_defaults;
cstate->is_program = is_program;
- if (data_source_cb)
- {
- cstate->copy_dest = COPY_CALLBACK;
- cstate->data_source_cb = data_source_cb;
- }
- else if (pipe)
- {
- Assert(!is_program); /* the grammar does not allow this */
- if (whereToSendOutput == DestRemote)
- ReceiveCopyBegin(cstate);
- else
- cstate->copy_file = stdin;
- }
- else
- {
- cstate->filename = pstrdup(filename);
+ if (!IsBackgroundWorker)
+ {
+ if (data_source_cb)
+ {
+ cstate->copy_dest = COPY_CALLBACK;
+ cstate->data_source_cb = data_source_cb;
+ }
+ else if (pipe)
+ {
+ Assert(!is_program); /* the grammar does not allow this */
+ if (whereToSendOutput == DestRemote)
+ ReceiveCopyBegin(cstate);
+ else
+ cstate->copy_file = stdin;
+ }
+ else
+ {
+ cstate->filename = pstrdup(filename);
- if (cstate->is_program)
- {
- cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
- if (cstate->copy_file == NULL)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not execute command \"%s\": %m",
- cstate->filename)));
- }
- else
- {
- struct stat st;
+ if (cstate->is_program)
+ {
+ cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
+ if (cstate->copy_file == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not execute command \"%s\": %m",
+ cstate->filename)));
+ }
+ else
+ {
+ struct stat st;
- cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
- if (cstate->copy_file == NULL)
- {
- /* copy errno because ereport subfunctions might change it */
- int save_errno = errno;
+ cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+ if (cstate->copy_file == NULL)
+ {
+ /* copy errno because ereport subfunctions might change it */
+ int save_errno = errno;
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\" for reading: %m",
- cstate->filename),
- (save_errno == ENOENT || save_errno == EACCES) ?
- errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
- "You may want a client-side facility such as psql's \\copy.") : 0));
- }
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\" for reading: %m",
+ cstate->filename),
+ (save_errno == ENOENT || save_errno == EACCES) ?
+ errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
+ "You may want a client-side facility such as psql's \\copy.") : 0));
+ }
- if (fstat(fileno(cstate->copy_file), &st))
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not stat file \"%s\": %m",
- cstate->filename)));
+ if (fstat(fileno(cstate->copy_file), &st))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ cstate->filename)));
- if (S_ISDIR(st.st_mode))
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a directory", cstate->filename)));
- }
- }
+ if (S_ISDIR(st.st_mode))
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a directory", cstate->filename)));
+ }
+ }
+ }
if (!cstate->binary)
{
@@ -5001,6 +5029,8 @@ void
CopyFromBgwMainLoop(Datum main_arg)
{
volatile ParallelState *pst;
+ // volatile CopyState cst;
+
dsm_segment *seg;
shm_toc *toc;
int myworkernumber;
@@ -5012,6 +5042,23 @@ CopyFromBgwMainLoop(Datum main_arg)
void *data;
ConditionVariable cv;
+ HeapTuple tuple;
+ Datum *values;
+ bool *nulls;
+ MemoryContext oldcontext = CurrentMemoryContext;
+
+ int prev_leaf_part_index = -1;
+ ResultRelInfo *saved_resultRelInfo = NULL;
+
+ BulkInsertState bistate;
+ uint64 processed = 0;
+ int nBufferedTuples = 0;
+
+#define MAX_BUFFERED_TUPLES 1000
+ HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
+ Size bufferedTuplesSize = 0;
+ int firstBufferedLineNo = 0;
+
/*
* Establish signal handlers.
*
@@ -5066,10 +5113,25 @@ CopyFromBgwMainLoop(Datum main_arg)
/*
* Attach to the appropriate message queues.
*/
- mq = shm_toc_lookup(toc, myworkernumber, false);
+ mq = shm_toc_lookup(toc, 3 + myworkernumber, false);
shm_mq_set_receiver(mq, MyProc);
mqh = shm_mq_attach(mq, seg, NULL);
+ // void *pt;
+ // pt = shm_toc_lookup(toc, 100500, false);
+ // int i;
+ // // iiiissssssss
+ // i = *(int *) pt;
+ // char *s;
+ // s = (char *) pt + sizeof(int);
+ //
+ //
+ // void *ptp;
+ //
+ // ptp = palloc(sizeof(int) + strlen(s));
+ // *ptp = i;
+
+
/* Restore database connection. */
BackgroundWorkerInitializeConnectionByOid(pst->database_id,
pst->authenticated_user_id);
@@ -5105,7 +5167,44 @@ CopyFromBgwMainLoop(Datum main_arg)
SetLatch(&registrant->procLatch);
/* Do the work. */
- // copy_messages(inqh, outqh);
+ values = (Datum *) palloc(pst->tupDesc->natts * sizeof(Datum));
+ nulls = (bool *) palloc(pst->tupDesc->natts * sizeof(bool));
+
+ volatile char *filename = shm_toc_lookup(toc, 2, false);
+ char *query_string;
+ query_string = filename;
+
+ elog(LOG, "BGWorker copying from query:\n %s", query_string);
+ // raw_parser_result = raw_parser(filename);
+ List *parsetree_list = pg_parse_query(query_string);
+ RawStmt *parsetree = lfirst_node(RawStmt, list_head(parsetree_list));
+ List *querytree_list = pg_analyze_and_rewrite(parsetree, query_string,
+ NULL, 0, NULL);
+ List *plantree_list = pg_plan_queries(querytree_list,
+ CURSOR_OPT_PARALLEL_OK, NULL);
+ PlannedStmt *pstmt = lfirst_node(PlannedStmt, list_head(plantree_list));
+ // Node *parsetree = pstmt->utilityStmt;
+ // elog(LOG, "BGWorker parse tree length: %d", raw_parser_result->length);
+ // PlannedStmt *pstmt = linitial_node(PlannedStmt, raw_parser_result);
+ CopyStmt *cstmnt = (CopyStmt *) pstmt->utilityStmt;
+
+ elog(LOG, "BGWorker filename from CopyStmt: %s", cstmnt->filename);
+ // elog(LOG, "BGWorker copying: %d %d %d", cst->binary, cst->ignore_errors, cst->csv_mode);
+
+ ParseState *pstate;
+ pstate = make_parsestate(NULL);
+ pstate->p_sourcetext = query_string;
+
+ StartTransactionCommand();
+
+ Relation rel;
+ /* Open and lock the relation, using the appropriate lock type. */
+ rel = heap_openrv(cstmnt->relation, RowExclusiveLock);
+ // (is_from ? RowExclusiveLock : AccessShareLock));
+
+ CopyState cstate = BeginCopyFrom(pstate, rel, cstmnt->filename, cstmnt->is_program,
+ NULL, cstmnt->attlist, cstmnt->options);
+
for (;;)
{
char *msg;
@@ -5126,6 +5225,10 @@ CopyFromBgwMainLoop(Datum main_arg)
elog(LOG, "BGWorker #%d dummy processing line: %s", myworkernumber, msg);
}
+ heap_close(rel, NoLock);
+ // (is_from ? NoLock : AccessShareLock));
+ CommitTransactionCommand();
+
LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
++pst->workers_finished;
LWLockRelease(CopyFromBgwLock);
@@ -5133,6 +5236,9 @@ CopyFromBgwMainLoop(Datum main_arg)
/* Signal main process that we are done. */
ConditionVariableBroadcast(&cv);
+ pfree(values);
+ pfree(nulls);
+
/*
* We're done. Explicitly detach the shared memory segment so that we
* don't get a resource leak warning at commit time. This will fire any
@@ -5169,8 +5275,11 @@ handle_sigterm(SIGNAL_ARGS)
* for a test run.
*/
static ParallelState*
-shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
- shm_mq_handle ***mq_handles)
+setup_parallel_copy_from(int64 queue_size, int32 nworkers, dsm_segment **segp,
+ shm_mq_handle ***mq_handles, CopyState cstate,
+ ParseState *pstate,
+ List *attnamelist,
+ List *options)
{
int i;
dsm_segment *seg;
@@ -5181,7 +5290,10 @@ shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
mqs = palloc0(sizeof(shm_mq *) * nworkers);
/* Set up a dynamic shared memory segment. */
- setup_dynamic_shared_memory(queue_size, nworkers, &seg, &pst, &mqs);
+ setup_dsm(queue_size, nworkers, &seg, &pst, &mqs, cstate,
+ pstate,
+ attnamelist,
+ options);
*segp = seg;
/* Register background workers. */
@@ -5211,21 +5323,31 @@ shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
/*
* Set up a dynamic shared memory segment.
*
- * We set up a small control region that contains only a test_shm_mq_header,
- * plus one region per message queue. There are as many message queues as
- * the number of workers, plus one.
+ * We set up a control region that contains a ParallelState,
+ * plus one region per message queue. There are as many message queues as
+ * the number of workers.
*/
static void
-setup_dynamic_shared_memory(int64 queue_size, int nworkers,
+setup_dsm(int64 queue_size, int nworkers,
dsm_segment **segp, ParallelState **pstp,
- shm_mq ***mqs)
+ shm_mq ***mqs, CopyState cstate,
+ ParseState *pstate,
+ List *attnamelist,
+ List *options)
{
shm_toc_estimator e;
int i;
+ int toc_key = 0;
Size segsize;
dsm_segment *seg;
shm_toc *toc;
ParallelState *pst;
+ char *shm_filename;
+ List *shm_p_rtable;
+ List *shm_attnamelist;
+ List *shm_options;
+
+ Assert(cstate->rel);
/* Ensure a valid queue size. */
if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
@@ -5245,13 +5367,20 @@ setup_dynamic_shared_memory(int64 queue_size, int nworkers,
* requests, we must estimate each chunk separately.
*
* We need one key to register the location of the header, and we need
- * nworkers + 1 keys to track the locations of the message queues.
+ * a few more keys for the range table, statement text, and options, plus
+ * nworkers keys to track the locations of the message queues.
*/
shm_toc_initialize_estimator(&e);
shm_toc_estimate_chunk(&e, sizeof(ParallelState));
- for (i = 0; i <= nworkers; ++i)
+
+ shm_toc_estimate_chunk(&e, sizeof(*pstate->p_rtable));
+ shm_toc_estimate_chunk(&e, strlen(ActivePortal->sourceText) * sizeof(char));
+ // shm_toc_estimate_chunk(&e, sizeof(*attnamelist));
+ shm_toc_estimate_chunk(&e, sizeof(*options));
+
+ for (i = 0; i < nworkers; ++i)
shm_toc_estimate_chunk(&e, (Size) queue_size);
- shm_toc_estimate_keys(&e, 1 + nworkers);
+
+ shm_toc_estimate_keys(&e, 1 + 4 + nworkers);
segsize = shm_toc_estimate(&e);
/* Create the shared memory segment and establish a table of contents. */
@@ -5269,7 +5398,208 @@ setup_dynamic_shared_memory(int64 queue_size, int nworkers,
pst->database_id = MyDatabaseId;
pst->authenticated_user_id = GetAuthenticatedUserId();
ConditionVariableInit(&pst->cv);
- shm_toc_insert(toc, 0, pst);
+
+ pst->is_program = cstate->is_program;
+ pst->rel = cstate->rel;
+
+ pst->estate = CreateExecutorState(); /* for ExecConstraints() */
+ pst->mycid = GetCurrentCommandId(true);
+ pst->hi_options = 0; /* start with default heap_insert options */
+
+ /*
+ * The target must be a plain relation or have an INSTEAD OF INSERT row
+ * trigger. (Currently, such triggers are only allowed on views, so we
+ * only hint about them in the view case.)
+ */
+ if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
+ cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
+ !(cstate->rel->trigdesc &&
+ cstate->rel->trigdesc->trig_insert_instead_row))
+ {
+ if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to view \"%s\"",
+ RelationGetRelationName(cstate->rel)),
+ errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
+ else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to materialized view \"%s\"",
+ RelationGetRelationName(cstate->rel))));
+ else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to foreign table \"%s\"",
+ RelationGetRelationName(cstate->rel))));
+ else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to sequence \"%s\"",
+ RelationGetRelationName(cstate->rel))));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to non-table relation \"%s\"",
+ RelationGetRelationName(cstate->rel))));
+ }
+
+ pst->tupDesc = RelationGetDescr(cstate->rel);
+
+ /*----------
+ * Check to see if we can avoid writing WAL
+ *
+ * If archive logging/streaming is not enabled *and* either
+ * - table was created in same transaction as this COPY
+ * - data is being written to relfilenode created in this transaction
+ * then we can skip writing WAL. It's safe because if the transaction
+ * doesn't commit, we'll discard the table (or the new relfilenode file).
+ * If it does commit, we'll have done the heap_sync at the bottom of this
+ * routine first.
+ *
+ * As mentioned in comments in utils/rel.h, the in-same-transaction test
+ * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
+ * can be cleared before the end of the transaction. The exact case is
+ * when a relation sets a new relfilenode twice in same transaction, yet
+ * the second one fails in an aborted subtransaction, e.g.
+ *
+ * BEGIN;
+ * TRUNCATE t;
+ * SAVEPOINT save;
+ * TRUNCATE t;
+ * ROLLBACK TO save;
+ * COPY ...
+ *
+ * Also, if the target file is new-in-transaction, we assume that checking
+ * FSM for free space is a waste of time, even if we must use WAL because
+ * of archiving. This could possibly be wrong, but it's unlikely.
+ *
+ * The comments for heap_insert and RelationGetBufferForTuple specify that
+ * skipping WAL logging is only safe if we ensure that our tuples do not
+ * go into pages containing tuples from any other transactions --- but this
+ * must be the case if we have a new table or new relfilenode, so we need
+ * no additional work to enforce that.
+ *----------
+ */
+ /* createSubid is creation check, newRelfilenodeSubid is truncation check */
+ if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
+ cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
+ {
+ pst->hi_options |= HEAP_INSERT_SKIP_FSM;
+ if (!XLogIsNeeded())
+ pst->hi_options |= HEAP_INSERT_SKIP_WAL;
+ }
+
+ /*
+ * Optimize if new relfilenode was created in this subxact or one of its
+ * committed children and we won't see those rows later as part of an
+ * earlier scan or command. This ensures that if this subtransaction
+ * aborts then the frozen rows won't be visible after xact cleanup. Note
+ * that the stronger test of exactly which subtransaction created it is
+ * crucial for correctness of this optimization.
+ */
+ if (cstate->freeze)
+ {
+ if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot perform FREEZE because of prior transaction activity")));
+
+ if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
+ cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
+
+ pst->hi_options |= HEAP_INSERT_FROZEN;
+ }
+
+ /*
+ * We need a ResultRelInfo so we can use the regular executor's
+ * index-entry-making machinery. (There used to be a huge amount of code
+ * here that basically duplicated execUtils.c ...)
+ */
+ pst->resultRelInfo = makeNode(ResultRelInfo);
+ InitResultRelInfo(pst->resultRelInfo,
+ cstate->rel,
+ 1, /* dummy rangetable index */
+ NULL,
+ 0);
+
+ ExecOpenIndices(pst->resultRelInfo, false);
+
+ pst->estate->es_result_relations = pst->resultRelInfo;
+ pst->estate->es_num_result_relations = 1;
+ pst->estate->es_result_relation_info = pst->resultRelInfo;
+ pst->estate->es_range_table = cstate->range_table;
+
+ /* Set up a tuple slot too */
+ pst->myslot = ExecInitExtraTupleSlot(pst->estate);
+ ExecSetSlotDescriptor(pst->myslot, pst->tupDesc);
+ /* Triggers might need a slot as well */
+ pst->estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(pst->estate);
+
+ /*
+ * It's more efficient to prepare a bunch of tuples for insertion, and
+ * insert them in one heap_multi_insert() call, than call heap_insert()
+ * separately for every tuple. However, we can't do that if there are
+ * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
+ * expressions. Such triggers or expressions might query the table we're
+ * inserting to, and act differently if the tuples that have already been
+ * processed and prepared for insertion are not there. We also can't do
+ * it if the table is partitioned.
+ */
+ pst->useHeapMultiInsert = !((pst->resultRelInfo->ri_TrigDesc != NULL &&
+ (pst->resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+ pst->resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+ cstate->partition_dispatch_info != NULL ||
+ cstate->volatile_defexprs);
+
+ /* Prepare to catch AFTER triggers. */
+ AfterTriggerBeginQuery();
+
+ /*
+ * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
+ * should do this for COPY, since it's not really an "INSERT" statement as
+ * such. However, executing these triggers maintains consistency with the
+ * EACH ROW triggers that we already fire on COPY.
+ */
+ ExecBSInsertTriggers(pst->estate, pst->resultRelInfo);
+
+ pst->bistate = GetBulkInsertState();
+ pst->econtext = GetPerTupleExprContext(pst->estate);
+
+ /* Set up callback to identify error line number */
+ pst->errcallback.callback = CopyFromErrorCallback;
+ pst->errcallback.arg = (void *) cstate;
+ pst->errcallback.previous = error_context_stack;
+ error_context_stack = &pst->errcallback;
+
+ shm_toc_insert(toc, toc_key++, pst);
+
+ shm_p_rtable = shm_toc_allocate(toc, sizeof(*pstate->p_rtable));
+ shm_filename = shm_toc_allocate(toc, strlen(ActivePortal->sourceText) * sizeof(char));
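+	/* XXX: strlen() alone leaves no room for the terminating NUL that the
+	 * strcpy() below writes; strlen(...) + 1 would be needed here and in
+	 * the matching shm_toc_estimate_chunk() call. */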
+ // shm_attnamelist = shm_toc_allocate(toc, sizeof(*attnamelist));
+ shm_options = shm_toc_allocate(toc, sizeof(*options));
+
+ *shm_p_rtable = *pstate->p_rtable;
+ // *shm_attnamelist = *attnamelist;
+ *shm_options = *options;
+ strcpy(shm_filename, ActivePortal->sourceText);
+
+ shm_toc_insert(toc, toc_key++, shm_p_rtable);
+ shm_toc_insert(toc, toc_key++, shm_filename);
+ // shm_toc_insert(toc, toc_key++, shm_attnamelist);
+ shm_toc_insert(toc, toc_key++, shm_options);
+
+ // cst = shm_toc_allocate(toc, sizeof(CopyStateData) + strlen(cstate->filename) * sizeof(char));
+ // *cst = *cstate;
+ // cst->filename = shm_toc_allocate(toc, strlen(cstate->filename) * sizeof(char));
+ // // *cst->filename = *cstate->filename;
+ // strcpy(cst->filename, cstate->filename);
+ // // memcpy(cst->filename, cstate->filename, sizeof(*cstate->filename));
+ // // memcpy(cstate, cst, sizeof(CopyStateData));
+ // shm_toc_insert(toc, 1, cst);
/* Set up one message queue per worker, plus one. */
for (i = 0; i < nworkers; ++i)
@@ -5278,7 +5608,7 @@ setup_dynamic_shared_memory(int64 queue_size, int nworkers,
mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
(Size) queue_size);
- shm_toc_insert(toc, i + 1, mq);
+ shm_toc_insert(toc, toc_key++, mq);
shm_mq_set_sender(mq, MyProc);
(*mqs)[i] = mq;
}
@@ -5477,11 +5807,447 @@ wait_for_workers_to_finish(volatile ParallelState *pst)
ConditionVariableCancelSleep();
}
+// /*
+// * Parse the current line into separate attributes (fields),
+// * performing de-escaping as needed.
+// *
+// * The input is in line_buf. We use attribute_buf to hold the result
+// * strings. cstate->raw_fields[k] is set to point to the k'th attribute
+// * string, or NULL when the input matches the null marker string.
+// * This array is expanded as necessary.
+// *
+// * (Note that the caller cannot check for nulls since the returned
+// * string would be the post-de-escaping equivalent, which may look
+// * the same as some valid data string.)
+// *
+// * delim is the column delimiter string (must be just one byte for now).
+// * null_print is the null marker string. Note that this is compared to
+// * the pre-de-escaped input string.
+// *
+// * The return value is the number of fields actually read.
+// */
+// static int
+// CopyReadTextAttrs(CopyState cstate)
+// {
+// char delimc = cstate->delim[0];
+// int fieldno;
+// char *output_ptr;
+// char *cur_ptr;
+// char *line_end_ptr;
+//
+// /*
+// * We need a special case for zero-column tables: check that the input
+// * line is empty, and return.
+// */
+// if (cstate->max_fields <= 0)
+// {
+// if (cstate->line_buf.len != 0)
+// ereport(ERROR,
+// (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+// errmsg("extra data after last expected column")));
+// return 0;
+// }
+//
+// resetStringInfo(&cstate->attribute_buf);
+//
+// /*
+// * The de-escaped attributes will certainly not be longer than the input
+// * data line, so we can just force attribute_buf to be large enough and
+// * then transfer data without any checks for enough space. We need to do
+// * it this way because enlarging attribute_buf mid-stream would invalidate
+// * pointers already stored into cstate->raw_fields[].
+// */
+// if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
+// enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
+// output_ptr = cstate->attribute_buf.data;
+//
+// /* set pointer variables for loop */
+// cur_ptr = cstate->line_buf.data;
+// line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
+//
+// /* Outer loop iterates over fields */
+// fieldno = 0;
+// for (;;)
+// {
+// bool found_delim = false;
+// char *start_ptr;
+// char *end_ptr;
+// int input_len;
+// bool saw_non_ascii = false;
+//
+// /* Make sure there is enough space for the next value */
+// if (fieldno >= cstate->max_fields)
+// {
+// cstate->max_fields *= 2;
+// cstate->raw_fields =
+// repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
+// }
+//
+// /* Remember start of field on both input and output sides */
+// start_ptr = cur_ptr;
+// cstate->raw_fields[fieldno] = output_ptr;
+//
+// /*
+// * Scan data for field.
+// *
+// * Note that in this loop, we are scanning to locate the end of field
+// * and also speculatively performing de-escaping. Once we find the
+// * end-of-field, we can match the raw field contents against the null
+// * marker string. Only after that comparison fails do we know that
+// * de-escaping is actually the right thing to do; therefore we *must
+// * not* throw any syntax errors before we've done the null-marker
+// * check.
+// */
+// for (;;)
+// {
+// char c;
+//
+// end_ptr = cur_ptr;
+// if (cur_ptr >= line_end_ptr)
+// break;
+// c = *cur_ptr++;
+// if (c == delimc)
+// {
+// found_delim = true;
+// break;
+// }
+// if (c == '\\')
+// {
+// if (cur_ptr >= line_end_ptr)
+// break;
+// c = *cur_ptr++;
+// switch (c)
+// {
+// case '0':
+// case '1':
+// case '2':
+// case '3':
+// case '4':
+// case '5':
+// case '6':
+// case '7':
+// {
+// /* handle \013 */
+// int val;
+//
+// val = OCTVALUE(c);
+// if (cur_ptr < line_end_ptr)
+// {
+// c = *cur_ptr;
+// if (ISOCTAL(c))
+// {
+// cur_ptr++;
+// val = (val << 3) + OCTVALUE(c);
+// if (cur_ptr < line_end_ptr)
+// {
+// c = *cur_ptr;
+// if (ISOCTAL(c))
+// {
+// cur_ptr++;
+// val = (val << 3) + OCTVALUE(c);
+// }
+// }
+// }
+// }
+// c = val & 0377;
+// if (c == '\0' || IS_HIGHBIT_SET(c))
+// saw_non_ascii = true;
+// }
+// break;
+// case 'x':
+// /* Handle \x3F */
+// if (cur_ptr < line_end_ptr)
+// {
+// char hexchar = *cur_ptr;
+//
+// if (isxdigit((unsigned char) hexchar))
+// {
+// int val = GetDecimalFromHex(hexchar);
+//
+// cur_ptr++;
+// if (cur_ptr < line_end_ptr)
+// {
+// hexchar = *cur_ptr;
+// if (isxdigit((unsigned char) hexchar))
+// {
+// cur_ptr++;
+// val = (val << 4) + GetDecimalFromHex(hexchar);
+// }
+// }
+// c = val & 0xff;
+// if (c == '\0' || IS_HIGHBIT_SET(c))
+// saw_non_ascii = true;
+// }
+// }
+// break;
+// case 'b':
+// c = '\b';
+// break;
+// case 'f':
+// c = '\f';
+// break;
+// case 'n':
+// c = '\n';
+// break;
+// case 'r':
+// c = '\r';
+// break;
+// case 't':
+// c = '\t';
+// break;
+// case 'v':
+// c = '\v';
+// break;
+//
+// /*
+// * in all other cases, take the char after '\'
+// * literally
+// */
+// }
+// }
+//
+// /* Add c to output string */
+// *output_ptr++ = c;
+// }
+//
+// /* Check whether raw input matched null marker */
+// input_len = end_ptr - start_ptr;
+// if (input_len == cstate->null_print_len &&
+// strncmp(start_ptr, cstate->null_print, input_len) == 0)
+// cstate->raw_fields[fieldno] = NULL;
+// else
+// {
+// /*
+// * At this point we know the field is supposed to contain data.
+// *
+// * If we de-escaped any non-7-bit-ASCII chars, make sure the
+// * resulting string is valid data for the db encoding.
+// */
+// if (saw_non_ascii)
+// {
+// char *fld = cstate->raw_fields[fieldno];
+//
+// pg_verifymbstr(fld, output_ptr - fld, false);
+// }
+// }
+//
+// /* Terminate attribute value in output area */
+// *output_ptr++ = '\0';
+//
+// fieldno++;
+// /* Done if we hit EOL instead of a delim */
+// if (!found_delim)
+// break;
+// }
+//
+// /* Clean up state of attribute_buf */
+// output_ptr--;
+// Assert(*output_ptr == '\0');
+// cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
+//
+// return fieldno;
+// }
+//
+// /*
+// * Parse the current line into separate attributes (fields),
+// * performing de-escaping as needed. This has exactly the same API as
+// * CopyReadAttributesText, except we parse the fields according to
+// * "standard" (i.e. common) CSV usage.
+// */
+// static int
+// CopyReadCSVAttrs(CopyState cstate)
+// {
+// char delimc = cstate->delim[0];
+// char quotec = cstate->quote[0];
+// char escapec = cstate->escape[0];
+// int fieldno;
+// char *output_ptr;
+// char *cur_ptr;
+// char *line_end_ptr;
+//
+// /*
+// * We need a special case for zero-column tables: check that the input
+// * line is empty, and return.
+// */
+// if (cstate->max_fields <= 0)
+// {
+// if (cstate->line_buf.len != 0)
+// ereport(ERROR,
+// (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+// errmsg("extra data after last expected column")));
+// return 0;
+// }
+//
+// resetStringInfo(&cstate->attribute_buf);
+//
+// /*
+// * The de-escaped attributes will certainly not be longer than the input
+// * data line, so we can just force attribute_buf to be large enough and
+// * then transfer data without any checks for enough space. We need to do
+// * it this way because enlarging attribute_buf mid-stream would invalidate
+// * pointers already stored into cstate->raw_fields[].
+// */
+// if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
+// enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
+// output_ptr = cstate->attribute_buf.data;
+//
+// /* set pointer variables for loop */
+// cur_ptr = cstate->line_buf.data;
+// line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
+//
+// /* Outer loop iterates over fields */
+// fieldno = 0;
+// for (;;)
+// {
+// bool found_delim = false;
+// bool saw_quote = false;
+// char *start_ptr;
+// char *end_ptr;
+// int input_len;
+//
+// /* Make sure there is enough space for the next value */
+// if (fieldno >= cstate->max_fields)
+// {
+// cstate->max_fields *= 2;
+// cstate->raw_fields =
+// repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
+// }
+//
+// /* Remember start of field on both input and output sides */
+// start_ptr = cur_ptr;
+// cstate->raw_fields[fieldno] = output_ptr;
+//
+// /*
+// * Scan data for field,
+// *
+// * The loop starts in "not quote" mode and then toggles between that
+// * and "in quote" mode. The loop exits normally if it is in "not
+// * quote" mode and a delimiter or line end is seen.
+// */
+// for (;;)
+// {
+// char c;
+//
+// /* Not in quote */
+// for (;;)
+// {
+// end_ptr = cur_ptr;
+// if (cur_ptr >= line_end_ptr)
+// goto endfield;
+// c = *cur_ptr++;
+// /* unquoted field delimiter */
+// if (c == delimc)
+// {
+// found_delim = true;
+// goto endfield;
+// }
+// /* start of quoted field (or part of field) */
+// if (c == quotec)
+// {
+// saw_quote = true;
+// break;
+// }
+// /* Add c to output string */
+// *output_ptr++ = c;
+// }
+//
+// /* In quote */
+// for (;;)
+// {
+// end_ptr = cur_ptr;
+// if (cur_ptr >= line_end_ptr)
+// ereport(ERROR,
+// (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+// errmsg("unterminated CSV quoted field")));
+//
+// c = *cur_ptr++;
+//
+// /* escape within a quoted field */
+// if (c == escapec)
+// {
+// /*
+// * peek at the next char if available, and escape it if it
+// * is an escape char or a quote char
+// */
+// if (cur_ptr < line_end_ptr)
+// {
+// char nextc = *cur_ptr;
+//
+// if (nextc == escapec || nextc == quotec)
+// {
+// *output_ptr++ = nextc;
+// cur_ptr++;
+// continue;
+// }
+// }
+// }
+//
+// /*
+// * end of quoted field. Must do this test after testing for
+// * escape in case quote char and escape char are the same
+// * (which is the common case).
+// */
+// if (c == quotec)
+// break;
+//
+// /* Add c to output string */
+// *output_ptr++ = c;
+// }
+// }
+// endfield:
+//
+// /* Terminate attribute value in output area */
+// *output_ptr++ = '\0';
+//
+// /* Check whether raw input matched null marker */
+// input_len = end_ptr - start_ptr;
+// if (!saw_quote && input_len == cstate->null_print_len &&
+// strncmp(start_ptr, cstate->null_print, input_len) == 0)
+// cstate->raw_fields[fieldno] = NULL;
+//
+// fieldno++;
+// /* Done if we hit EOL instead of a delim */
+// if (!found_delim)
+// break;
+// }
+//
+// /* Clean up state of attribute_buf */
+// output_ptr--;
+// Assert(*output_ptr == '\0');
+// cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
+//
+// return fieldno;
+// }
+//
+// /*
+// *
+// */
+// bool
+// CopyFromReadAttributes(char *line, bool csv_mode, char ***fields, int *nfields)
+// {
+// int fldct;
+//
+// /* Parse the line into de-escaped field values */
+// if (csv_mode)
+// fldct = CopyReadCSVAttrs(cstate);
+// else
+// fldct = CopyReadTextAttrs(cstate);
+//
+// *fields = cstate->raw_fields;
+// *nfields = fldct;
+// return true;
+// }
+
/*
* Parallel Copy FROM file to relation.
*/
extern uint64
-ParallelCopyFrom(CopyState cstate)
+ParallelCopyFrom(CopyState cstate, ParseState *pstate,
+ Relation rel,
+ const char *filename,
+ bool is_program,
+ List *attnamelist,
+ List *options)
{
ParallelState *pst;
dsm_segment *seg;
@@ -5491,222 +6257,45 @@ ParallelCopyFrom(CopyState cstate)
shm_mq_result shmq_res;
int last_worker_used = 0;
// int64 message = 0;
- char *message;
+ // char *message;
// char *message_contents = VARDATA_ANY(message);
// int message_size = VARSIZE_ANY_EXHDR(message);
int message_size = sizeof(char);
mq_handles = palloc0(sizeof(shm_mq_handle *) * nworkers);
- HeapTuple tuple;
- TupleDesc tupDesc;
- Datum *values;
- bool *nulls;
- ResultRelInfo *resultRelInfo;
- ResultRelInfo *saved_resultRelInfo = NULL;
- EState *estate = CreateExecutorState(); /* for ExecConstraints() */
- ExprContext *econtext;
- TupleTableSlot *myslot;
- MemoryContext oldcontext = CurrentMemoryContext;
-
- ErrorContextCallback errcallback;
- CommandId mycid = GetCurrentCommandId(true);
- int hi_options = 0; /* start with default heap_insert options */
- BulkInsertState bistate;
- uint64 processed = 0;
- bool useHeapMultiInsert;
- int nBufferedTuples = 0;
- int prev_leaf_part_index = -1;
-
-#define MAX_BUFFERED_TUPLES 1000
- HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
- Size bufferedTuplesSize = 0;
- int firstBufferedLineNo = 0;
+ MemoryContext oldcontext = CurrentMemoryContext;
- Assert(cstate->rel);
+ // TupleDesc tupDesc;
+ // ResultRelInfo *resultRelInfo;
+ // EState *estate = CreateExecutorState(); /* for ExecConstraints() */
+ // ExprContext *econtext;
+ // TupleTableSlot *myslot;
+ //
+ // ErrorContextCallback errcallback;
+ // CommandId mycid = GetCurrentCommandId(true);
+ // int hi_options = 0; /* start with default heap_insert options */
+ // BulkInsertState bistate;
+ // bool useHeapMultiInsert;
+
+ // ResultRelInfo *saved_resultRelInfo = NULL;
+ // int prev_leaf_part_index = -1;
+ uint64 processed = 0;
+ // int nBufferedTuples = 0;
+// #define MAX_BUFFERED_TUPLES 1000
+// HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
+// Size bufferedTuplesSize = 0;
+// int firstBufferedLineNo = 0;
// MQ size = 100 messages x 80 chars each
- pst = shm_mq_setup(message_size * 80 * queue_size, nworkers, &seg, &mq_handles);
-
- /*
- * The target must be a plain relation or have an INSTEAD OF INSERT row
- * trigger. (Currently, such triggers are only allowed on views, so we
- * only hint about them in the view case.)
- */
- if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
- cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
- !(cstate->rel->trigdesc &&
- cstate->rel->trigdesc->trig_insert_instead_row))
- {
- if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy to view \"%s\"",
- RelationGetRelationName(cstate->rel)),
- errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
- else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy to materialized view \"%s\"",
- RelationGetRelationName(cstate->rel))));
- else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy to foreign table \"%s\"",
- RelationGetRelationName(cstate->rel))));
- else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy to sequence \"%s\"",
- RelationGetRelationName(cstate->rel))));
- else
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy to non-table relation \"%s\"",
- RelationGetRelationName(cstate->rel))));
- }
-
- tupDesc = RelationGetDescr(cstate->rel);
-
- /*----------
- * Check to see if we can avoid writing WAL
- *
- * If archive logging/streaming is not enabled *and* either
- * - table was created in same transaction as this COPY
- * - data is being written to relfilenode created in this transaction
- * then we can skip writing WAL. It's safe because if the transaction
- * doesn't commit, we'll discard the table (or the new relfilenode file).
- * If it does commit, we'll have done the heap_sync at the bottom of this
- * routine first.
- *
- * As mentioned in comments in utils/rel.h, the in-same-transaction test
- * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
- * can be cleared before the end of the transaction. The exact case is
- * when a relation sets a new relfilenode twice in same transaction, yet
- * the second one fails in an aborted subtransaction, e.g.
- *
- * BEGIN;
- * TRUNCATE t;
- * SAVEPOINT save;
- * TRUNCATE t;
- * ROLLBACK TO save;
- * COPY ...
- *
- * Also, if the target file is new-in-transaction, we assume that checking
- * FSM for free space is a waste of time, even if we must use WAL because
- * of archiving. This could possibly be wrong, but it's unlikely.
- *
- * The comments for heap_insert and RelationGetBufferForTuple specify that
- * skipping WAL logging is only safe if we ensure that our tuples do not
- * go into pages containing tuples from any other transactions --- but this
- * must be the case if we have a new table or new relfilenode, so we need
- * no additional work to enforce that.
- *----------
- */
- /* createSubid is creation check, newRelfilenodeSubid is truncation check */
- if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
- cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
- {
- hi_options |= HEAP_INSERT_SKIP_FSM;
- if (!XLogIsNeeded())
- hi_options |= HEAP_INSERT_SKIP_WAL;
- }
-
- /*
- * Optimize if new relfilenode was created in this subxact or one of its
- * committed children and we won't see those rows later as part of an
- * earlier scan or command. This ensures that if this subtransaction
- * aborts then the frozen rows won't be visible after xact cleanup. Note
- * that the stronger test of exactly which subtransaction created it is
- * crucial for correctness of this optimization.
- */
- if (cstate->freeze)
- {
- if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
- errmsg("cannot perform FREEZE because of prior transaction activity")));
-
- if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
- cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
-
- hi_options |= HEAP_INSERT_FROZEN;
- }
-
- /*
- * We need a ResultRelInfo so we can use the regular executor's
- * index-entry-making machinery. (There used to be a huge amount of code
- * here that basically duplicated execUtils.c ...)
- */
- resultRelInfo = makeNode(ResultRelInfo);
- InitResultRelInfo(resultRelInfo,
- cstate->rel,
- 1, /* dummy rangetable index */
- NULL,
- 0);
-
- ExecOpenIndices(resultRelInfo, false);
-
- estate->es_result_relations = resultRelInfo;
- estate->es_num_result_relations = 1;
- estate->es_result_relation_info = resultRelInfo;
- estate->es_range_table = cstate->range_table;
-
- /* Set up a tuple slot too */
- myslot = ExecInitExtraTupleSlot(estate);
- ExecSetSlotDescriptor(myslot, tupDesc);
- /* Triggers might need a slot as well */
- estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
-
- /*
- * It's more efficient to prepare a bunch of tuples for insertion, and
- * insert them in one heap_multi_insert() call, than call heap_insert()
- * separately for every tuple. However, we can't do that if there are
- * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
- * expressions. Such triggers or expressions might query the table we're
- * inserting to, and act differently if the tuples that have already been
- * processed and prepared for insertion are not there. We also can't do
- * it if the table is partitioned.
- */
- if ((resultRelInfo->ri_TrigDesc != NULL &&
- (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
- resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
- cstate->partition_dispatch_info != NULL ||
- cstate->volatile_defexprs)
- {
- useHeapMultiInsert = false;
- }
- else
- {
- useHeapMultiInsert = true;
- bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
- }
-
- /* Prepare to catch AFTER triggers. */
- AfterTriggerBeginQuery();
-
- /*
- * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
- * should do this for COPY, since it's not really an "INSERT" statement as
- * such. However, executing these triggers maintains consistency with the
- * EACH ROW triggers that we already fire on COPY.
- */
- ExecBSInsertTriggers(estate, resultRelInfo);
-
- values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
- nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
-
- bistate = GetBulkInsertState();
- econtext = GetPerTupleExprContext(estate);
-
- /* Set up callback to identify error line number */
- errcallback.callback = CopyFromErrorCallback;
- errcallback.arg = (void *) cstate;
- errcallback.previous = error_context_stack;
- error_context_stack = &errcallback;
+ pst = setup_parallel_copy_from(message_size * 80 * queue_size,
+ nworkers,
+ &seg,
+ &mq_handles,
+ cstate,
+ pstate,
+ attnamelist,
+ options);
// // BG Worker startup
// RegisterDynamicBackgroundWorker(&worker, &bgwhandle);
@@ -5714,6 +6303,8 @@ ParallelCopyFrom(CopyState cstate)
// elog(LOG, "Main COPY process (pid %d): BGWorker started (pid %d)", MyProcPid, bgwpid);
// // BG Worker startup
+ elog(LOG, "Copying from file %s", cstate->filename);
+
for (;;)
{
// TupleTableSlot *slot;
@@ -6066,17 +6657,14 @@ ParallelCopyFrom(CopyState cstate)
pq_endmsgread();
/* Execute AFTER STATEMENT insertion triggers */
- ExecASInsertTriggers(estate, resultRelInfo, cstate->transition_capture);
+ ExecASInsertTriggers(pst->estate, pst->resultRelInfo, cstate->transition_capture);
/* Handle queued AFTER triggers */
- AfterTriggerEndQuery(estate);
-
- pfree(values);
- pfree(nulls);
+ AfterTriggerEndQuery(pst->estate);
- ExecResetTupleTable(estate->es_tupleTable, false);
+ ExecResetTupleTable(pst->estate->es_tupleTable, false);
- ExecCloseIndices(resultRelInfo);
+ ExecCloseIndices(pst->resultRelInfo);
/* Close all the partitioned tables, leaf partitions, and their indices */
if (cstate->partition_dispatch_info)
@@ -6109,15 +6697,15 @@ ParallelCopyFrom(CopyState cstate)
}
/* Close any trigger target relations */
- ExecCleanUpTriggerState(estate);
+ ExecCleanUpTriggerState(pst->estate);
- FreeExecutorState(estate);
+ FreeExecutorState(pst->estate);
/*
* If we skipped writing WAL, then we need to sync the heap (but not
* indexes since those use WAL anyway)
*/
- if (hi_options & HEAP_INSERT_SKIP_WAL)
+ if (pst->hi_options & HEAP_INSERT_SKIP_WAL)
heap_sync(cstate->rel);
/* Clean up. */
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 2935e567ae..dbc769828f 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -40,7 +40,12 @@ extern void CopyFromErrorCallback(void *arg);
extern void CopyFromBgwMainLoop(Datum main_arg);
extern uint64 CopyFrom(CopyState cstate);
-extern uint64 ParallelCopyFrom(CopyState cstate);
+extern uint64 ParallelCopyFrom(CopyState cstate, ParseState *pstate,
+ Relation rel,
+ const char *filename,
+ bool is_program,
+ List *attnamelist,
+ List *options);
extern DestReceiver *CreateCopyDestReceiver(void);
--
2.11.0
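The "dirty trick" of the patch above, then, is to serialize nothing but the statement's source text: CopyState is full of pointers into backend-local memory (a FILE handle, StringInfo buffers, a relcache entry), so the leader drops the query string into the shm_toc and each worker re-derives its own CopyState by re-running pg_parse_query(), pg_analyze_and_rewrite(), pg_plan_queries() and BeginCopyFrom(). Below is a standalone sketch, not server code, of why only the flat text survives the process boundary; the anonymous shared mapping stands in for the dsm_segment/shm_toc entry, and the COPY statement shown is invented.

/*
 * Standalone sketch -- NOT server code. A struct full of pointers is
 * meaningless in another process's address space, but a NUL-terminated
 * string stored in shared memory survives the boundary, so the worker
 * can rebuild its state from the text alone.
 */
#include <stdio.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SEG_SIZE 4096

int
main(void)
{
	const char *query =
		"COPY measurements FROM '/tmp/data.csv' WITH (FORMAT csv)";
	char	   *seg;
	pid_t		pid;

	/* The moral equivalent of the DSM segment shared with the workers. */
	seg = mmap(NULL, SEG_SIZE, PROT_READ | PROT_WRITE,
			   MAP_SHARED | MAP_ANONYMOUS, -1, 0);
	if (seg == MAP_FAILED)
	{
		perror("mmap");
		return 1;
	}

	/* Leader side: store only the flat source text, not parsed structs. */
	snprintf(seg, SEG_SIZE, "%s", query);

	pid = fork();
	if (pid == 0)
	{
		/*
		 * Worker side: read the text back and rebuild everything from it.
		 * In the patch this is where pg_parse_query(),
		 * pg_analyze_and_rewrite() and BeginCopyFrom() run again inside
		 * the background worker.
		 */
		printf("worker %d re-parsing: %s\n", (int) getpid(), seg);
		_exit(0);
	}

	waitpid(pid, NULL, 0);
	munmap(seg, SEG_SIZE);
	return 0;
}

Copying the struct byte-for-byte (the commented-out shm_toc experiments visible in the patch) would hand the worker pointers into the leader's address space; re-parsing the text avoids that entirely, at the cost of planning the statement once per worker.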
From 749717cbf9049ed419f99177bb5bcd1aad68b3eb Mon Sep 17 00:00:00 2001
From: Alex K <alex.lumir@gmail.com>
Date: Fri, 18 Aug 2017 18:34:39 +0300
Subject: [PATCH 10/13] Working parallel COPY FROM
---
src/backend/commands/copy.c | 1747 ++++++++++++++++---------------------------
1 file changed, 638 insertions(+), 1109 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 7f38e70bf3..083291d1c7 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -257,8 +257,8 @@ typedef struct
typedef struct
{
- slock_t mutex;
- int curr_line;
+ // slock_t mutex;
+ uint64 processed;
int workers_total;
int workers_attached;
int workers_ready;
@@ -266,21 +266,6 @@ typedef struct
Oid database_id;
Oid authenticated_user_id;
ConditionVariable cv;
-
- bool is_program;
- Relation rel;
-
- TupleDesc tupDesc;
- ResultRelInfo *resultRelInfo;
- EState *estate;
- ExprContext *econtext;
- TupleTableSlot *myslot;
-
- ErrorContextCallback errcallback;
- CommandId mycid;
- int hi_options;
- BulkInsertState bistate;
- bool useHeapMultiInsert;
} ParallelState;
// TODO Consider change
@@ -3341,26 +3326,29 @@ NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
/* only available for text or csv input */
Assert(!cstate->binary);
- /* on input just throw the header line away */
- if (cstate->cur_lineno == 0 && cstate->header_line)
- {
- cstate->cur_lineno++;
- if (CopyReadLine(cstate))
- return false; /* done */
- }
+ if (!IsBackgroundWorker)
+ {
+ /* on input just throw the header line away */
+ if (cstate->cur_lineno == 0 && cstate->header_line)
+ {
+ cstate->cur_lineno++;
+ if (CopyReadLine(cstate))
+ return false; /* done */
+ }
- cstate->cur_lineno++;
+ cstate->cur_lineno++;
- /* Actually read the line into memory here */
- done = CopyReadLine(cstate);
+ /* Actually read the line into memory here */
+ done = CopyReadLine(cstate);
- /*
- * EOF at start of line means we're done. If we see EOF after some
- * characters, we act as though it was newline followed by EOF, ie,
- * process the line and then exit loop on next iteration.
- */
- if (done && cstate->line_buf.len == 0)
- return false;
+ /*
+ * EOF at start of line means we're done. If we see EOF after some
+ * characters, we act as though it was newline followed by EOF, ie,
+ * process the line and then exit loop on next iteration.
+ */
+ if (done && cstate->line_buf.len == 0)
+ return false;
+ }
/* Parse the line into de-escaped field values */
if (cstate->csv_mode)
@@ -5029,7 +5017,6 @@ void
CopyFromBgwMainLoop(Datum main_arg)
{
volatile ParallelState *pst;
- // volatile CopyState cst;
dsm_segment *seg;
shm_toc *toc;
@@ -5044,21 +5031,43 @@ CopyFromBgwMainLoop(Datum main_arg)
HeapTuple tuple;
Datum *values;
- bool *nulls;
+ bool *nulls;
MemoryContext oldcontext = CurrentMemoryContext;
- int prev_leaf_part_index = -1;
- ResultRelInfo *saved_resultRelInfo = NULL;
+ int prev_leaf_part_index = -1;
+ ResultRelInfo *saved_resultRelInfo = NULL;
- BulkInsertState bistate;
- uint64 processed = 0;
- int nBufferedTuples = 0;
+ uint64 processed = 0;
+ int nBufferedTuples = 0;
#define MAX_BUFFERED_TUPLES 1000
HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
Size bufferedTuplesSize = 0;
int firstBufferedLineNo = 0;
+ TupleDesc tupDesc;
+ ResultRelInfo *resultRelInfo;
+ EState *estate;
+ ExprContext *econtext;
+ TupleTableSlot *myslot;
+
+ ErrorContextCallback errcallback;
+ CommandId mycid;
+ int hi_options;
+ BulkInsertState bistate;
+ bool useHeapMultiInsert;
+
+ char *query_string;
+ List *parsetree_list;
+ RawStmt *parsetree;
+ List *querytree_list;
+ List *plantree_list;
+ PlannedStmt *pstmt;
+ CopyStmt *cstmnt;
+ ParseState *pstate;
+ CopyState cstate;
+ Relation rel;
+
/*
* Establish signal handlers.
*
@@ -5079,7 +5088,7 @@ CopyFromBgwMainLoop(Datum main_arg)
* of contents so we can locate the various data structures we'll need to
* find within the segment.
*/
- CurrentResourceOwner = ResourceOwnerCreate(NULL, "test_shm_mq worker");
+ CurrentResourceOwner = ResourceOwnerCreate(NULL, "COPY FROM worker");
seg = dsm_attach(DatumGetInt32(main_arg));
if (seg == NULL)
ereport(ERROR,
@@ -5113,25 +5122,10 @@ CopyFromBgwMainLoop(Datum main_arg)
/*
* Attach to the appropriate message queues.
*/
- mq = shm_toc_lookup(toc, 3 + myworkernumber, false);
+ mq = shm_toc_lookup(toc, 1 + myworkernumber, false);
shm_mq_set_receiver(mq, MyProc);
mqh = shm_mq_attach(mq, seg, NULL);
- // void *pt;
- // pt = shm_toc_lookup(toc, 100500, false);
- // int i;
- // // iiiissssssss
- // i = *(int *) pt;
- // char *s;
- // s = (char *) pt + sizeof(int);
- //
- //
- // void *ptp;
- //
- // ptp = palloc(sizeof(int) + strlen(s));
- // *ptp = i;
-
-
/* Restore database connection. */
BackgroundWorkerInitializeConnectionByOid(pst->database_id,
pst->authenticated_user_id);
@@ -5142,6 +5136,23 @@ CopyFromBgwMainLoop(Datum main_arg)
*/
SetClientEncoding(GetDatabaseEncoding());
+ query_string = shm_toc_lookup(toc, 1, false);
+
+ elog(LOG, "BGWorker copying from query:\n %s", query_string);
+ parsetree_list = pg_parse_query(query_string);
+ parsetree = lfirst_node(RawStmt, list_head(parsetree_list));
+ querytree_list = pg_analyze_and_rewrite(parsetree, query_string,
+ NULL, 0, NULL);
+ plantree_list = pg_plan_queries(querytree_list,
+ CURSOR_OPT_PARALLEL_OK, NULL);
+ pstmt = lfirst_node(PlannedStmt, list_head(plantree_list));
+ cstmnt = (CopyStmt *) pstmt->utilityStmt;
+
+ elog(LOG, "BGWorker #%d filename from CopyStmt: %s", myworkernumber, cstmnt->filename);
+
+ pstate = make_parsestate(NULL);
+ pstate->p_sourcetext = query_string;
+
/*
* Indicate that we're fully initialized and ready to begin the main part
* of the parallel operation.
@@ -5155,59 +5166,233 @@ CopyFromBgwMainLoop(Datum main_arg)
// SpinLockAcquire(&pst->mutex);
++pst->workers_ready;
cv = pst->cv;
- elog(LOG, "BGWorker #%d started", myworkernumber);
- // SpinLockRelease(&pst->mutex);
+ // if (pst->workers_ready == pst->workers_total)
+ // {
+ // registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
+ // if (registrant == NULL)
+ // {
+ // elog(DEBUG1, "registrant backend has exited prematurely");
+ // proc_exit(1);
+ // }
+ // SetLatch(&registrant->procLatch);
+ // }
+ // elog(LOG, "BGWorker #%d started", myworkernumber);
+ // // SpinLockRelease(&pst->mutex);
LWLockRelease(CopyFromBgwLock);
+
registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
- if (registrant == NULL)
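+	/* XXX: BackendPidGetProc() can return NULL if the registrant backend
+	 * has already exited; the unconditional SetLatch() below would then
+	 * dereference a NULL pointer. */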
+ SetLatch(&registrant->procLatch);
+
+ StartTransactionCommand();
+
+ /* Open and lock the relation, using the appropriate lock type. */
+ rel = heap_openrv(cstmnt->relation, RowExclusiveLock);
+ // (is_from ? RowExclusiveLock : AccessShareLock));
+
+ cstate = BeginCopyFrom(pstate, rel, cstmnt->filename, cstmnt->is_program,
+ NULL, cstmnt->attlist, cstmnt->options);
+
+ estate = CreateExecutorState(); /* for ExecConstraints() */
+ mycid = GetCurrentCommandId(true);
+ hi_options = 0; /* start with default heap_insert options */
+
+ /*
+ * The target must be a plain relation or have an INSTEAD OF INSERT row
+ * trigger. (Currently, such triggers are only allowed on views, so we
+ * only hint about them in the view case.)
+ */
+ if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
+ cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
+ !(cstate->rel->trigdesc &&
+ cstate->rel->trigdesc->trig_insert_instead_row))
{
- elog(DEBUG1, "registrant backend has exited prematurely");
- proc_exit(1);
+ if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to view \"%s\"",
+ RelationGetRelationName(cstate->rel)),
+ errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
+ else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to materialized view \"%s\"",
+ RelationGetRelationName(cstate->rel))));
+ else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to foreign table \"%s\"",
+ RelationGetRelationName(cstate->rel))));
+ else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to sequence \"%s\"",
+ RelationGetRelationName(cstate->rel))));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to non-table relation \"%s\"",
+ RelationGetRelationName(cstate->rel))));
}
- SetLatch(&registrant->procLatch);
- /* Do the work. */
- values = (Datum *) palloc(pst->tupDesc->natts * sizeof(Datum));
- nulls = (bool *) palloc(pst->tupDesc->natts * sizeof(bool));
+ tupDesc = RelationGetDescr(cstate->rel);
- volatile char *filename = shm_toc_lookup(toc, 2, false);
- char *query_string;
- query_string = filename;
+ /*----------
+ * Check to see if we can avoid writing WAL
+ *
+ * If archive logging/streaming is not enabled *and* either
+ * - table was created in same transaction as this COPY
+ * - data is being written to relfilenode created in this transaction
+ * then we can skip writing WAL. It's safe because if the transaction
+ * doesn't commit, we'll discard the table (or the new relfilenode file).
+ * If it does commit, we'll have done the heap_sync at the bottom of this
+ * routine first.
+ *
+ * As mentioned in comments in utils/rel.h, the in-same-transaction test
+ * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
+ * can be cleared before the end of the transaction. The exact case is
+ * when a relation sets a new relfilenode twice in same transaction, yet
+ * the second one fails in an aborted subtransaction, e.g.
+ *
+ * BEGIN;
+ * TRUNCATE t;
+ * SAVEPOINT save;
+ * TRUNCATE t;
+ * ROLLBACK TO save;
+ * COPY ...
+ *
+ * Also, if the target file is new-in-transaction, we assume that checking
+ * FSM for free space is a waste of time, even if we must use WAL because
+ * of archiving. This could possibly be wrong, but it's unlikely.
+ *
+ * The comments for heap_insert and RelationGetBufferForTuple specify that
+ * skipping WAL logging is only safe if we ensure that our tuples do not
+ * go into pages containing tuples from any other transactions --- but this
+ * must be the case if we have a new table or new relfilenode, so we need
+ * no additional work to enforce that.
+ *----------
+ */
+ /* createSubid is creation check, newRelfilenodeSubid is truncation check */
+ if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
+ cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
+ {
+ hi_options |= HEAP_INSERT_SKIP_FSM;
+ if (!XLogIsNeeded())
+ hi_options |= HEAP_INSERT_SKIP_WAL;
+ }
- elog(LOG, "BGWorker copying from query:\n %s", query_string);
- // raw_parser_result = raw_parser(filename);
- List *parsetree_list = pg_parse_query(query_string);
- RawStmt *parsetree = lfirst_node(RawStmt, list_head(parsetree_list));
- List *querytree_list = pg_analyze_and_rewrite(parsetree, query_string,
- NULL, 0, NULL);
- List *plantree_list = pg_plan_queries(querytree_list,
- CURSOR_OPT_PARALLEL_OK, NULL);
- PlannedStmt *pstmt = lfirst_node(PlannedStmt, list_head(plantree_list));
- // Node *parsetree = pstmt->utilityStmt;
- // elog(LOG, "BGWorker parse tree length: %d", raw_parser_result->length);
- // PlannedStmt *pstmt = linitial_node(PlannedStmt, raw_parser_result);
- CopyStmt *cstmnt = (CopyStmt *) pstmt->utilityStmt;
+ /*
+ * Optimize if new relfilenode was created in this subxact or one of its
+ * committed children and we won't see those rows later as part of an
+ * earlier scan or command. This ensures that if this subtransaction
+ * aborts then the frozen rows won't be visible after xact cleanup. Note
+ * that the stronger test of exactly which subtransaction created it is
+ * crucial for correctness of this optimization.
+ */
+ if (cstate->freeze)
+ {
+ if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot perform FREEZE because of prior transaction activity")));
- elog(LOG, "BGWorker filename from CopyStmt: %s", cstmnt->filename);
- // elog(LOG, "BGWorker copying: %d %d %d", cst->binary, cst->ignore_errors, cst->csv_mode);
+ if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
+ cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
- ParseState *pstate;
- pstate = make_parsestate(NULL);
- pstate->p_sourcetext = query_string;
+ hi_options |= HEAP_INSERT_FROZEN;
+ }
- StartTransactionCommand();
+ /*
+ * We need a ResultRelInfo so we can use the regular executor's
+ * index-entry-making machinery. (There used to be a huge amount of code
+ * here that basically duplicated execUtils.c ...)
+ */
+ resultRelInfo = makeNode(ResultRelInfo);
+ InitResultRelInfo(resultRelInfo,
+ cstate->rel,
+ 1, /* dummy rangetable index */
+ NULL,
+ 0);
- Relation rel;
- /* Open and lock the relation, using the appropriate lock type. */
- rel = heap_openrv(cstmnt->relation, RowExclusiveLock);
- // (is_from ? RowExclusiveLock : AccessShareLock));
+ ExecOpenIndices(resultRelInfo, false);
- CopyState cstate = BeginCopyFrom(pstate, rel, cstmnt->filename, cstmnt->is_program,
- NULL, cstmnt->attlist, cstmnt->options);
+ estate->es_result_relations = resultRelInfo;
+ estate->es_num_result_relations = 1;
+ estate->es_result_relation_info = resultRelInfo;
+ estate->es_range_table = cstate->range_table;
+
+ /* Set up a tuple slot too */
+ myslot = ExecInitExtraTupleSlot(estate);
+ ExecSetSlotDescriptor(myslot, tupDesc);
+ /* Triggers might need a slot as well */
+ estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
+
+ /*
+ * It's more efficient to prepare a bunch of tuples for insertion, and
+ * insert them in one heap_multi_insert() call, than call heap_insert()
+ * separately for every tuple. However, we can't do that if there are
+ * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
+ * expressions. Such triggers or expressions might query the table we're
+ * inserting to, and act differently if the tuples that have already been
+ * processed and prepared for insertion are not there. We also can't do
+ * it if the table is partitioned.
+ */
+ useHeapMultiInsert = !((resultRelInfo->ri_TrigDesc != NULL &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+ cstate->partition_dispatch_info != NULL ||
+ cstate->volatile_defexprs);
+
+ if (useHeapMultiInsert)
+ bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+
+ /* Prepare to catch AFTER triggers. */
+ AfterTriggerBeginQuery();
+
+ /*
+ * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
+ * should do this for COPY, since it's not really an "INSERT" statement as
+ * such. However, executing these triggers maintains consistency with the
+ * EACH ROW triggers that we already fire on COPY.
+ */
+ ExecBSInsertTriggers(estate, resultRelInfo);
+
+ bistate = GetBulkInsertState();
+ econtext = GetPerTupleExprContext(estate);
+
+ /* Set up callback to identify error line number */
+ errcallback.callback = CopyFromErrorCallback;
+ errcallback.arg = (void *) cstate;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* Do the work. */
+ values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
+ nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
for (;;)
{
char *msg;
+ bool skip_tuple;
+ Oid loaded_oid = InvalidOid;
+ int next_cf_state; /* NextCopyFrom return state */
+ TupleTableSlot *slot;
+
+
+ if (nBufferedTuples == 0)
+ {
+ /*
+ * Reset the per-tuple exprcontext. We can only do this if the
+ * tuple buffer is empty. (Calling the context the per-tuple
+ * memory context is a bit of a misnomer now.)
+ */
+ ResetPerTupleExprContext(estate);
+ }
+
+ /* Switch into its memory context */
+ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
CHECK_FOR_INTERRUPTS();
@@ -5221,23 +5406,355 @@ CopyFromBgwMainLoop(Datum main_arg)
elog(LOG, "BGWorker #%d: got zero-length message, stopping", myworkernumber);
break;
}
- // elog(LOG, "BGWorker #%d dummy processing line #%ld", myworkernumber, *(int64 *) data);
- elog(LOG, "BGWorker #%d dummy processing line: %s", myworkernumber, msg);
+ elog(LOG, "BGWorker #%d processing line: %s", myworkernumber, msg);
+
+ cstate->line_buf.data = msg;
+ cstate->line_buf.len = len;
+
+ next_cf_state = NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid);
+
+	if (!next_cf_state)
+		break;
+ else if (next_cf_state == NCF_SUCCESS)
+ {
+ /* And now we can form the input tuple. */
+ tuple = heap_form_tuple(tupDesc, values, nulls);
+
+ if (loaded_oid != InvalidOid)
+ HeapTupleSetOid(tuple, loaded_oid);
+
+ /*
+ * Constraints might reference the tableoid column, so initialize
+ * t_tableOid before evaluating them.
+ */
+ tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
+ /* Triggers and stuff need to be invoked in query context. */
+ MemoryContextSwitchTo(oldcontext);
+
+ /* Place tuple in tuple slot --- but slot shouldn't free it */
+ slot = myslot;
+ ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+
+ /* Determine the partition to heap_insert the tuple into */
+ if (cstate->partition_dispatch_info)
+ {
+ int leaf_part_index;
+ TupleConversionMap *map;
+
+ /*
+ * Away we go ... If we end up not finding a partition after all,
+ * ExecFindPartition() does not return and errors out instead.
+ * Otherwise, the returned value is to be used as an index into
+ * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
+ * will get us the ResultRelInfo and TupleConversionMap for the
+ * partition, respectively.
+ */
+ leaf_part_index = ExecFindPartition(resultRelInfo,
+ cstate->partition_dispatch_info,
+ slot,
+ estate);
+ Assert(leaf_part_index >= 0 &&
+ leaf_part_index < cstate->num_partitions);
+
+ /*
+			 * If this tuple is mapped to a partition that is not the same as
+			 * the previous one, we'd better make sure the bulk insert
+			 * mechanism gets a new buffer.
+ */
+ if (prev_leaf_part_index != leaf_part_index)
+ {
+ ReleaseBulkInsertStatePin(bistate);
+ prev_leaf_part_index = leaf_part_index;
+ }
+
+ /*
+ * Save the old ResultRelInfo and switch to the one corresponding
+ * to the selected partition.
+ */
+ saved_resultRelInfo = resultRelInfo;
+ resultRelInfo = cstate->partitions + leaf_part_index;
+
+ /* We do not yet have a way to insert into a foreign partition */
+ if (resultRelInfo->ri_FdwRoutine)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot route inserted tuples to a foreign table")));
+
+ /*
+ * For ExecInsertIndexTuples() to work on the partition's indexes
+ */
+ estate->es_result_relation_info = resultRelInfo;
+
+ /*
+ * If we're capturing transition tuples, we might need to convert
+ * from the partition rowtype to parent rowtype.
+ */
+ if (cstate->transition_capture != NULL)
+ {
+ if (resultRelInfo->ri_TrigDesc &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
+ {
+ /*
+ * If there are any BEFORE or INSTEAD triggers on the
+ * partition, we'll have to be ready to convert their
+ * result back to tuplestore format.
+ */
+ cstate->transition_capture->tcs_original_insert_tuple = NULL;
+ cstate->transition_capture->tcs_map =
+ cstate->transition_tupconv_maps[leaf_part_index];
+ }
+ else
+ {
+ /*
+ * Otherwise, just remember the original unconverted
+ * tuple, to avoid a needless round trip conversion.
+ */
+ cstate->transition_capture->tcs_original_insert_tuple = tuple;
+ cstate->transition_capture->tcs_map = NULL;
+ }
+ }
+ /*
+ * We might need to convert from the parent rowtype to the
+ * partition rowtype.
+ */
+ map = cstate->partition_tupconv_maps[leaf_part_index];
+ if (map)
+ {
+ Relation partrel = resultRelInfo->ri_RelationDesc;
+
+ tuple = do_convert_tuple(tuple, map);
+
+ /*
+ * We must use the partition's tuple descriptor from this
+ * point on. Use a dedicated slot from this point on until
+ * we're finished dealing with the partition.
+ */
+ slot = cstate->partition_tuple_slot;
+ Assert(slot != NULL);
+ ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
+ ExecStoreTuple(tuple, slot, InvalidBuffer, true);
+ }
+
+ tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+ }
+
+ skip_tuple = false;
+
+ /* BEFORE ROW INSERT Triggers */
+ if (resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+ {
+ slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
+
+ if (slot == NULL) /* "do nothing" */
+ skip_tuple = true;
+ else /* trigger might have changed tuple */
+ tuple = ExecMaterializeSlot(slot);
+ }
+ }
+ else
+ {
+ skip_tuple = true;
+ }
+
+ if (!skip_tuple)
+ {
+ if (resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
+ {
+ /* Pass the data to the INSTEAD ROW INSERT trigger */
+ ExecIRInsertTriggers(estate, resultRelInfo, slot);
+ }
+ else
+ {
+ /*
+ * We always check the partition constraint, including when
+ * the tuple got here via tuple-routing. However we don't
+ * need to in the latter case if no BR trigger is defined on
+ * the partition. Note that a BR trigger might modify the
+ * tuple such that the partition constraint is no longer
+ * satisfied, so we need to check in that case.
+ */
+ bool check_partition_constr =
+ (resultRelInfo->ri_PartitionCheck != NIL);
+
+ if (saved_resultRelInfo != NULL &&
+ !(resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_insert_before_row))
+ check_partition_constr = false;
+
+ /* Check the constraints of the tuple */
+ if (cstate->rel->rd_att->constr || check_partition_constr)
+ ExecConstraints(resultRelInfo, slot, estate);
+
+ if (useHeapMultiInsert)
+ {
+ /* Add this tuple to the tuple buffer */
+ if (nBufferedTuples == 0)
+ firstBufferedLineNo = cstate->cur_lineno;
+ bufferedTuples[nBufferedTuples++] = tuple;
+ bufferedTuplesSize += tuple->t_len;
+
+ /*
+ * If the buffer filled up, flush it. Also flush if the
+ * total size of all the tuples in the buffer becomes
+ * large, to avoid using large amounts of memory for the
+ * buffer when the tuples are exceptionally wide.
+ */
+ if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
+ bufferedTuplesSize > 65535)
+ {
+ CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+ resultRelInfo, myslot, bistate,
+ nBufferedTuples, bufferedTuples,
+ firstBufferedLineNo);
+ nBufferedTuples = 0;
+ bufferedTuplesSize = 0;
+ }
+ }
+ else
+ {
+ List *recheckIndexes = NIL;
+
+ /* OK, store the tuple and create index entries for it */
+ heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
+ hi_options, bistate);
+
+ if (resultRelInfo->ri_NumIndices > 0)
+ recheckIndexes = ExecInsertIndexTuples(slot,
+ &(tuple->t_self),
+ estate,
+ false,
+ NULL,
+ NIL);
+
+ /* AFTER ROW INSERT Triggers */
+ ExecARInsertTriggers(estate, resultRelInfo, tuple,
+ recheckIndexes, cstate->transition_capture);
+
+ list_free(recheckIndexes);
+ }
+ }
+
+ /*
+ * We count only tuples not suppressed by a BEFORE INSERT trigger;
+ * this is the same definition used by execMain.c for counting
+ * tuples inserted by an INSERT command.
+ */
+ processed++;
+
+ if (saved_resultRelInfo)
+ {
+ resultRelInfo = saved_resultRelInfo;
+ estate->es_result_relation_info = resultRelInfo;
+ }
+ }
}
+ /* Flush any remaining buffered tuples */
+ if (nBufferedTuples > 0)
+ CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+ resultRelInfo, myslot, bistate,
+ nBufferedTuples, bufferedTuples,
+ firstBufferedLineNo);
+
+ /* Done, clean up */
+ error_context_stack = errcallback.previous;
+
+ FreeBulkInsertState(bistate);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ /*
+ * In the old protocol, tell pqcomm that we can process normal protocol
+ * messages again.
+ */
+ if (cstate->copy_dest == COPY_OLD_FE)
+ pq_endmsgread();
+
+ /* Execute AFTER STATEMENT insertion triggers */
+ ExecASInsertTriggers(estate, resultRelInfo, cstate->transition_capture);
+
+ /* Handle queued AFTER triggers */
+ AfterTriggerEndQuery(estate);
+
+ ExecResetTupleTable(estate->es_tupleTable, false);
+
+ ExecCloseIndices(resultRelInfo);
+
+ /* Close all the partitioned tables, leaf partitions, and their indices */
+ if (cstate->partition_dispatch_info)
+ {
+ int i;
+
+ /*
+ * Remember cstate->partition_dispatch_info[0] corresponds to the root
+ * partitioned table, which we must not try to close, because it is
+ * the main target table of COPY that will be closed eventually by
+ * DoCopy(). Also, tupslot is NULL for the root partitioned table.
+ */
+ for (i = 1; i < cstate->num_dispatch; i++)
+ {
+ PartitionDispatch pd = cstate->partition_dispatch_info[i];
+
+ heap_close(pd->reldesc, NoLock);
+ ExecDropSingleTupleTableSlot(pd->tupslot);
+ }
+ for (i = 0; i < cstate->num_partitions; i++)
+ {
+ ResultRelInfo *resultRelInfo = cstate->partitions + i;
+
+ ExecCloseIndices(resultRelInfo);
+ heap_close(resultRelInfo->ri_RelationDesc, NoLock);
+ }
+
+ /* Release the standalone partition tuple descriptor */
+ ExecDropSingleTupleTableSlot(cstate->partition_tuple_slot);
+ }
+
+ /* Close any trigger target relations */
+ ExecCleanUpTriggerState(estate);
+
+ FreeExecutorState(estate);
+
+ /*
+ * If we skipped writing WAL, then we need to sync the heap (but not
+ * indexes since those use WAL anyway)
+ */
+ if (hi_options & HEAP_INSERT_SKIP_WAL)
+ heap_sync(cstate->rel);
+
heap_close(rel, NoLock);
// (is_from ? NoLock : AccessShareLock));
CommitTransactionCommand();
LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
++pst->workers_finished;
+ pst->processed += processed;
+
+ // if (pst->workers_finished == pst->workers_total)
+ // {
+ // registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
+ // if (registrant == NULL)
+ // {
+ // elog(DEBUG1, "registrant backend has exited prematurely");
+ // proc_exit(1);
+ // }
+ // SetLatch(&registrant->procLatch);
+ // }
LWLockRelease(CopyFromBgwLock);
/* Signal main process that we are done. */
ConditionVariableBroadcast(&cv);
- pfree(values);
- pfree(nulls);
+	// TODO: fix segmentation fault when freeing these buffers (backtrace below):
+ // * frame #0: 0x0000000107519678 postgres`GetMemoryChunkContext(pointer=0x00007fdf268bf858) at memutils.h:124
+ // frame #1: 0x00000001075194e5 postgres`pfree(pointer=0x00007fdf268bf858) at mcxt.c:952
+ // pfree(values);
+ // pfree(nulls);
/*
* We're done. Explicitly detach the shared memory segment so that we
@@ -5307,6 +5824,10 @@ setup_parallel_copy_from(int64 queue_size, int32 nworkers, dsm_segment **segp,
/* Wait for workers to become ready. */
wait_for_workers_to_become_ready(wstate, pst);
+ /* Wait to be signalled. */
+ // WaitLatch(MyLatch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION);
+ /* Reset the latch so we don't spin. */
+ // ResetLatch(MyLatch);
/*
* Once we reach this point, all workers are ready. We no longer need to
@@ -5342,10 +5863,7 @@ setup_dsm(int64 queue_size, int nworkers,
dsm_segment *seg;
shm_toc *toc;
ParallelState *pst;
- char *shm_filename;
- List *shm_p_rtable;
- List *shm_attnamelist;
- List *shm_options;
+ char *shm_query;
Assert(cstate->rel);
@@ -5372,15 +5890,12 @@ setup_dsm(int64 queue_size, int nworkers,
shm_toc_initialize_estimator(&e);
shm_toc_estimate_chunk(&e, sizeof(ParallelState));
- shm_toc_estimate_chunk(&e, sizeof(*pstate->p_rtable));
	shm_toc_estimate_chunk(&e, strlen(ActivePortal->sourceText) + 1);
- // shm_toc_estimate_chunk(&e, sizeof(*attnamelist));
- shm_toc_estimate_chunk(&e, sizeof(*options));
for (i = 0; i < nworkers; ++i)
shm_toc_estimate_chunk(&e, (Size) queue_size);
- shm_toc_estimate_keys(&e, 1 + 4 + nworkers);
+ shm_toc_estimate_keys(&e, 1 + 1 + nworkers);
segsize = shm_toc_estimate(&e);
/* Create the shared memory segment and establish a table of contents. */
@@ -5395,211 +5910,18 @@ setup_dsm(int64 queue_size, int nworkers,
pst->workers_attached = 0;
pst->workers_ready = 0;
pst->workers_finished = 0;
+ pst->processed = 0;
pst->database_id = MyDatabaseId;
pst->authenticated_user_id = GetAuthenticatedUserId();
ConditionVariableInit(&pst->cv);
- pst->is_program = cstate->is_program;
- pst->rel = cstate->rel;
-
- pst->estate = CreateExecutorState(); /* for ExecConstraints() */
- pst->mycid = GetCurrentCommandId(true);
- pst->hi_options = 0; /* start with default heap_insert options */
-
- /*
- * The target must be a plain relation or have an INSTEAD OF INSERT row
- * trigger. (Currently, such triggers are only allowed on views, so we
- * only hint about them in the view case.)
- */
- if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
- cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
- !(cstate->rel->trigdesc &&
- cstate->rel->trigdesc->trig_insert_instead_row))
- {
- if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy to view \"%s\"",
- RelationGetRelationName(cstate->rel)),
- errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
- else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy to materialized view \"%s\"",
- RelationGetRelationName(cstate->rel))));
- else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy to foreign table \"%s\"",
- RelationGetRelationName(cstate->rel))));
- else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy to sequence \"%s\"",
- RelationGetRelationName(cstate->rel))));
- else
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy to non-table relation \"%s\"",
- RelationGetRelationName(cstate->rel))));
- }
-
- pst->tupDesc = RelationGetDescr(cstate->rel);
-
- /*----------
- * Check to see if we can avoid writing WAL
- *
- * If archive logging/streaming is not enabled *and* either
- * - table was created in same transaction as this COPY
- * - data is being written to relfilenode created in this transaction
- * then we can skip writing WAL. It's safe because if the transaction
- * doesn't commit, we'll discard the table (or the new relfilenode file).
- * If it does commit, we'll have done the heap_sync at the bottom of this
- * routine first.
- *
- * As mentioned in comments in utils/rel.h, the in-same-transaction test
- * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
- * can be cleared before the end of the transaction. The exact case is
- * when a relation sets a new relfilenode twice in same transaction, yet
- * the second one fails in an aborted subtransaction, e.g.
- *
- * BEGIN;
- * TRUNCATE t;
- * SAVEPOINT save;
- * TRUNCATE t;
- * ROLLBACK TO save;
- * COPY ...
- *
- * Also, if the target file is new-in-transaction, we assume that checking
- * FSM for free space is a waste of time, even if we must use WAL because
- * of archiving. This could possibly be wrong, but it's unlikely.
- *
- * The comments for heap_insert and RelationGetBufferForTuple specify that
- * skipping WAL logging is only safe if we ensure that our tuples do not
- * go into pages containing tuples from any other transactions --- but this
- * must be the case if we have a new table or new relfilenode, so we need
- * no additional work to enforce that.
- *----------
- */
- /* createSubid is creation check, newRelfilenodeSubid is truncation check */
- if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
- cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
- {
- pst->hi_options |= HEAP_INSERT_SKIP_FSM;
- if (!XLogIsNeeded())
- pst->hi_options |= HEAP_INSERT_SKIP_WAL;
- }
-
- /*
- * Optimize if new relfilenode was created in this subxact or one of its
- * committed children and we won't see those rows later as part of an
- * earlier scan or command. This ensures that if this subtransaction
- * aborts then the frozen rows won't be visible after xact cleanup. Note
- * that the stronger test of exactly which subtransaction created it is
- * crucial for correctness of this optimization.
- */
- if (cstate->freeze)
- {
- if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
- errmsg("cannot perform FREEZE because of prior transaction activity")));
-
- if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
- cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
-
- pst->hi_options |= HEAP_INSERT_FROZEN;
- }
-
- /*
- * We need a ResultRelInfo so we can use the regular executor's
- * index-entry-making machinery. (There used to be a huge amount of code
- * here that basically duplicated execUtils.c ...)
- */
- pst->resultRelInfo = makeNode(ResultRelInfo);
- InitResultRelInfo(pst->resultRelInfo,
- cstate->rel,
- 1, /* dummy rangetable index */
- NULL,
- 0);
-
- ExecOpenIndices(pst->resultRelInfo, false);
-
- pst->estate->es_result_relations = pst->resultRelInfo;
- pst->estate->es_num_result_relations = 1;
- pst->estate->es_result_relation_info = pst->resultRelInfo;
- pst->estate->es_range_table = cstate->range_table;
-
- /* Set up a tuple slot too */
- pst->myslot = ExecInitExtraTupleSlot(pst->estate);
- ExecSetSlotDescriptor(pst->myslot, pst->tupDesc);
- /* Triggers might need a slot as well */
- pst->estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(pst->estate);
-
- /*
- * It's more efficient to prepare a bunch of tuples for insertion, and
- * insert them in one heap_multi_insert() call, than call heap_insert()
- * separately for every tuple. However, we can't do that if there are
- * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
- * expressions. Such triggers or expressions might query the table we're
- * inserting to, and act differently if the tuples that have already been
- * processed and prepared for insertion are not there. We also can't do
- * it if the table is partitioned.
- */
- pst->useHeapMultiInsert = !((pst->resultRelInfo->ri_TrigDesc != NULL &&
- (pst->resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
- pst->resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
- cstate->partition_dispatch_info != NULL ||
- cstate->volatile_defexprs);
-
- /* Prepare to catch AFTER triggers. */
- AfterTriggerBeginQuery();
-
- /*
- * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
- * should do this for COPY, since it's not really an "INSERT" statement as
- * such. However, executing these triggers maintains consistency with the
- * EACH ROW triggers that we already fire on COPY.
- */
- ExecBSInsertTriggers(pst->estate, pst->resultRelInfo);
-
- pst->bistate = GetBulkInsertState();
- pst->econtext = GetPerTupleExprContext(pst->estate);
+ shm_toc_insert(toc, toc_key++, pst);
- /* Set up callback to identify error line number */
- pst->errcallback.callback = CopyFromErrorCallback;
- pst->errcallback.arg = (void *) cstate;
- pst->errcallback.previous = error_context_stack;
- error_context_stack = &pst->errcallback;
+	shm_query = shm_toc_allocate(toc, strlen(ActivePortal->sourceText) + 1);
- shm_toc_insert(toc, toc_key++, pst);
+ strcpy(shm_query, ActivePortal->sourceText);
- shm_p_rtable = shm_toc_allocate(toc, sizeof(*pstate->p_rtable));
- shm_filename = shm_toc_allocate(toc, strlen(ActivePortal->sourceText) * sizeof(char));
- // shm_attnamelist = shm_toc_allocate(toc, sizeof(*attnamelist));
- shm_options = shm_toc_allocate(toc, sizeof(*options));
-
- *shm_p_rtable = *pstate->p_rtable;
- // *shm_attnamelist = *attnamelist;
- *shm_options = *options;
- strcpy(shm_filename, ActivePortal->sourceText);
-
- shm_toc_insert(toc, toc_key++, shm_p_rtable);
- shm_toc_insert(toc, toc_key++, shm_filename);
- // shm_toc_insert(toc, toc_key++, shm_attnamelist);
- shm_toc_insert(toc, toc_key++, shm_options);
-
- // cst = shm_toc_allocate(toc, sizeof(CopyStateData) + strlen(cstate->filename) * sizeof(char));
- // *cst = *cstate;
- // cst->filename = shm_toc_allocate(toc, strlen(cstate->filename) * sizeof(char));
- // // *cst->filename = *cstate->filename;
- // strcpy(cst->filename, cstate->filename);
- // // memcpy(cst->filename, cstate->filename, sizeof(*cstate->filename));
- // // memcpy(cstate, cst, sizeof(CopyStateData));
- // shm_toc_insert(toc, 1, cst);
+ shm_toc_insert(toc, toc_key++, shm_query);
/* Set up one message queue per worker, plus one. */
for (i = 0; i < nworkers; ++i)
@@ -5807,436 +6129,6 @@ wait_for_workers_to_finish(volatile ParallelState *pst)
ConditionVariableCancelSleep();
}
-// /*
-// * Parse the current line into separate attributes (fields),
-// * performing de-escaping as needed.
-// *
-// * The input is in line_buf. We use attribute_buf to hold the result
-// * strings. cstate->raw_fields[k] is set to point to the k'th attribute
-// * string, or NULL when the input matches the null marker string.
-// * This array is expanded as necessary.
-// *
-// * (Note that the caller cannot check for nulls since the returned
-// * string would be the post-de-escaping equivalent, which may look
-// * the same as some valid data string.)
-// *
-// * delim is the column delimiter string (must be just one byte for now).
-// * null_print is the null marker string. Note that this is compared to
-// * the pre-de-escaped input string.
-// *
-// * The return value is the number of fields actually read.
-// */
-// static int
-// CopyReadTextAttrs(CopyState cstate)
-// {
-// char delimc = cstate->delim[0];
-// int fieldno;
-// char *output_ptr;
-// char *cur_ptr;
-// char *line_end_ptr;
-//
-// /*
-// * We need a special case for zero-column tables: check that the input
-// * line is empty, and return.
-// */
-// if (cstate->max_fields <= 0)
-// {
-// if (cstate->line_buf.len != 0)
-// ereport(ERROR,
-// (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-// errmsg("extra data after last expected column")));
-// return 0;
-// }
-//
-// resetStringInfo(&cstate->attribute_buf);
-//
-// /*
-// * The de-escaped attributes will certainly not be longer than the input
-// * data line, so we can just force attribute_buf to be large enough and
-// * then transfer data without any checks for enough space. We need to do
-// * it this way because enlarging attribute_buf mid-stream would invalidate
-// * pointers already stored into cstate->raw_fields[].
-// */
-// if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
-// enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
-// output_ptr = cstate->attribute_buf.data;
-//
-// /* set pointer variables for loop */
-// cur_ptr = cstate->line_buf.data;
-// line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
-//
-// /* Outer loop iterates over fields */
-// fieldno = 0;
-// for (;;)
-// {
-// bool found_delim = false;
-// char *start_ptr;
-// char *end_ptr;
-// int input_len;
-// bool saw_non_ascii = false;
-//
-// /* Make sure there is enough space for the next value */
-// if (fieldno >= cstate->max_fields)
-// {
-// cstate->max_fields *= 2;
-// cstate->raw_fields =
-// repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
-// }
-//
-// /* Remember start of field on both input and output sides */
-// start_ptr = cur_ptr;
-// cstate->raw_fields[fieldno] = output_ptr;
-//
-// /*
-// * Scan data for field.
-// *
-// * Note that in this loop, we are scanning to locate the end of field
-// * and also speculatively performing de-escaping. Once we find the
-// * end-of-field, we can match the raw field contents against the null
-// * marker string. Only after that comparison fails do we know that
-// * de-escaping is actually the right thing to do; therefore we *must
-// * not* throw any syntax errors before we've done the null-marker
-// * check.
-// */
-// for (;;)
-// {
-// char c;
-//
-// end_ptr = cur_ptr;
-// if (cur_ptr >= line_end_ptr)
-// break;
-// c = *cur_ptr++;
-// if (c == delimc)
-// {
-// found_delim = true;
-// break;
-// }
-// if (c == '\\')
-// {
-// if (cur_ptr >= line_end_ptr)
-// break;
-// c = *cur_ptr++;
-// switch (c)
-// {
-// case '0':
-// case '1':
-// case '2':
-// case '3':
-// case '4':
-// case '5':
-// case '6':
-// case '7':
-// {
-// /* handle \013 */
-// int val;
-//
-// val = OCTVALUE(c);
-// if (cur_ptr < line_end_ptr)
-// {
-// c = *cur_ptr;
-// if (ISOCTAL(c))
-// {
-// cur_ptr++;
-// val = (val << 3) + OCTVALUE(c);
-// if (cur_ptr < line_end_ptr)
-// {
-// c = *cur_ptr;
-// if (ISOCTAL(c))
-// {
-// cur_ptr++;
-// val = (val << 3) + OCTVALUE(c);
-// }
-// }
-// }
-// }
-// c = val & 0377;
-// if (c == '\0' || IS_HIGHBIT_SET(c))
-// saw_non_ascii = true;
-// }
-// break;
-// case 'x':
-// /* Handle \x3F */
-// if (cur_ptr < line_end_ptr)
-// {
-// char hexchar = *cur_ptr;
-//
-// if (isxdigit((unsigned char) hexchar))
-// {
-// int val = GetDecimalFromHex(hexchar);
-//
-// cur_ptr++;
-// if (cur_ptr < line_end_ptr)
-// {
-// hexchar = *cur_ptr;
-// if (isxdigit((unsigned char) hexchar))
-// {
-// cur_ptr++;
-// val = (val << 4) + GetDecimalFromHex(hexchar);
-// }
-// }
-// c = val & 0xff;
-// if (c == '\0' || IS_HIGHBIT_SET(c))
-// saw_non_ascii = true;
-// }
-// }
-// break;
-// case 'b':
-// c = '\b';
-// break;
-// case 'f':
-// c = '\f';
-// break;
-// case 'n':
-// c = '\n';
-// break;
-// case 'r':
-// c = '\r';
-// break;
-// case 't':
-// c = '\t';
-// break;
-// case 'v':
-// c = '\v';
-// break;
-//
-// /*
-// * in all other cases, take the char after '\'
-// * literally
-// */
-// }
-// }
-//
-// /* Add c to output string */
-// *output_ptr++ = c;
-// }
-//
-// /* Check whether raw input matched null marker */
-// input_len = end_ptr - start_ptr;
-// if (input_len == cstate->null_print_len &&
-// strncmp(start_ptr, cstate->null_print, input_len) == 0)
-// cstate->raw_fields[fieldno] = NULL;
-// else
-// {
-// /*
-// * At this point we know the field is supposed to contain data.
-// *
-// * If we de-escaped any non-7-bit-ASCII chars, make sure the
-// * resulting string is valid data for the db encoding.
-// */
-// if (saw_non_ascii)
-// {
-// char *fld = cstate->raw_fields[fieldno];
-//
-// pg_verifymbstr(fld, output_ptr - fld, false);
-// }
-// }
-//
-// /* Terminate attribute value in output area */
-// *output_ptr++ = '\0';
-//
-// fieldno++;
-// /* Done if we hit EOL instead of a delim */
-// if (!found_delim)
-// break;
-// }
-//
-// /* Clean up state of attribute_buf */
-// output_ptr--;
-// Assert(*output_ptr == '\0');
-// cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
-//
-// return fieldno;
-// }
-//
-// /*
-// * Parse the current line into separate attributes (fields),
-// * performing de-escaping as needed. This has exactly the same API as
-// * CopyReadAttributesText, except we parse the fields according to
-// * "standard" (i.e. common) CSV usage.
-// */
-// static int
-// CopyReadCSVAttrs(CopyState cstate)
-// {
-// char delimc = cstate->delim[0];
-// char quotec = cstate->quote[0];
-// char escapec = cstate->escape[0];
-// int fieldno;
-// char *output_ptr;
-// char *cur_ptr;
-// char *line_end_ptr;
-//
-// /*
-// * We need a special case for zero-column tables: check that the input
-// * line is empty, and return.
-// */
-// if (cstate->max_fields <= 0)
-// {
-// if (cstate->line_buf.len != 0)
-// ereport(ERROR,
-// (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-// errmsg("extra data after last expected column")));
-// return 0;
-// }
-//
-// resetStringInfo(&cstate->attribute_buf);
-//
-// /*
-// * The de-escaped attributes will certainly not be longer than the input
-// * data line, so we can just force attribute_buf to be large enough and
-// * then transfer data without any checks for enough space. We need to do
-// * it this way because enlarging attribute_buf mid-stream would invalidate
-// * pointers already stored into cstate->raw_fields[].
-// */
-// if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
-// enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
-// output_ptr = cstate->attribute_buf.data;
-//
-// /* set pointer variables for loop */
-// cur_ptr = cstate->line_buf.data;
-// line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
-//
-// /* Outer loop iterates over fields */
-// fieldno = 0;
-// for (;;)
-// {
-// bool found_delim = false;
-// bool saw_quote = false;
-// char *start_ptr;
-// char *end_ptr;
-// int input_len;
-//
-// /* Make sure there is enough space for the next value */
-// if (fieldno >= cstate->max_fields)
-// {
-// cstate->max_fields *= 2;
-// cstate->raw_fields =
-// repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
-// }
-//
-// /* Remember start of field on both input and output sides */
-// start_ptr = cur_ptr;
-// cstate->raw_fields[fieldno] = output_ptr;
-//
-// /*
-// * Scan data for field,
-// *
-// * The loop starts in "not quote" mode and then toggles between that
-// * and "in quote" mode. The loop exits normally if it is in "not
-// * quote" mode and a delimiter or line end is seen.
-// */
-// for (;;)
-// {
-// char c;
-//
-// /* Not in quote */
-// for (;;)
-// {
-// end_ptr = cur_ptr;
-// if (cur_ptr >= line_end_ptr)
-// goto endfield;
-// c = *cur_ptr++;
-// /* unquoted field delimiter */
-// if (c == delimc)
-// {
-// found_delim = true;
-// goto endfield;
-// }
-// /* start of quoted field (or part of field) */
-// if (c == quotec)
-// {
-// saw_quote = true;
-// break;
-// }
-// /* Add c to output string */
-// *output_ptr++ = c;
-// }
-//
-// /* In quote */
-// for (;;)
-// {
-// end_ptr = cur_ptr;
-// if (cur_ptr >= line_end_ptr)
-// ereport(ERROR,
-// (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-// errmsg("unterminated CSV quoted field")));
-//
-// c = *cur_ptr++;
-//
-// /* escape within a quoted field */
-// if (c == escapec)
-// {
-// /*
-// * peek at the next char if available, and escape it if it
-// * is an escape char or a quote char
-// */
-// if (cur_ptr < line_end_ptr)
-// {
-// char nextc = *cur_ptr;
-//
-// if (nextc == escapec || nextc == quotec)
-// {
-// *output_ptr++ = nextc;
-// cur_ptr++;
-// continue;
-// }
-// }
-// }
-//
-// /*
-// * end of quoted field. Must do this test after testing for
-// * escape in case quote char and escape char are the same
-// * (which is the common case).
-// */
-// if (c == quotec)
-// break;
-//
-// /* Add c to output string */
-// *output_ptr++ = c;
-// }
-// }
-// endfield:
-//
-// /* Terminate attribute value in output area */
-// *output_ptr++ = '\0';
-//
-// /* Check whether raw input matched null marker */
-// input_len = end_ptr - start_ptr;
-// if (!saw_quote && input_len == cstate->null_print_len &&
-// strncmp(start_ptr, cstate->null_print, input_len) == 0)
-// cstate->raw_fields[fieldno] = NULL;
-//
-// fieldno++;
-// /* Done if we hit EOL instead of a delim */
-// if (!found_delim)
-// break;
-// }
-//
-// /* Clean up state of attribute_buf */
-// output_ptr--;
-// Assert(*output_ptr == '\0');
-// cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
-//
-// return fieldno;
-// }
-//
-// /*
-// *
-// */
-// bool
-// CopyFromReadAttributes(char *line, bool csv_mode, char ***fields, int *nfields)
-// {
-// int fldct;
-//
-// /* Parse the line into de-escaped field values */
-// if (csv_mode)
-// fldct = CopyReadCSVAttrs(cstate);
-// else
-// fldct = CopyReadTextAttrs(cstate);
-//
-// *fields = cstate->raw_fields;
-// *nfields = fldct;
-// return true;
-// }
/*
* Parallel Copy FROM file to relation.
@@ -6256,36 +6148,13 @@ ParallelCopyFrom(CopyState cstate, ParseState *pstate,
shm_mq_handle **mq_handles;
shm_mq_result shmq_res;
int last_worker_used = 0;
- // int64 message = 0;
- // char *message;
- // char *message_contents = VARDATA_ANY(message);
- // int message_size = VARSIZE_ANY_EXHDR(message);
int message_size = sizeof(char);
mq_handles = palloc0(sizeof(shm_mq_handle *) * nworkers);
MemoryContext oldcontext = CurrentMemoryContext;
- // TupleDesc tupDesc;
- // ResultRelInfo *resultRelInfo;
- // EState *estate = CreateExecutorState(); /* for ExecConstraints() */
- // ExprContext *econtext;
- // TupleTableSlot *myslot;
- //
- // ErrorContextCallback errcallback;
- // CommandId mycid = GetCurrentCommandId(true);
- // int hi_options = 0; /* start with default heap_insert options */
- // BulkInsertState bistate;
- // bool useHeapMultiInsert;
-
- // ResultRelInfo *saved_resultRelInfo = NULL;
- // int prev_leaf_part_index = -1;
uint64 processed = 0;
- // int nBufferedTuples = 0;
-// #define MAX_BUFFERED_TUPLES 1000
-// HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
-// Size bufferedTuplesSize = 0;
-// int firstBufferedLineNo = 0;
// MQ size = 100 messages x 80 chars each
pst = setup_parallel_copy_from(message_size * 80 * queue_size,
@@ -6297,46 +6166,14 @@ ParallelCopyFrom(CopyState cstate, ParseState *pstate,
attnamelist,
options);
- // // BG Worker startup
- // RegisterDynamicBackgroundWorker(&worker, &bgwhandle);
- // bgwstatus = WaitForBackgroundWorkerStartup(bgwhandle, &bgwpid);
- // elog(LOG, "Main COPY process (pid %d): BGWorker started (pid %d)", MyProcPid, bgwpid);
- // // BG Worker startup
-
elog(LOG, "Copying from file %s", cstate->filename);
for (;;)
{
- // TupleTableSlot *slot;
- // bool skip_tuple;
- // Oid loaded_oid = InvalidOid;
- // int next_cf_state; /* NextCopyFrom return state */
bool done;
CHECK_FOR_INTERRUPTS();
- // if (nBufferedTuples == 0)
- // {
- // /*
- // * Reset the per-tuple exprcontext. We can only do this if the
- // * tuple buffer is empty. (Calling the context the per-tuple
- // * memory context is a bit of a misnomer now.)
- // */
- // ResetPerTupleExprContext(estate);
- // }
-
- /* Switch into its memory context */
- // MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-
- // next_cf_state = NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid);
-
-
- // LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
- // // SpinLockAcquire(&pst->mutex);
- // pst->curr_line++;
- // // SpinLockRelease(&pst->mutex);
- // LWLockRelease(CopyFromBgwLock);
-
/* on input just throw the header line away */
if (cstate->cur_lineno == 0 && cstate->header_line)
{
@@ -6349,7 +6186,8 @@ ParallelCopyFrom(CopyState cstate, ParseState *pstate,
/* Actually read the line into memory here */
done = CopyReadLine(cstate);
- // elog(LOG, "Reading line #%d with status %d", cstate->cur_lineno, done);
+	// Sleep 1 second to check that the BGWorkers consume lines while the master is still working.
+ // sleep(1);
/*
* EOF at start of line means we're done. If we see EOF after some
@@ -6373,15 +6211,8 @@ ParallelCopyFrom(CopyState cstate, ParseState *pstate,
}
else
{
- // message = pst->curr_line;
- // cur_ptr = cstate->line_buf.data;
- // line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
- // message = (char *) palloc(cstate->line_buf.len);
- // memcpy(message, cstate->line_buf.data, cstate->line_buf.len);
- // shmq_res = shm_mq_send(mq_handles[++last_worker_used - 1], message_size, &message, false);
- // elog(LOG, "Sending line #%d '%s' to BGWorker #%d", cstate->cur_lineno, message, last_worker_used + 1);
elog(LOG, "Sending line #%d to BGWorker #%d", cstate->cur_lineno, last_worker_used + 1);
- // shmq_res = shm_mq_send(mq_handles[++last_worker_used - 1], cstate->line_buf.len, message, false);
+
shmq_res = shm_mq_send(mq_handles[++last_worker_used - 1], cstate->line_buf.len, cstate->line_buf.data, false);
if (shmq_res != SHM_MQ_SUCCESS)
ereport(ERROR,
@@ -6390,323 +6221,21 @@ ParallelCopyFrom(CopyState cstate, ParseState *pstate,
if (last_worker_used == nworkers)
last_worker_used = 0;
}
-
- // if (!next_cf_state) {
- // break;
- // }
- // else if (next_cf_state == NCF_SUCCESS)
- // {
- // /* And now we can form the input tuple. */
- // tuple = heap_form_tuple(tupDesc, values, nulls);
- //
- // if (loaded_oid != InvalidOid)
- // HeapTupleSetOid(tuple, loaded_oid);
- //
- // /*
- // * Constraints might reference the tableoid column, so initialize
- // * t_tableOid before evaluating them.
- // */
- // tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
- //
- // /* Triggers and stuff need to be invoked in query context. */
- // MemoryContextSwitchTo(oldcontext);
- //
- // /* Place tuple in tuple slot --- but slot shouldn't free it */
- // slot = myslot;
- // ExecStoreTuple(tuple, slot, InvalidBuffer, false);
- //
- // /* Determine the partition to heap_insert the tuple into */
- // if (cstate->partition_dispatch_info)
- // {
- // int leaf_part_index;
- // TupleConversionMap *map;
- //
- // /*
- // * Away we go ... If we end up not finding a partition after all,
- // * ExecFindPartition() does not return and errors out instead.
- // * Otherwise, the returned value is to be used as an index into
- // * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
- // * will get us the ResultRelInfo and TupleConversionMap for the
- // * partition, respectively.
- // */
- // leaf_part_index = ExecFindPartition(resultRelInfo,
- // cstate->partition_dispatch_info,
- // slot,
- // estate);
- // Assert(leaf_part_index >= 0 &&
- // leaf_part_index < cstate->num_partitions);
- //
- // /*
- // * If this tuple is mapped to a partition that is not same as the
- // * previous one, we'd better make the bulk insert mechanism gets a
- // * new buffer.
- // */
- // if (prev_leaf_part_index != leaf_part_index)
- // {
- // ReleaseBulkInsertStatePin(bistate);
- // prev_leaf_part_index = leaf_part_index;
- // }
- //
- // /*
- // * Save the old ResultRelInfo and switch to the one corresponding
- // * to the selected partition.
- // */
- // saved_resultRelInfo = resultRelInfo;
- // resultRelInfo = cstate->partitions + leaf_part_index;
- //
- // /* We do not yet have a way to insert into a foreign partition */
- // if (resultRelInfo->ri_FdwRoutine)
- // ereport(ERROR,
- // (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- // errmsg("cannot route inserted tuples to a foreign table")));
- //
- // /*
- // * For ExecInsertIndexTuples() to work on the partition's indexes
- // */
- // estate->es_result_relation_info = resultRelInfo;
- //
- // /*
- // * If we're capturing transition tuples, we might need to convert
- // * from the partition rowtype to parent rowtype.
- // */
- // if (cstate->transition_capture != NULL)
- // {
- // if (resultRelInfo->ri_TrigDesc &&
- // (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
- // resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
- // {
- // /*
- // * If there are any BEFORE or INSTEAD triggers on the
- // * partition, we'll have to be ready to convert their
- // * result back to tuplestore format.
- // */
- // cstate->transition_capture->tcs_original_insert_tuple = NULL;
- // cstate->transition_capture->tcs_map =
- // cstate->transition_tupconv_maps[leaf_part_index];
- // }
- // else
- // {
- // /*
- // * Otherwise, just remember the original unconverted
- // * tuple, to avoid a needless round trip conversion.
- // */
- // cstate->transition_capture->tcs_original_insert_tuple = tuple;
- // cstate->transition_capture->tcs_map = NULL;
- // }
- // }
- // /*
- // * We might need to convert from the parent rowtype to the
- // * partition rowtype.
- // */
- // map = cstate->partition_tupconv_maps[leaf_part_index];
- // if (map)
- // {
- // Relation partrel = resultRelInfo->ri_RelationDesc;
- //
- // tuple = do_convert_tuple(tuple, map);
- //
- // /*
- // * We must use the partition's tuple descriptor from this
- // * point on. Use a dedicated slot from this point on until
- // * we're finished dealing with the partition.
- // */
- // slot = cstate->partition_tuple_slot;
- // Assert(slot != NULL);
- // ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
- // ExecStoreTuple(tuple, slot, InvalidBuffer, true);
- // }
- //
- // tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
- // }
- //
- // skip_tuple = false;
- //
- // /* BEFORE ROW INSERT Triggers */
- // if (resultRelInfo->ri_TrigDesc &&
- // resultRelInfo->ri_TrigDesc->trig_insert_before_row)
- // {
- // slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
- //
- // if (slot == NULL) /* "do nothing" */
- // skip_tuple = true;
- // else /* trigger might have changed tuple */
- // tuple = ExecMaterializeSlot(slot);
- // }
- // }
- // else
- // {
- // skip_tuple = true;
- // }
- //
- // if (!skip_tuple)
- // {
- // if (resultRelInfo->ri_TrigDesc &&
- // resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
- // {
- // /* Pass the data to the INSTEAD ROW INSERT trigger */
- // ExecIRInsertTriggers(estate, resultRelInfo, slot);
- // }
- // else
- // {
- // /*
- // * We always check the partition constraint, including when
- // * the tuple got here via tuple-routing. However we don't
- // * need to in the latter case if no BR trigger is defined on
- // * the partition. Note that a BR trigger might modify the
- // * tuple such that the partition constraint is no longer
- // * satisfied, so we need to check in that case.
- // */
- // bool check_partition_constr =
- // (resultRelInfo->ri_PartitionCheck != NIL);
- //
- // if (saved_resultRelInfo != NULL &&
- // !(resultRelInfo->ri_TrigDesc &&
- // resultRelInfo->ri_TrigDesc->trig_insert_before_row))
- // check_partition_constr = false;
- //
- // /* Check the constraints of the tuple */
- // if (cstate->rel->rd_att->constr || check_partition_constr)
- // ExecConstraints(resultRelInfo, slot, estate);
- //
- // if (useHeapMultiInsert)
- // {
- // /* Add this tuple to the tuple buffer */
- // if (nBufferedTuples == 0)
- // firstBufferedLineNo = cstate->cur_lineno;
- // bufferedTuples[nBufferedTuples++] = tuple;
- // bufferedTuplesSize += tuple->t_len;
- //
- // /*
- // * If the buffer filled up, flush it. Also flush if the
- // * total size of all the tuples in the buffer becomes
- // * large, to avoid using large amounts of memory for the
- // * buffer when the tuples are exceptionally wide.
- // */
- // if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
- // bufferedTuplesSize > 65535)
- // {
- // CopyFromInsertBatch(cstate, estate, mycid, hi_options,
- // resultRelInfo, myslot, bistate,
- // nBufferedTuples, bufferedTuples,
- // firstBufferedLineNo);
- // nBufferedTuples = 0;
- // bufferedTuplesSize = 0;
- // }
- // }
- // else
- // {
- // List *recheckIndexes = NIL;
- //
- // /* OK, store the tuple and create index entries for it */
- // heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
- // hi_options, bistate);
- //
- // if (resultRelInfo->ri_NumIndices > 0)
- // recheckIndexes = ExecInsertIndexTuples(slot,
- // &(tuple->t_self),
- // estate,
- // false,
- // NULL,
- // NIL);
- //
- // /* AFTER ROW INSERT Triggers */
- // ExecARInsertTriggers(estate, resultRelInfo, tuple,
- // recheckIndexes, cstate->transition_capture);
- //
- // list_free(recheckIndexes);
- // }
- // }
- //
- // /*
- // * We count only tuples not suppressed by a BEFORE INSERT trigger;
- // * this is the same definition used by execMain.c for counting
- // * tuples inserted by an INSERT command.
- // */
- // processed++;
- //
- // if (saved_resultRelInfo)
- // {
- // resultRelInfo = saved_resultRelInfo;
- // estate->es_result_relation_info = resultRelInfo;
- // }
- // }
}
- //
- // /* Flush any remaining buffered tuples */
- // if (nBufferedTuples > 0)
- // CopyFromInsertBatch(cstate, estate, mycid, hi_options,
- // resultRelInfo, myslot, bistate,
- // nBufferedTuples, bufferedTuples,
- // firstBufferedLineNo);
- //
- // /* Done, clean up */
- // error_context_stack = errcallback.previous;
- //
- // FreeBulkInsertState(bistate);
/* Wait for all workers to complete their work. */
wait_for_workers_to_finish(pst);
- MemoryContextSwitchTo(oldcontext);
-
- /*
- * In the old protocol, tell pqcomm that we can process normal protocol
- * messages again.
- */
- if (cstate->copy_dest == COPY_OLD_FE)
- pq_endmsgread();
-
- /* Execute AFTER STATEMENT insertion triggers */
- ExecASInsertTriggers(pst->estate, pst->resultRelInfo, cstate->transition_capture);
-
- /* Handle queued AFTER triggers */
- AfterTriggerEndQuery(pst->estate);
+ // /* Wait to be signalled. */
+ // WaitLatch(MyLatch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION);
+ // /* Reset the latch so we don't spin. */
+ // ResetLatch(MyLatch);
- ExecResetTupleTable(pst->estate->es_tupleTable, false);
-
- ExecCloseIndices(pst->resultRelInfo);
-
- /* Close all the partitioned tables, leaf partitions, and their indices */
- if (cstate->partition_dispatch_info)
- {
- int i;
-
- /*
- * Remember cstate->partition_dispatch_info[0] corresponds to the root
- * partitioned table, which we must not try to close, because it is
- * the main target table of COPY that will be closed eventually by
- * DoCopy(). Also, tupslot is NULL for the root partitioned table.
- */
- for (i = 1; i < cstate->num_dispatch; i++)
- {
- PartitionDispatch pd = cstate->partition_dispatch_info[i];
-
- heap_close(pd->reldesc, NoLock);
- ExecDropSingleTupleTableSlot(pd->tupslot);
- }
- for (i = 0; i < cstate->num_partitions; i++)
- {
- ResultRelInfo *resultRelInfo = cstate->partitions + i;
-
- ExecCloseIndices(resultRelInfo);
- heap_close(resultRelInfo->ri_RelationDesc, NoLock);
- }
-
- /* Release the standalone partition tuple descriptor */
- ExecDropSingleTupleTableSlot(cstate->partition_tuple_slot);
- }
-
- /* Close any trigger target relations */
- ExecCleanUpTriggerState(pst->estate);
-
- FreeExecutorState(pst->estate);
+ LWLockAcquire(CopyFromBgwLock, LW_EXCLUSIVE);
+ processed = pst->processed;
+ LWLockRelease(CopyFromBgwLock);
- /*
- * If we skipped writing WAL, then we need to sync the heap (but not
- * indexes since those use WAL anyway)
- */
- if (pst->hi_options & HEAP_INSERT_SKIP_WAL)
- heap_sync(cstate->rel);
+ MemoryContextSwitchTo(oldcontext);
/* Clean up. */
dsm_detach(seg);
--
2.11.0
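
Note: the patch above is easier to follow with the leader/worker message flow
collected in one place. The following is a minimal illustrative sketch, not
part of the patch series: the leader reads lines with CopyReadLine() and
round-robins them across the per-worker shm_mq queues, and each worker blocks
in shm_mq_receive(), treating a zero-length message as end of input. The names
mq_handles and nworkers and the zero-length shutdown convention are taken from
the patch; distribute_lines() and consume_lines() are hypothetical helpers,
assumed to live in src/backend/commands/copy.c.

    /* Sketch only; assumed to live in src/backend/commands/copy.c. */

    /* Leader: send each input line to the next worker's queue, round-robin. */
    static void
    distribute_lines(CopyState cstate, shm_mq_handle **mq_handles, int nworkers)
    {
        int     next_worker = 0;
        int     i;

        /*
         * CopyReadLine() returns true at EOF; this simplified loop ignores
         * a trailing unterminated line.
         */
        while (!CopyReadLine(cstate))
        {
            /* Blocking send: a full queue applies back-pressure to the leader. */
            if (shm_mq_send(mq_handles[next_worker],
                            cstate->line_buf.len,
                            cstate->line_buf.data,
                            false) != SHM_MQ_SUCCESS)
                elog(ERROR, "could not send line to background worker");

            next_worker = (next_worker + 1) % nworkers;
        }

        /* A zero-length message tells each worker to stop. */
        for (i = 0; i < nworkers; i++)
            shm_mq_send(mq_handles[i], 0, "", false);
    }

    /* Worker: consume lines until the zero-length shutdown message arrives. */
    static void
    consume_lines(shm_mq_handle *mqh)
    {
        for (;;)
        {
            Size    len;
            void   *data;

            if (shm_mq_receive(mqh, &len, &data, false) != SHM_MQ_SUCCESS ||
                len == 0)
                break;
            /* NextCopyFrom()/heap_insert() machinery runs here in the patch. */
        }
    }

Round-robin keeps the send side trivial at the cost of ignoring per-worker
load; a full queue simply blocks the leader until that worker catches up.
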
From db5271de4d82b0faaeb40abc2f3ee52fd7b7cef3 Mon Sep 17 00:00:00 2001
From: Alex K <alex.lumir@gmail.com>
Date: Sat, 19 Aug 2017 02:18:36 +0300
Subject: [PATCH 11/13] Comment out per-row log output
---
src/backend/commands/copy.c | 26 ++++++++++++++++++--------
1 file changed, 18 insertions(+), 8 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 083291d1c7..2dec839563 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -1048,7 +1048,8 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
NULL, stmt->attlist, stmt->options);
- if (cstate->allow_parallel) /* copy from file to database */
+ if (cstate->allow_parallel) /* copy from file to database */
+ // if (false)
{
*processed = ParallelCopyFrom(cstate, pstate, rel, stmt->filename, stmt->is_program,
stmt->attlist, stmt->options);
@@ -5406,7 +5407,7 @@ CopyFromBgwMainLoop(Datum main_arg)
elog(LOG, "BGWorker #%d: got zero-length message, stopping", myworkernumber);
break;
}
- elog(LOG, "BGWorker #%d processing line: %s", myworkernumber, msg);
+ // elog(LOG, "BGWorker #%d processing line: %s", myworkernumber, msg);
cstate->line_buf.data = msg;
cstate->line_buf.len = len;
@@ -5747,8 +5748,11 @@ CopyFromBgwMainLoop(Datum main_arg)
// }
LWLockRelease(CopyFromBgwLock);
+	elog(LOG, "BGWorker #%d processed: " UINT64_FORMAT, myworkernumber, processed);
+
/* Signal main process that we are done. */
- ConditionVariableBroadcast(&cv);
+ // ConditionVariableBroadcast(&cv);
+ SetLatch(&registrant->procLatch);
// TODO Segmentation fault:
// * frame #0: 0x0000000107519678 postgres`GetMemoryChunkContext(pointer=0x00007fdf268bf858) at memutils.h:124
@@ -6120,7 +6124,13 @@ wait_for_workers_to_finish(volatile ParallelState *pst)
elog(LOG, "Going to sleep again");
/* Wait for the workers to wake us up. */
- ConditionVariableSleep(&cv, WAIT_EVENT_COPY_FROM_BGWORKERS_FINISHED);
+ // ConditionVariableSleep(&cv, WAIT_EVENT_COPY_FROM_BGWORKERS_FINISHED);
+
+ /* Wait to be signalled. */
+ WaitLatch(MyLatch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION);
+
+ /* Reset the latch so we don't spin. */
+ ResetLatch(MyLatch);
/* An interrupt may have occurred while we were waiting. */
CHECK_FOR_INTERRUPTS();
@@ -6144,11 +6154,11 @@ ParallelCopyFrom(CopyState cstate, ParseState *pstate,
ParallelState *pst;
dsm_segment *seg;
int32 nworkers = max_parallel_workers_per_gather;
- int64 queue_size = 100;
+ int64 queue_size = 100000;
shm_mq_handle **mq_handles;
shm_mq_result shmq_res;
int last_worker_used = 0;
- int message_size = sizeof(char);
+ int message_size = sizeof(char) * 80;
mq_handles = palloc0(sizeof(shm_mq_handle *) * nworkers);
@@ -6157,7 +6167,7 @@ ParallelCopyFrom(CopyState cstate, ParseState *pstate,
uint64 processed = 0;
// MQ size = 100 messages x 80 chars each
- pst = setup_parallel_copy_from(message_size * 80 * queue_size,
+ pst = setup_parallel_copy_from(message_size * queue_size,
nworkers,
&seg,
&mq_handles,
@@ -6211,7 +6221,7 @@ ParallelCopyFrom(CopyState cstate, ParseState *pstate,
}
else
{
- elog(LOG, "Sending line #%d to BGWorker #%d", cstate->cur_lineno, last_worker_used + 1);
+ // elog(LOG, "Sending line #%d to BGWorker #%d", cstate->cur_lineno, last_worker_used + 1);
shmq_res = shm_mq_send(mq_handles[++last_worker_used - 1], cstate->line_buf.len, cstate->line_buf.data, false);
if (shmq_res != SHM_MQ_SUCCESS)
--
2.11.0
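
Note: patch 11 swaps the condition-variable handshake for latches. Below is a
minimal sketch of that rendezvous using the patch's own names (ParallelState,
workers_finished, workers_total, CopyFromBgwLock); the two function names are
hypothetical. The pattern is race-free because a SetLatch() issued before the
leader reaches WaitLatch() is not lost: the latch stays set until ResetLatch()
clears it.

    /* Sketch only; ParallelState and CopyFromBgwLock come from the patch. */

    /* Leader: sleep until every worker has bumped workers_finished. */
    static void
    wait_until_workers_finish(volatile ParallelState *pst)
    {
        for (;;)
        {
            bool    done;

            LWLockAcquire(CopyFromBgwLock, LW_SHARED);
            done = (pst->workers_finished == pst->workers_total);
            LWLockRelease(CopyFromBgwLock);
            if (done)
                break;

            /* Sleep until a worker sets our latch, then clear it. */
            WaitLatch(MyLatch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION);
            ResetLatch(MyLatch);
            CHECK_FOR_INTERRUPTS();
        }
    }

    /* Worker: after updating the shared counters, wake the leader. */
    static void
    notify_leader(void)
    {
        PGPROC *registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);

        if (registrant != NULL)
            SetLatch(&registrant->procLatch);
    }
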
From 4aca6121dff2e9629c586d7e6df4c8c10aabdc3d Mon Sep 17 00:00:00 2001
From: Alex K <alex.lumir@gmail.com>
Date: Sat, 19 Aug 2017 14:54:53 +0300
Subject: [PATCH 12/13] Do not allow parallel COPY FROM for temp tables, since
 they are not available inside the BGWorker process. TODO: check for a better
 solution.
---
src/backend/commands/copy.c | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 2dec839563..de63a2bb4a 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -407,8 +407,8 @@ static void setup_dsm(int64 queue_size, int nworkers,
static WorkerState *setup_background_workers(int nworkers,
dsm_segment *seg);
static void cleanup_background_workers(dsm_segment *seg, Datum arg);
-static void wait_for_workers_to_become_ready(WorkerState *wstate,
- volatile ParallelState *pst);
+static void wait_for_workers(WorkerState *wstate,
+ volatile ParallelState *pst);
static bool check_worker_status(WorkerState *wstate);
static void handle_sigterm(SIGNAL_ARGS);
@@ -1480,8 +1480,6 @@ BeginCopy(ParseState *pstate,
/* Allocate workspace and zero all fields */
cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
- cstate->allow_parallel = IsNormalProcessingMode() && IsUnderPostmaster;
-
/*
* We allocate everything used by a cstate in a new memory context. This
* avoids memory leaks during repeated use of COPY in a query.
@@ -3091,6 +3089,10 @@ BeginCopyFrom(ParseState *pstate,
cstate->cur_attname = NULL;
cstate->cur_attval = NULL;
+ cstate->allow_parallel = IsNormalProcessingMode()
+ && IsUnderPostmaster
+ && !rel->rd_islocaltemp;
+
/* Set up variables to avoid per-attribute overhead. */
initStringInfo(&cstate->attribute_buf);
initStringInfo(&cstate->line_buf);
@@ -5827,7 +5829,7 @@ setup_parallel_copy_from(int64 queue_size, int32 nworkers, dsm_segment **segp,
}
/* Wait for workers to become ready. */
- wait_for_workers_to_become_ready(wstate, pst);
+ wait_for_workers(wstate, pst);
/* Wait to be signalled. */
// WaitLatch(MyLatch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION);
/* Reset the latch so we don't spin. */
@@ -6033,7 +6035,7 @@ cleanup_background_workers(dsm_segment *seg, Datum arg)
}
static void
-wait_for_workers_to_become_ready(WorkerState *wstate,
+wait_for_workers(WorkerState *wstate,
volatile ParallelState *pst)
{
bool result = false;
@@ -6154,11 +6156,11 @@ ParallelCopyFrom(CopyState cstate, ParseState *pstate,
ParallelState *pst;
dsm_segment *seg;
int32 nworkers = max_parallel_workers_per_gather;
- int64 queue_size = 100000;
shm_mq_handle **mq_handles;
shm_mq_result shmq_res;
int last_worker_used = 0;
- int message_size = sizeof(char) * 80;
+ int message_size = sizeof(char) * 80; // 80 chars each
+ int64 queue_size = 100000; // Number of messages in MQ
mq_handles = palloc0(sizeof(shm_mq_handle *) * nworkers);
@@ -6166,7 +6168,6 @@ ParallelCopyFrom(CopyState cstate, ParseState *pstate,
uint64 processed = 0;
- // MQ size = 100 messages x 80 chars each
pst = setup_parallel_copy_from(message_size * queue_size,
nworkers,
&seg,
--
2.11.0
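
Note: patches 12 and 13 together gate the parallel path. Below is a compact
restatement of the resulting predicate; copy_from_may_parallelize() is a
hypothetical name, while the individual checks are the patch's own. Temporary
tables are excluded because their pages live in backend-local buffers that the
background workers cannot read, and (per the next patch) only top-level COPY
statements qualify.

    /* Sketch only; the helper name is hypothetical. */
    static bool
    copy_from_may_parallelize(Relation rel, bool is_top_level)
    {
        return IsNormalProcessingMode() &&   /* not bootstrap or standalone */
               IsUnderPostmaster &&          /* dynamic bgworkers need a postmaster */
               !rel->rd_islocaltemp &&       /* temp rels are backend-local */
               is_top_level;                 /* no nested or SPI invocations */
    }
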
From c56d117049c597646751d5919c6a9f7d49c5afb0 Mon Sep 17 00:00:00 2001
From: Alex K <alex.lumir@gmail.com>
Date: Thu, 24 Aug 2017 15:50:38 +0300
Subject: [PATCH 13/13] Do not allow parallel COPY FROM when not at top level
---
src/backend/commands/copy.c | 74 ++++++++++++++++-----------------------------
src/backend/tcop/utility.c | 2 +-
src/include/commands/copy.h | 10 ++----
3 files changed, 30 insertions(+), 56 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index de63a2bb4a..d4b506257d 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -237,8 +237,6 @@ typedef struct CopyStateData
char *raw_buf;
int raw_buf_index; /* next byte to process */
int raw_buf_len; /* total # of bytes stored */
-
- bool allow_parallel;
} CopyStateData;
/* DestReceiver for COPY (query) TO */
@@ -392,18 +390,13 @@ static bool CopyGetInt16(CopyState cstate, int16 *val);
static ParallelState* setup_parallel_copy_from(int64 queue_size, int32 nworkers,
- dsm_segment **segp, shm_mq_handle **mq_handles[], CopyState cstate,
- ParseState *pstate,
- List *attnamelist,
- List *options);
+ dsm_segment **segp, shm_mq_handle **mq_handles[],
+ const char *query_string);
static void setup_dsm(int64 queue_size, int nworkers,
dsm_segment **segp,
ParallelState **pstp,
shm_mq **mqs[],
- CopyState cstate,
- ParseState *pstate,
- List *attnamelist,
- List *options);
+ const char *query_string);
static WorkerState *setup_background_workers(int nworkers,
dsm_segment *seg);
static void cleanup_background_workers(dsm_segment *seg, Datum arg);
@@ -855,7 +848,8 @@ CopyLoadRawBuf(CopyState cstate)
void
DoCopy(ParseState *pstate, const CopyStmt *stmt,
int stmt_location, int stmt_len,
- uint64 *processed)
+ uint64 *processed, const char *query_string,
+ bool is_top_level)
{
CopyState cstate;
bool is_from = stmt->is_from;
@@ -1038,6 +1032,11 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
if (is_from)
{
+ bool allow_parallel = IsNormalProcessingMode()
+ && IsUnderPostmaster
+ && !rel->rd_islocaltemp
+ && is_top_level;
+
Assert(rel);
/* check read-only transaction and parallel mode */
@@ -1048,11 +1047,9 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
NULL, stmt->attlist, stmt->options);
- if (cstate->allow_parallel) /* copy from file to database */
- // if (false)
+ if (allow_parallel) /* copy from file to database */
{
- *processed = ParallelCopyFrom(cstate, pstate, rel, stmt->filename, stmt->is_program,
- stmt->attlist, stmt->options);
+ *processed = ParallelCopyFrom(cstate, query_string);
}
else
{
@@ -3089,10 +3086,6 @@ BeginCopyFrom(ParseState *pstate,
cstate->cur_attname = NULL;
cstate->cur_attval = NULL;
- cstate->allow_parallel = IsNormalProcessingMode()
- && IsUnderPostmaster
- && !rel->rd_islocaltemp;
-
/* Set up variables to avoid per-attribute overhead. */
initStringInfo(&cstate->attribute_buf);
initStringInfo(&cstate->line_buf);
@@ -5133,6 +5126,10 @@ CopyFromBgwMainLoop(Datum main_arg)
BackgroundWorkerInitializeConnectionByOid(pst->database_id,
pst->authenticated_user_id);
+ StartTransactionCommand();
+
+ PushActiveSnapshot(GetTransactionSnapshot());
+
/*
* Set the client encoding to the database encoding, since that is what
* the leader will expect.
@@ -5151,7 +5148,7 @@ CopyFromBgwMainLoop(Datum main_arg)
pstmt = lfirst_node(PlannedStmt, list_head(plantree_list));
cstmnt = (CopyStmt *) pstmt->utilityStmt;
- elog(LOG, "BGWorker #%d filename from CopyStmt: %s", myworkernumber, cstmnt->filename);
+ // elog(LOG, "BGWorker #%d filename from CopyStmt: %s", myworkernumber, cstmnt->filename);
pstate = make_parsestate(NULL);
pstate->p_sourcetext = query_string;
@@ -5186,8 +5183,6 @@ CopyFromBgwMainLoop(Datum main_arg)
registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
SetLatch(&registrant->procLatch);
- StartTransactionCommand();
-
/* Open and lock the relation, using the appropriate lock type. */
rel = heap_openrv(cstmnt->relation, RowExclusiveLock);
// (is_from ? RowExclusiveLock : AccessShareLock));
@@ -5195,6 +5190,8 @@ CopyFromBgwMainLoop(Datum main_arg)
cstate = BeginCopyFrom(pstate, rel, cstmnt->filename, cstmnt->is_program,
NULL, cstmnt->attlist, cstmnt->options);
+ Assert(cstate->rel);
+
estate = CreateExecutorState(); /* for ExecConstraints() */
mycid = GetCurrentCommandId(true);
hi_options = 0; /* start with default heap_insert options */
@@ -5799,10 +5796,7 @@ handle_sigterm(SIGNAL_ARGS)
*/
static ParallelState*
setup_parallel_copy_from(int64 queue_size, int32 nworkers, dsm_segment **segp,
- shm_mq_handle ***mq_handles, CopyState cstate,
- ParseState *pstate,
- List *attnamelist,
- List *options)
+ shm_mq_handle ***mq_handles, const char *query_string)
{
int i;
dsm_segment *seg;
@@ -5813,10 +5807,7 @@ setup_parallel_copy_from(int64 queue_size, int32 nworkers, dsm_segment **segp,
mqs = palloc0(sizeof(shm_mq *) * nworkers);
/* Set up a dynamic shared memory segment. */
- setup_dsm(queue_size, nworkers, &seg, &pst, &mqs, cstate,
- pstate,
- attnamelist,
- options);
+ setup_dsm(queue_size, nworkers, &seg, &pst, &mqs, query_string);
*segp = seg;
/* Register background workers. */
@@ -5857,10 +5848,7 @@ setup_parallel_copy_from(int64 queue_size, int32 nworkers, dsm_segment **segp,
static void
setup_dsm(int64 queue_size, int nworkers,
dsm_segment **segp, ParallelState **pstp,
- shm_mq ***mqs, CopyState cstate,
- ParseState *pstate,
- List *attnamelist,
- List *options)
+ shm_mq ***mqs, const char *query_string)
{
shm_toc_estimator e;
int i;
@@ -5871,8 +5859,6 @@ setup_dsm(int64 queue_size, int nworkers,
ParallelState *pst;
char *shm_query;
- Assert(cstate->rel);
-
/* Ensure a valid queue size. */
if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
ereport(ERROR,
@@ -5896,7 +5882,7 @@ setup_dsm(int64 queue_size, int nworkers,
shm_toc_initialize_estimator(&e);
shm_toc_estimate_chunk(&e, sizeof(ParallelState));
- shm_toc_estimate_chunk(&e, strlen(ActivePortal->sourceText) * sizeof(char));
+ shm_toc_estimate_chunk(&e, strlen(query_string) + 1);
for (i = 0; i < nworkers; ++i)
shm_toc_estimate_chunk(&e, (Size) queue_size);
@@ -5925,7 +5911,7 @@ setup_dsm(int64 queue_size, int nworkers,
- shm_query = shm_toc_allocate(toc, strlen(ActivePortal->sourceText) * sizeof(char));
- strcpy(shm_query, ActivePortal->sourceText);
+ shm_query = shm_toc_allocate(toc, strlen(query_string) + 1);
+ strcpy(shm_query, query_string);
shm_toc_insert(toc, toc_key++, shm_query);
@@ -6146,12 +6132,7 @@ wait_for_workers_to_finish(volatile ParallelState *pst)
* Parallel Copy FROM file to relation.
*/
extern uint64
-ParallelCopyFrom(CopyState cstate, ParseState *pstate,
- Relation rel,
- const char *filename,
- bool is_program,
- List *attnamelist,
- List *options)
+ParallelCopyFrom(CopyState cstate, const char *query_string)
{
ParallelState *pst;
dsm_segment *seg;
@@ -6172,10 +6153,7 @@ ParallelCopyFrom(CopyState cstate, ParseState *pstate,
nworkers,
&seg,
&mq_handles,
- cstate,
- pstate,
- attnamelist,
- options);
+ query_string);
elog(LOG, "Copying from file %s", cstate->filename);
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index ddacac8774..11fa7af728 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -559,7 +559,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
DoCopy(pstate, (CopyStmt *) parsetree,
pstmt->stmt_location, pstmt->stmt_len,
- &processed);
+ &processed, queryString, isTopLevel);
if (completionTag)
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, processed);
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index dbc769828f..5d5982f4c9 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -26,7 +26,8 @@ typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
extern void DoCopy(ParseState *state, const CopyStmt *stmt,
int stmt_location, int stmt_len,
- uint64 *processed);
+ uint64 *processed, const char *query_string,
+ bool is_top_level);
extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options);
extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename,
@@ -40,12 +41,7 @@ extern void CopyFromErrorCallback(void *arg);
extern void CopyFromBgwMainLoop(Datum main_arg);
extern uint64 CopyFrom(CopyState cstate);
-extern uint64 ParallelCopyFrom(CopyState cstate, ParseState *pstate,
- Relation rel,
- const char *filename,
- bool is_program,
- List *attnamelist,
- List *options);
+extern uint64 ParallelCopyFrom(CopyState cstate, const char *query_string);
extern DestReceiver *CreateCopyDestReceiver(void);
--
2.11.0
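Patch 13 starts the worker's transaction before BeginCopyFrom and has each worker re-parse the original statement text out of shared memory. Below is a minimal sketch of the matching worker-side receive loop, again patterned on contrib/test_shm_mq rather than taken from the patch; copy_bgw_main, COPY_BGW_MAGIC, and the fixed TOC keys are assumptions.

/*
 * Hedged sketch of a worker-side receive loop. The real entry point in
 * this series is CopyFromBgwMainLoop; the names and key layout here are
 * illustrative only.
 */
#include "postgres.h"

#include "storage/dsm.h"
#include "storage/proc.h"
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"

#define COPY_BGW_MAGIC 0x79fb2449	/* made-up magic value */

void
copy_bgw_main(Datum main_arg)
{
	dsm_segment *seg;
	shm_toc    *toc;
	char	   *query_string;
	shm_mq	   *mq;
	shm_mq_handle *mqh;
	shm_mq_result res;
	Size		len;
	void	   *line;

	/* Assume the leader passed the DSM handle through bgw_main_arg. */
	seg = dsm_attach(DatumGetUInt32(main_arg));
	if (seg == NULL)
		ereport(ERROR,
				(errmsg("could not attach to dynamic shared memory segment")));

	toc = shm_toc_attach(COPY_BGW_MAGIC, dsm_segment_address(seg));

	/* Key 1: the original COPY statement text, to be re-parsed here. */
	query_string = shm_toc_lookup(toc, 1, false);
	elog(LOG, "worker re-parsing: %s", query_string);

	/* Key 2: this worker's line queue; attach as the receiver. */
	mq = shm_toc_lookup(toc, 2, false);
	shm_mq_set_receiver(mq, MyProc);
	mqh = shm_mq_attach(mq, seg, NULL);

	/* Pull lines until the leader detaches, signalling end of input. */
	for (;;)
	{
		res = shm_mq_receive(mqh, &len, &line, false);
		if (res == SHM_MQ_DETACHED)
			break;				/* leader is done sending */
		/* ... parse the line and insert the tuple here ... */
	}

	dsm_detach(seg);
}

A real worker would push each received line through COPY's per-line parsing and heap insertion inside the transaction it opened at startup, which is the work CopyFromBgwMainLoop does in the hunks above.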