Skip to content

Instantly share code, notes, and snippets.

@onderkalaci
Created October 7, 2016 08:21
Show Gist options
  • Save onderkalaci/72263afdb8280607142e3168ab7d50e0 to your computer and use it in GitHub Desktop.
Save onderkalaci/72263afdb8280607142e3168ab7d50e0 to your computer and use it in GitHub Desktop.
diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index 58ffdb3..1202b18 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -385,14 +385,28 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
Query *query = workerJob->jobQuery;
Oid relationId = ((RangeTblEntry *) linitial(query->rtable))->relid;
- ExecuteMasterEvaluableFunctions(query);
+ //ExecuteMasterEvaluableFunctions(query);
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
StringInfo newQueryString = makeStringInfo();
- deparse_shard_query(query, relationId, task->anchorShardId,
+ ExecuteMasterEvaluableFunctions(task->taskQuery);
+
+ /* also evaluate master-evaluable functions in each subquery of the task query */
+ List *subqueryEntryList = SubqueryEntryList(task->taskQuery);
+ ListCell *rangeTableEntryCell = NULL;
+ foreach(rangeTableEntryCell, subqueryEntryList)
+ {
+ RangeTblEntry *rangeTableEntry =
+ (RangeTblEntry *) lfirst(rangeTableEntryCell);
+
+ Query *innerSubquery = rangeTableEntry->subquery;
+ ExecuteMasterEvaluableFunctions(innerSubquery);
+ }
+
+ deparse_shard_query(task->taskQuery, relationId, task->anchorShardId,
newQueryString);
elog(DEBUG4, "query before master evaluation: %s", task->queryString);
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index f6e9a69..4beaab9 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -305,6 +305,7 @@ CreateMultiTaskRouterPlan(Query *originalQuery, Query *query,
sqlTask->dependedTaskList = NULL;
sqlTask->anchorShardId = shardId;
sqlTask->taskPlacementList = insertShardPlacementList;
+ sqlTask->taskQuery = copyObject(copiedOriginal);
sqlTaskList = lappend(sqlTaskList, sqlTask);
}
@@ -317,7 +318,7 @@ CreateMultiTaskRouterPlan(Query *originalQuery, Query *query,
workerJob->dependedJobList = NIL;
workerJob->jobId = jobId;
workerJob->jobQuery = originalQuery;
- workerJob->requiresMasterEvaluation = false; /* for now we do not support any function evaluation */
+ workerJob->requiresMasterEvaluation = true; /* mark the job as requiring master evaluation of functions */
/* and finally the multi plan */
multiPlan = CitusMakeNode(MultiPlan);
@@ -354,7 +355,7 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree)
if (contain_mutable_functions((Node *) queryTree))
{
- ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ ereport(INFO, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("non-IMMUTABLE functions are not allowed in INSERT ... "
"SELECT queries")));
}
diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c
index dda8b80..d34b524 100644
--- a/src/backend/distributed/utils/citus_outfuncs.c
+++ b/src/backend/distributed/utils/citus_outfuncs.c
@@ -459,6 +459,7 @@ _outTask(StringInfo str, const Task *node)
WRITE_BOOL_FIELD(assignmentConstrained);
WRITE_NODE_FIELD(taskExecution);
WRITE_BOOL_FIELD(upsertQuery);
+ WRITE_NODE_FIELD(taskQuery);
}
diff --git a/src/backend/distributed/utils/citus_readfuncs_95.c b/src/backend/distributed/utils/citus_readfuncs_95.c
index 13884aa..e276994 100644
--- a/src/backend/distributed/utils/citus_readfuncs_95.c
+++ b/src/backend/distributed/utils/citus_readfuncs_95.c
@@ -1508,6 +1508,7 @@ _readTask(void)
READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
+ READ_NODE_FIELD(taskQuery);
READ_DONE();
}
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index ab49167..e2489f9 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -169,6 +169,8 @@ typedef struct Task
uint64 shardId; /* only applies to shard fetch tasks */
TaskExecution *taskExecution; /* used by task tracker executor */
bool upsertQuery; /* only applies to modify tasks */
+
+ Query *taskQuery; /* only applies to INSERT .. SELECT tasks */
} Task;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment