Created
July 10, 2023 13:50
-
-
Save vicety/602d2cd6811c91cecafdf78d637fcf1c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import example.udf.UDF; | |
import org.apache.calcite.DataContext; | |
import org.apache.calcite.adapter.enumerable.EnumerableConvention; | |
import org.apache.calcite.adapter.enumerable.EnumerableInterpretable; | |
import org.apache.calcite.adapter.enumerable.EnumerableRel; | |
import org.apache.calcite.adapter.enumerable.EnumerableRules; | |
import org.apache.calcite.adapter.java.JavaTypeFactory; | |
import org.apache.calcite.config.CalciteConnectionConfig; | |
import org.apache.calcite.config.CalciteConnectionConfigImpl; | |
import org.apache.calcite.config.CalciteConnectionProperty; | |
import org.apache.calcite.jdbc.CalciteSchema; | |
import org.apache.calcite.jdbc.JavaTypeFactoryImpl; | |
import org.apache.calcite.linq4j.*; | |
import org.apache.calcite.linq4j.tree.Expression; | |
import org.apache.calcite.plan.*; | |
import org.apache.calcite.plan.volcano.VolcanoPlanner; | |
import org.apache.calcite.prepare.CalciteCatalogReader; | |
import org.apache.calcite.prepare.Prepare; | |
import org.apache.calcite.rel.RelNode; | |
import org.apache.calcite.rel.core.TableModify; | |
import org.apache.calcite.rel.logical.LogicalTableModify; | |
import org.apache.calcite.rel.rules.CoreRules; | |
import org.apache.calcite.rel.type.RelDataType; | |
import org.apache.calcite.rel.type.RelDataTypeFactory; | |
import org.apache.calcite.rex.RexBuilder; | |
import org.apache.calcite.rex.RexNode; | |
import org.apache.calcite.runtime.Bindable; | |
import org.apache.calcite.schema.*; | |
import org.apache.calcite.schema.impl.AbstractTable; | |
import org.apache.calcite.schema.impl.AbstractTableQueryable; | |
import org.apache.calcite.schema.impl.ScalarFunctionImpl; | |
import org.apache.calcite.sql.SqlExplainFormat; | |
import org.apache.calcite.sql.SqlExplainLevel; | |
import org.apache.calcite.sql.SqlNode; | |
import org.apache.calcite.sql.SqlOperatorTable; | |
import org.apache.calcite.sql.fun.SqlStdOperatorTable; | |
import org.apache.calcite.sql.parser.SqlParseException; | |
import org.apache.calcite.sql.parser.SqlParser; | |
import org.apache.calcite.sql.type.SqlTypeName; | |
import org.apache.calcite.sql.util.SqlOperatorTables; | |
import org.apache.calcite.sql.validate.SqlValidator; | |
import org.apache.calcite.sql.validate.SqlValidatorUtil; | |
import org.apache.calcite.sql2rel.SqlToRelConverter; | |
import org.apache.calcite.sql2rel.StandardConvertletTable; | |
import java.lang.reflect.Type; | |
import java.util.*; | |
/**
 * End-to-end demo of driving Apache Calcite through its low-level APIs rather
 * than JDBC: parse SQL -> validate -> convert to a logical plan -> optimize
 * with the Volcano planner into an Enumerable physical plan -> compile to a
 * {@link Bindable} -> execute against an in-memory table.
 *
 * Demonstrates a SELECT (DQL), an INSERT (DML) whose effect is visible to a
 * subsequent SELECT, and a scalar UDF invocation.
 */
public class Calcite2 {
    // Backing rows for the "book" table: {id, title, year, author}.
    // Deliberately wrapped in a mutable ArrayList (not List.of/Arrays.asList
    // directly) so that the INSERT statement below can append a row.
    private static final List<Object[]> BOOK_DATA = new ArrayList<>(Arrays.asList(
            new Object[]{1, "Les Miserables", 1862, 0},
            new Object[]{2, "The Hunchback of Notre-Dame", 1829, 0},
            new Object[]{3, "The Last Day of a Condemned Man", 1829, 0},
            new Object[]{4, "The three Musketeers", 1844, 1},
            new Object[]{5, "The Count of Monte Cristo", 1884, 1}
    ));

    public static void main(String[] args) throws SqlParseException {
        // Instantiate a type factory for creating types (e.g., VARCHAR, NUMERIC, etc.)
        RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
        // Create the root schema describing the data model
        SchemaPlus schema = CalciteSchema.createRootSchema(true).plus();
        // Register a scalar UDF named MY_DOUBLE, backed by the static method
        // UDF.MyDoubleFunction.eval(int) (source reproduced in the comment below).
        schema.add("MY_DOUBLE", ScalarFunctionImpl.create(UDF.MyDoubleFunction.class, "eval"));
        // code for the UDF class
        // public class UDF {
        //     public abstract static class MyDoubleFunction {
        //         private MyDoubleFunction() {
        //         }
        //
        //         public static int eval(int x) {
        //             System.out.println("x:" + x);
        //             return x * 2;
        //         }
        //     }
        // }

        // Define type for books table
        RelDataTypeFactory.Builder bookType = new RelDataTypeFactory.Builder(typeFactory);
        bookType.add("id", SqlTypeName.INTEGER);
        bookType.add("title", SqlTypeName.VARCHAR);
        bookType.add("year", SqlTypeName.INTEGER);
        bookType.add("author", SqlTypeName.INTEGER);
        Table booksTable = new MPFTable2(bookType.build(), BOOK_DATA);
        schema.add("book", booksTable);

        // DQL
        executeSQL("SELECT b.id, b.author FROM Book b WHERE id > 2 AND author = 1", schema, typeFactory);
        // DML -- note "year" is quoted because YEAR is a reserved SQL keyword.
        // The inserted row is visible to the SELECT that follows, since the table
        // exposes its backing list directly via getModifiableCollection().
        executeSQL("INSERT INTO Book (id, title, \"year\", author) VALUES (6, 'The Lord of the Rings', 1954, 1)", schema, typeFactory);
        executeSQL("SELECT b.id, b.author FROM Book b WHERE id > 2 AND author = 1", schema, typeFactory);
        // UDF
        executeSQL("SELECT MY_DOUBLE(b.id), b.author FROM Book b WHERE id > 2 AND author = 1", schema, typeFactory);
    }

    /**
     * Runs one SQL statement through the full Calcite pipeline and prints the
     * logical plan, the optimized physical plan, and the result rows.
     *
     * @param sql         the SQL text (query or INSERT)
     * @param schema      root schema containing the "book" table and the UDF
     * @param typeFactory type factory shared with the schema's row types
     * @throws SqlParseException if the SQL text cannot be parsed
     */
    private static void executeSQL(String sql, SchemaPlus schema, RelDataTypeFactory typeFactory) throws SqlParseException {
        SqlParser parser = SqlParser.create(sql);
        // Parse the query into an AST
        SqlNode sqlNode = parser.parseQuery();

        // Configure and instantiate validator.
        // CASE_SENSITIVE=false lets "Book" in the SQL resolve to the schema
        // entry registered as "book".
        Properties props = new Properties();
        props.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(), "false");
        CalciteConnectionConfig config = new CalciteConnectionConfigImpl(props);
        CalciteCatalogReader catalogReader = new CalciteCatalogReader(schema.unwrap(CalciteSchema.class),
                Collections.singletonList(""),
                typeFactory, config);

        // NOTE: newly added code.
        // Adapted from org.apache.calcite.prepare.CalcitePrepareImpl.createSqlValidator:
        // chain the standard operator table with the catalog reader so that
        // schema-registered functions (the MY_DOUBLE UDF) are resolvable.
        final SqlOperatorTable opTab0 = SqlStdOperatorTable.instance();
        final List<SqlOperatorTable> list = new ArrayList<>();
        list.add(opTab0);
        list.add(catalogReader);
        final SqlOperatorTable opTab = SqlOperatorTables.chain(list);
        SqlValidator validator = SqlValidatorUtil.newValidator(opTab,
                catalogReader, typeFactory,
                SqlValidator.Config.DEFAULT);
        // Validate the initial AST
        SqlNode validNode = validator.validate(sqlNode);

        // Configure and instantiate the converter of the AST to Logical plan (requires opt cluster)
        RelOptPlanner planner = new VolcanoPlanner();
        planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
        RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));
        // No view expansion needed for this demo; views simply resolve to null.
        RelOptTable.ViewExpander NOOP_EXPANDER = (rowType, queryString, schemaPath
                , viewPath) -> null;
        SqlToRelConverter relConverter = new SqlToRelConverter(
                NOOP_EXPANDER,
                validator,
                catalogReader,
                cluster,
                StandardConvertletTable.INSTANCE,
                SqlToRelConverter.config());
        // Convert the valid AST into a logical plan
        RelNode logPlan = relConverter.convertQuery(validNode, false, true).rel;
        // Display the logical plan
        System.out.println(
                RelOptUtil.dumpPlan("[Logical plan]", logPlan, SqlExplainFormat.TEXT,
                        SqlExplainLevel.EXPPLAN_ATTRIBUTES));

        // ======== OPTIMIZATION ========
        // Reuse the cluster's planner (the VolcanoPlanner created above) and
        // register the rules needed to lower logical nodes to Enumerable
        // convention. ENUMERABLE_TABLE_MODIFICATION_RULE is required for the
        // INSERT; PROJECT_TO_CALC/FILTER_TO_CALC + ENUMERABLE_CALC_RULE handle
        // the SELECT's projection and WHERE clause.
        planner = cluster.getPlanner();
        planner.addRule(EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE);
        planner.addRule(CoreRules.PROJECT_TO_CALC);
        planner.addRule(CoreRules.FILTER_TO_CALC);
        planner.addRule(EnumerableRules.ENUMERABLE_CALC_RULE);
        planner.addRule(EnumerableRules.ENUMERABLE_JOIN_RULE);
        planner.addRule(EnumerableRules.ENUMERABLE_SORT_RULE);
        planner.addRule(EnumerableRules.ENUMERABLE_LIMIT_RULE);
        planner.addRule(EnumerableRules.ENUMERABLE_AGGREGATE_RULE);
        planner.addRule(EnumerableRules.ENUMERABLE_VALUES_RULE);
        planner.addRule(EnumerableRules.ENUMERABLE_UNION_RULE);
        planner.addRule(EnumerableRules.ENUMERABLE_MINUS_RULE);
        planner.addRule(EnumerableRules.ENUMERABLE_INTERSECT_RULE);
        planner.addRule(EnumerableRules.ENUMERABLE_MATCH_RULE);
        planner.addRule(EnumerableRules.ENUMERABLE_WINDOW_RULE);
        planner.addRule(EnumerableRules.ENUMERABLE_TABLE_MODIFICATION_RULE);
        // Define the type of the output plan (in this case we want a physical plan in
        // EnumerableContention)
        logPlan = planner.changeTraits(logPlan,
                cluster.traitSet().replace(EnumerableConvention.INSTANCE));
        planner.setRoot(logPlan);
        // Start the optimization process to obtain the most efficient physical plan based on the
        // provided rule set.
        EnumerableRel phyPlan = (EnumerableRel) planner.findBestExp();
        // Display the physical plan
        System.out.println(
                RelOptUtil.dumpPlan("[Physical plan]", phyPlan, SqlExplainFormat.TEXT,
                        SqlExplainLevel.NON_COST_ATTRIBUTES));

        // Obtain the executable plan (generates and compiles Java code for the plan)
        Bindable executablePlan = EnumerableInterpretable.toBindable(
                new HashMap<>(),
                null,
                phyPlan,
                EnumerableRel.Prefer.ARRAY);

        // ======== EXECUTION ========
        // Run the executable plan using a context simply providing access to the schema.
        // SELECT yields Object[] rows (or a scalar for single-column results);
        // INSERT yields the update count.
        for (Object row : executablePlan.bind(new SchemaOnlyDataContext(schema.unwrap(CalciteSchema.class)))) {
            if (row instanceof Object[]) {
                System.out.println(Arrays.toString((Object[]) row));
            } else {
                System.out.println(row);
            }
        }
    }

    /**
     * Minimal {@link DataContext} that only exposes the root schema.
     * Query provider and named context variables are not needed for this demo,
     * so those accessors return null.
     */
    private static final class SchemaOnlyDataContext implements DataContext {
        private final SchemaPlus schema;

        SchemaOnlyDataContext(CalciteSchema calciteSchema) {
            this.schema = calciteSchema.plus();
        }

        @Override
        public SchemaPlus getRootSchema() {
            return schema;
        }

        @Override
        public JavaTypeFactory getTypeFactory() {
            return new JavaTypeFactoryImpl();
        }

        @Override
        public QueryProvider getQueryProvider() {
            return null;
        }

        @Override
        public Object get(final String name) {
            return null;
        }
    }
}
class MPFTable2 extends AbstractTable implements ModifiableTable, ProjectableFilterableTable { | |
private List<Object[]> data; | |
private final RelDataType rowType; | |
MPFTable2(RelDataType rowType, List<Object[]> data) { | |
this.data = data; | |
this.rowType = rowType; | |
} | |
@Override | |
public Enumerable<Object[]> scan(DataContext root, List<RexNode> filters, int[] projects) { | |
return new AbstractEnumerable<Object[]>() { | |
public Enumerator<Object[]> enumerator() { | |
return new Enumerator<Object[]>() { | |
int row = -1; | |
Object[] current; | |
public Object[] current() { | |
if (row < 0 || row >= data.size()) { | |
return null; | |
} | |
current = data.get(row); | |
return current; | |
} | |
public boolean moveNext() { | |
return ++row < data.size(); | |
} | |
public void reset() { | |
row = -1; | |
} | |
public void close() { | |
} | |
}; | |
} | |
}; | |
} | |
@Override | |
public Collection getModifiableCollection() { | |
return data; | |
} | |
@Override | |
public TableModify toModificationRel(RelOptCluster cluster, RelOptTable table, Prepare.CatalogReader catalogReader, RelNode child, TableModify.Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) { | |
return LogicalTableModify.create(table, catalogReader, child, operation, | |
updateColumnList, sourceExpressionList, flattened); | |
} | |
public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { | |
return rowType; | |
} | |
@Override | |
public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema, String tableName) { | |
return new AbstractTableQueryable<T>(queryProvider, schema, this, | |
tableName) { | |
@Override public Enumerator<T> enumerator() { | |
//noinspection unchecked | |
return (Enumerator<T>) Linq4j.enumerator(data); | |
} | |
}; | |
} | |
@Override | |
public Type getElementType() { | |
return Object[].class; | |
} | |
@Override | |
public Expression getExpression(SchemaPlus schema, String tableName, Class clazz) { | |
return Schemas.tableExpression(schema, getElementType(), tableName, clazz); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The optimization rules I added will actually cause the `asQueryable` method to be called when `MPFTable2` is queried at table-scan time. To implement predicate pushdown, we need Calcite to call the `scan` method to execute the query instead. The parameters of `scan` include modifiable lists of filters and projects: if we can apply some of them inside the scan operation itself, we can simply remove them from those lists so they won't be applied again in later execution phases. To make Calcite use the `scan` method when performing a table scan, I used a different set of optimization rules when the query is a DQL — see my pull request to the Portals project; I left comments in Calcite.java.