Skip to content

Instantly share code, notes, and snippets.

@arjunsk
Created December 30, 2023 23:24
Show Gist options
  • Save arjunsk/5524ab3bcdce32417fba798aeb1b1de7 to your computer and use it in GitHub Desktop.
Save arjunsk/5524ab3bcdce32417fba798aeb1b1de7 to your computer and use it in GitHub Desktop.
Ways of Building a Query Plan
// LogicalPlan is a logical representation of a query. Each LogicalPlan is a
// sub-tree of the query. It is built recursively.
//
// The kind of a node is given by which of the operator pointer fields below
// is non-nil. Consumers such as PhysicalProjectionPushDown.optimize dispatch
// on the first non-nil field they check, so exactly one should be set.
type LogicalPlan struct {
// Input is the sub-plan feeding this node. optimize stops recursing when it
// is nil.
Input *LogicalPlan
// Each LogicalPlan struct must only have one of the following.
SchemaScan *SchemaScan
TableScan *TableScan
Filter *Filter
Distinct *Distinct
Projection *Projection
Aggregation *Aggregation
}
.....
// optimize pushes column usage down the plan. Starting at plan, it follows the
// Input chain and accumulates in columnsUsedExprs every column expression
// referenced by a Filter, Distinct, Projection or Aggregation on the way down.
// When it reaches a SchemaScan or TableScan, it stores p.defaultProjections
// followed by the accumulated expressions as that scan's PhysicalProjection.
//
// Filter, Distinct, Projection and Aggregation each clear p.defaultProjections.
// The field is never restored, so the defaults only reach a scan if none of
// these operators has been visited by this optimizer (in this call or an
// earlier one). An Aggregation also requests DynCol(hashedMatch).
func (p *PhysicalProjectionPushDown) optimize(plan *LogicalPlan, columnsUsedExprs []Expr) {
	// scanProjection builds a scan's projection: the defaults followed by used.
	// The defaults are re-sliced with capacity clipped to their length, so
	// append must copy into a fresh array rather than writing into spare
	// capacity of p.defaultProjections. A plain append would let projections
	// handed to different scans share, and overwrite, the same backing array.
	// When used is empty, the result is the same (possibly nil) slice as before.
	scanProjection := func(used []Expr) []Expr {
		defaults := p.defaultProjections
		return append(defaults[:len(defaults):len(defaults)], used...)
	}

	switch {
	case plan.SchemaScan != nil:
		plan.SchemaScan.PhysicalProjection = scanProjection(columnsUsedExprs)
	case plan.TableScan != nil:
		plan.TableScan.PhysicalProjection = scanProjection(columnsUsedExprs)
	case plan.Filter != nil:
		p.defaultProjections = []Expr{}
		columnsUsedExprs = append(columnsUsedExprs, plan.Filter.Expr.ColumnsUsedExprs()...)
	case plan.Distinct != nil:
		p.defaultProjections = []Expr{}
		for _, expr := range plan.Distinct.Exprs {
			columnsUsedExprs = append(columnsUsedExprs, expr.ColumnsUsedExprs()...)
		}
	case plan.Projection != nil:
		p.defaultProjections = []Expr{}
		for _, expr := range plan.Projection.Exprs {
			columnsUsedExprs = append(columnsUsedExprs, expr.ColumnsUsedExprs()...)
		}
	case plan.Aggregation != nil:
		for _, expr := range plan.Aggregation.GroupExprs {
			columnsUsedExprs = append(columnsUsedExprs, expr.ColumnsUsedExprs()...)
		}
		for _, expr := range plan.Aggregation.AggExprs {
			columnsUsedExprs = append(columnsUsedExprs, expr.ColumnsUsedExprs()...)
		}
		p.defaultProjections = []Expr{}
		// The aggregation additionally needs the hashedMatch dynamic column.
		columnsUsedExprs = append(columnsUsedExprs, DynCol(hashedMatch))
	}

	if plan.Input != nil {
		p.optimize(plan.Input, columnsUsedExprs)
	}
}
// Plan is the top-level plan envelope. The `plan` oneof holds the statement
// kind; at most one of its members is set.
message Plan {
oneof plan {
// Plan for a query statement (see message Query).
Query query = 1;
// Transaction-control statement.
// NOTE(review): the type name "TransationControl" is misspelled. Renaming a
// message is wire-safe but breaks generated code, so fix it only together
// with its definition and every reference.
TransationControl tcl = 2;
// Data-definition statement.
DataDefinition ddl = 3;
// Data-control statement.
DataControl dcl = 4;
}
// NOTE(review): semantics not visible here; presumably the number of times
// this plan has been (re)tried — confirm.
int32 try_run_times = 5;
// NOTE(review): presumably set when the plan was built for a prepared
// statement — confirm.
bool is_prepare = 6;
}
// Query is the plan for a SELECT/INSERT/REPLACE/DELETE/UPDATE/MERGE statement:
// a flat pool of nodes plus the root node of each execution step.
message Query {
// Kind of statement this plan implements. UNKNOWN (0) is the value read
// when the field is unset.
enum StatementType {
UNKNOWN = 0;
SELECT = 1;
INSERT = 2;
REPLACE = 3;
DELETE = 4;
UPDATE = 5;
MERGE = 6;
}
StatementType stmt_type = 1;
// A query may need to run in steps. In theory this is not
// necessary, but it is often convenient and/or can be better optimized.
// For example, by executing a non-correlated scalar subquery first
// we can plug its value into the optimizer, and the newly available
// value may produce a better plan.
// Each step is simply a root node. A root node refers to other
// nodes as children, and the whole step is a DAG.
// NOTE(review): presumably each entry is an index into `nodes` — confirm.
repeated int32 steps = 2;
// All the nodes. Dangling nodes are OK; only nodes reachable
// from the step roots are executed.
repeated Node nodes = 3;
// Bound parameters for the query.
repeated Expr params = 4;
// Headings of the returned result (presumably the output column
// names — confirm).
repeated string headings = 5;
// Load tag. NOTE(review): semantics not visible here. The camelCase name
// breaks the lower_snake_case field convention, but renaming it would change
// its JSON name and generated accessors, so it is left as is.
bool loadTag = 6;
}
// QueryBuilder holds the state used while building a plan.Query.
type QueryBuilder struct {
// qry is the query being built.
qry *plan.Query
compCtx CompilerContext
// Bind context per node; presumably indexed by node id — confirm.
ctxByNode []*BindContext
// Presumably maps a (tag, column index) pair to a column name — confirm.
nameByColRef map[[2]int32]string
// Table definitions keyed by int32; presumably the binding tag — confirm.
tag2Table map[int32]*TableDef
// Next binding tag to hand out (presumably incremented on each allocation).
nextTag int32
isPrepareStatement bool
mysqlCompatible bool
haveOnDuplicateKey bool // if the plan contains an ON DUPLICATE KEY node, some optimize rules cannot be used
isForUpdate bool // if it's a query plan for update
deleteNode map[uint64]int32 // delete nodes in this query. key is tableId, value is the nodeId of the sinkScan node in the delete plan
}
// Nodes are operators.
// Example excerpt: nodes are added with builder.appendNode, and the value it
// returns is used as a child reference by the next node appended.
// JOIN node (join type SINGLE) over the centroids scan and the meta-table scan.
joinMetaAndCentroidsId := builder.appendNode(&plan.Node{
NodeType: plan.Node_JOIN,
JoinType: plan.Node_SINGLE,
Children: []int32{centroidsScanId, metaTableScanId},
ProjectList: joinProjections,
}, bindCtx)
// FILTER node over the join, with predicate whereCentroidVersionEqCurrVersion,
// projecting only the first three entries of prevProjections.
// NOTE(review): why exactly three is not visible here — confirm which
// columns these are.
filterCentroidsForCurrVersionId := builder.appendNode(&plan.Node{
NodeType: plan.Node_FILTER,
Children: []int32{joinMetaAndCentroidsId},
FilterList: []*Expr{whereCentroidVersionEqCurrVersion},
ProjectList: prevProjections[:3],
}, bindCtx)
/**
 * A planned query: the root {@link PlanNode} of the plan tree together with
 * its type information ({@link TypeProvider}) and its statistics and cost
 * estimates ({@link StatsAndCosts}). All three are required and fixed at
 * construction time.
 */
public class Plan
{
    private final PlanNode root;
    private final TypeProvider types;
    private final StatsAndCosts statsAndCosts;

    /**
     * @param root root node of the plan tree
     * @param types type information for the plan
     * @param statsAndCosts statistics and cost estimates for the plan
     * @throws NullPointerException if any argument is null (checked in
     *         parameter order)
     */
    public Plan(PlanNode root, TypeProvider types, StatsAndCosts statsAndCosts)
    {
        requireNonNull(root, "root is null");
        requireNonNull(types, "types is null");
        requireNonNull(statsAndCosts, "statsAndCosts is null");
        this.root = root;
        this.types = types;
        this.statsAndCosts = statsAndCosts;
    }

    /** Returns the root node of the plan tree. */
    public PlanNode getRoot()
    {
        return this.root;
    }

    /** Returns the type information supplied at construction. */
    public TypeProvider getTypes()
    {
        return this.types;
    }

    /** Returns the statistics and cost estimates supplied at construction. */
    public StatsAndCosts getStatsAndCosts()
    {
        return this.statsAndCosts;
    }

    /**
     * Indexes every node reachable from the root through
     * {@link PlanNode#getSources()} by its id. Entries are inserted in
     * depth-first pre-order, which is also the iteration order of the
     * returned immutable map.
     *
     * @throws IllegalArgumentException if two visited nodes share an id
     *         (raised by the Guava map builder, e.g. when the same node is
     *         reachable along two paths)
     */
    public Map<PlanNodeId, PlanNode> getPlanIdNodeMap()
    {
        ImmutableMap.Builder<PlanNodeId, PlanNode> nodesById = ImmutableMap.builder();
        Traverser.forTree(PlanNode::getSources)
                .depthFirstPreOrder(root)
                .forEach(node -> nodesById.put(node.getId(), node));
        return nodesById.build();
    }
}
/**
* The basic component of a Presto IR (logical plan).
* An IR is a tree structure with each PlanNode performing a specific operation.
* Subclasses are serialized polymorphically: the @JsonTypeInfo annotation below
* makes Jackson record the concrete class in the "@type" property.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.MINIMAL_CLASS, property = "@type")
public abstract class PlanNode
{
// Optional source location of this node; serialized as "sourceLocation".
// NOTE(review): unlike id and statsEquivalentPlanNode, the constructor does
// not null-check this value.
private final Optional<SourceLocation> sourceLocation;
// Identifier of this node; serialized as "id". Never null.
private final PlanNodeId id;
/**
* A statistically equivalent version of plan node, i.e. number of output rows/size remains similar.
* This is assigned by Presto optimizer.
* Once assigned by the planner, further optimizer rules should respect this id when changing the plan.
*
* For example, when doing pushdown: Filter(TableScan()) -> TableScan(),
* output TableScan should have same statsEquivalentPlanNode as input Filter node
*/
private final Optional<PlanNode> statsEquivalentPlanNode;
/**
* @param sourceLocation optional source location (not null-checked)
* @param id node id; must not be null
* @param statsEquivalentPlanNode statistically equivalent node; must not be
*        null (pass Optional.empty() when there is none)
* @throws NullPointerException if id or statsEquivalentPlanNode is null
*/
protected PlanNode(Optional<SourceLocation> sourceLocation, PlanNodeId id, Optional<PlanNode> statsEquivalentPlanNode)
{
this.sourceLocation = sourceLocation;
this.id = requireNonNull(id, "id is null");
this.statsEquivalentPlanNode = requireNonNull(statsEquivalentPlanNode, "statsEquivalentPlanNode is null");
}
/** Returns this node's id; serialized under the "id" JSON property. */
@JsonProperty("id")
public PlanNodeId getId()
{
return id;
}
/** Returns this node's source location; serialized under "sourceLocation". */
@JsonProperty("sourceLocation")
public Optional<SourceLocation> getSourceLocation()
{
return sourceLocation;
}
/** Returns the statistically equivalent node (not serialized to JSON). */
public Optional<PlanNode> getStatsEquivalentPlanNode()
{
return statsEquivalentPlanNode;
}
/**
* Get the upstream PlanNodes (i.e., children) of the current PlanNode.
*/
public abstract List<PlanNode> getSources();
/**
* Logical properties are a function of source properties and the operation performed by the plan node.
* This default returns logicalPropertiesProvider.getDefaultProperties();
* subclasses such as FilterNode override it.
*
* @throws NullPointerException if logicalPropertiesProvider is null
*/
public LogicalProperties computeLogicalProperties(LogicalPropertiesProvider logicalPropertiesProvider)
{
requireNonNull(logicalPropertiesProvider, "logicalPropertiesProvider cannot be null.");
return logicalPropertiesProvider.getDefaultProperties();
}
/**
* The variables this node produces for its consumer.
* For a node that keeps its source's columns unchanged (e.g. FilterNode),
* this is simply its source's output variables.
*/
public abstract List<VariableReferenceExpression> getOutputVariables();
/**
* Returns a node equivalent to this one but with its upstream PlanNodes
* replaced by newChildren. Fields here are final, so implementations build a
* new node (FilterNode does) instead of mutating this one.
*/
public abstract PlanNode replaceChildren(List<PlanNode> newChildren);
/**
* A visitor pattern interface to operate on IR.
* This default dispatches to visitor.visitPlan; subclasses override it to
* call their specific visit method (e.g. FilterNode calls visitFilter).
*/
public <R, C> R accept(PlanVisitor<R, C> visitor, C context)
{
return visitor.visitPlan(this, context);
}
/**
* Returns a copy of this node with statsEquivalentPlanNode assigned.
*/
public abstract PlanNode assignStatsEquivalentPlanNode(Optional<PlanNode> statsEquivalentPlanNode);
}
@Immutable
public final class FilterNode
extends PlanNode
{
private final PlanNode source;
private final RowExpression predicate;
@JsonCreator
public FilterNode(
Optional<SourceLocation> sourceLocation,
@JsonProperty("id") PlanNodeId id,
@JsonProperty("source") PlanNode source,
@JsonProperty("predicate") RowExpression predicate)
{
this(sourceLocation, id, Optional.empty(), source, predicate);
}
public FilterNode(
Optional<SourceLocation> sourceLocation,
PlanNodeId id,
Optional<PlanNode> statsEquivalentPlanNode,
PlanNode source,
RowExpression predicate)
{
super(sourceLocation, id, statsEquivalentPlanNode);
this.source = source;
this.predicate = predicate;
}
/**
* Get the predicate (a RowExpression of boolean type) of the FilterNode.
* It serves as the criteria to determine whether the incoming rows should be filtered out or not.
*/
@JsonProperty
public RowExpression getPredicate()
{
return predicate;
}
/**
* FilterNode only expects a single upstream PlanNode.
*/
@JsonProperty("source")
public PlanNode getSource()
{
return source;
}
@Override
public List<VariableReferenceExpression> getOutputVariables()
{
return source.getOutputVariables();
}
@Override
public List<PlanNode> getSources()
{
return unmodifiableList(singletonList(source));
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context)
{
return visitor.visitFilter(this, context);
}
@Override
public PlanNode assignStatsEquivalentPlanNode(Optional<PlanNode> statsEquivalentPlanNode)
{
return new FilterNode(getSourceLocation(), getId(), statsEquivalentPlanNode, source, predicate);
}
@Override
public PlanNode replaceChildren(List<PlanNode> newChildren)
{
// FilterNode only expects a single upstream PlanNode
if (newChildren == null || newChildren.size() != 1) {
throw new IllegalArgumentException("Expect exactly one child to replace");
}
return new FilterNode(getSourceLocation(), getId(), getStatsEquivalentPlanNode(), newChildren.get(0), predicate);
}
@Override
public LogicalProperties computeLogicalProperties(LogicalPropertiesProvider logicalPropertiesProvider)
{
requireNonNull(logicalPropertiesProvider, "logicalPropertiesProvider cannot be null.");
return logicalPropertiesProvider.getFilterProperties(this);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FilterNode that = (FilterNode) o;
return Objects.equals(source, that.source) &&
Objects.equals(predicate, that.predicate);
}
@Override
public int hashCode()
{
return Objects.hash(source, predicate);
}
}
// Plan is the description of an execution flow.
// It is created from ast.Node first, then optimized by the optimizer,
// and finally used by the executor to create a Cursor which executes the statement.
type Plan interface {
// Schema returns the plan's schema.
Schema() *expression.Schema
// ID returns the plan's ID.
ID() int
// TP gets the plan type.
TP() string
// ExplainID returns the ID shown in the EXPLAIN statement.
ExplainID() fmt.Stringer
// ExplainInfo returns operator information to be explained.
ExplainInfo() string
// replaceExprColumns replaces all the column references in the plan's expression nodes.
replaceExprColumns(replace map[string]*expression.Column)
// SCtx returns the plan's session context.
SCtx() sessionctx.Context
// statsInfo returns the property.StatsInfo for this plan.
statsInfo() *property.StatsInfo
// OutputNames returns the output name of each column.
OutputNames() types.NameSlice
// SetOutputNames sets the output names from the given slice.
SetOutputNames(names types.NameSlice)
}
// LogicalPlan is a tree of logical operators.
// Logical optimizations such as predicate pushdown and column pruning are applied to it.
type LogicalPlan interface {
Plan
// PredicatePushDown pushes down the predicates in the WHERE/ON/HAVING clauses as deeply as possible.
// It accepts the predicates as an expression slice and returns the expressions that can't be pushed.
// Because it might change the root when a HAVING clause exists, it also returns the plan that is the new root.
PredicatePushDown([]expression.Expression) ([]expression.Expression, LogicalPlan)
// PruneColumns prunes the unused columns.
PruneColumns([]*expression.Column) error
// findBestTask converts the logical plan to the physical plan. It's a new interface.
// It is called recursively from the parent to the children to create the resulting physical plan.
// Some logical plans convert their children to physical plans in different ways and return the one
// with the lowest cost.
findBestTask(prop *property.PhysicalProperty) (task, error)
// BuildKeyInfo collects the information of unique keys into the schema.
// Because this method is also used in the cascades planner, it must not use
// things like `p.schema` or `p.children`; use `selfSchema`
// and `childSchema` instead.
BuildKeyInfo(selfSchema *expression.Schema, childSchema []*expression.Schema)
// pushDownTopN pushes down the topN or limit operator during logical optimization.
pushDownTopN(topN *LogicalTopN) LogicalPlan
// recursiveDeriveStats derives statistics info between plans.
recursiveDeriveStats() (*property.StatsInfo, error)
// DeriveStats derives statistics info for the current plan node given child stats.
// selfSchema and childSchema are passed explicitly so this method can be used in the
// cascades planner, where a LogicalPlan might not record its children or schema.
DeriveStats(childStats []*property.StatsInfo, selfSchema *expression.Schema, childSchema []*expression.Schema) (*property.StatsInfo, error)
// PreparePossibleProperties is only used for join and aggregation. For GROUP BY a,b,c every permutation of (a,b,c) is
// valid, but the ordered indices in the leaf plan are limited, so all possible order properties are collected by a pre-walk.
PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column
// exhaustPhysicalPlans generates all possible plans that can match the required property.
exhaustPhysicalPlans(*property.PhysicalProperty) []PhysicalPlan
// Children returns all the children.
Children() []LogicalPlan
// SetChildren sets the children for the plan.
SetChildren(...LogicalPlan)
// SetChild sets the ith child for the plan.
SetChild(i int, child LogicalPlan)
}
// baseLogicalPlan holds state shared by logical operators.
// NOTE(review): LogicalSelection embeds it; presumably every concrete
// LogicalPlan does — confirm.
type baseLogicalPlan struct {
basePlan
// taskMap caches tasks by string key; presumably memoizes findBestTask
// results per required physical property — confirm.
taskMap map[string]task
// self is presumably the concrete LogicalPlan embedding this struct, so
// shared code can call the concrete type's methods — confirm.
self LogicalPlan
// children are the child operators (see LogicalPlan.Children/SetChildren).
children []LogicalPlan
}
// LogicalSelection represents a WHERE or HAVING predicate.
type LogicalSelection struct {
baseLogicalPlan
// Conditions are the selection predicates, combined with AND.
// Originally the WHERE or ON condition is parsed into a single expression,
// but after it is converted to CNF (conjunctive normal form) it can be
// split into a list of AND-ed conditions.
Conditions []expression.Expression
}
// PhysicalPlan is a tree of the physical operators.
type PhysicalPlan interface {
Plan
// attach2Task makes the current physical plan the parent of the task's physicalPlan and updates the cost of the
// current task. If the child's task is a cop task, some operators may close that task and return a new rootTask.
attach2Task(...task) task
// ToPB converts the physical plan to a tipb executor.
ToPB(ctx sessionctx.Context) (*tipb.Executor, error)
// GetChildReqProps gets the required property by child index.
GetChildReqProps(idx int) *property.PhysicalProperty
// StatsCount returns the count of property.StatsInfo for this plan.
StatsCount() float64
// Children returns all the children.
Children() []PhysicalPlan
// SetChildren sets the children for the plan.
SetChildren(...PhysicalPlan)
// SetChild sets the ith child for the plan.
SetChild(i int, child PhysicalPlan)
// ResolveIndices resolves the indices for columns. After this, the columns can evaluate rows by their indices.
ResolveIndices() error
// Stats returns the StatsInfo of the plan.
Stats() *property.StatsInfo
// ExplainNormalizedInfo returns normalized operator information for generating a digest.
ExplainNormalizedInfo() string
}
// basePhysicalPlan holds state shared by physical operators
// (PhysicalLimit embeds it).
type basePhysicalPlan struct {
basePlan
// childrenReqProps presumably holds the required physical property of each
// child, indexed like children (cf. PhysicalPlan.GetChildReqProps) — confirm.
childrenReqProps []*property.PhysicalProperty
// self is presumably the concrete PhysicalPlan embedding this struct — confirm.
self PhysicalPlan
// children are the child operators (see PhysicalPlan.Children/SetChildren).
children []PhysicalPlan
}
// PhysicalLimit is the physical operator of Limit.
type PhysicalLimit struct {
basePhysicalPlan
// Offset is presumably the number of leading rows to skip — confirm.
Offset uint64
// Count is presumably the maximum number of rows returned after Offset — confirm.
Count uint64
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment