@vicety · Created July 10, 2023
import example.udf.UDF;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.adapter.enumerable.EnumerableInterpretable;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.adapter.enumerable.EnumerableRules;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.linq4j.*;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.plan.*;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.logical.LogicalTableModify;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.runtime.Bindable;
import org.apache.calcite.schema.*;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.schema.impl.AbstractTableQueryable;
import org.apache.calcite.schema.impl.ScalarFunctionImpl;
import org.apache.calcite.sql.SqlExplainFormat;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.SqlOperatorTables;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.sql2rel.StandardConvertletTable;
import java.lang.reflect.Type;
import java.util.*;
public class Calcite2 {
  private static final List<Object[]> BOOK_DATA = new ArrayList<>(Arrays.asList(
      new Object[]{1, "Les Miserables", 1862, 0},
      new Object[]{2, "The Hunchback of Notre-Dame", 1829, 0},
      new Object[]{3, "The Last Day of a Condemned Man", 1829, 0},
      new Object[]{4, "The Three Musketeers", 1844, 1},
      new Object[]{5, "The Count of Monte Cristo", 1844, 1}
  ));
  public static void main(String[] args) throws SqlParseException {
    // Instantiate a type factory for creating types (e.g., VARCHAR, NUMERIC, etc.)
    RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
    // Create the root schema describing the data model
    SchemaPlus schema = CalciteSchema.createRootSchema(true).plus();
    schema.add("MY_DOUBLE", ScalarFunctionImpl.create(UDF.MyDoubleFunction.class, "eval"));
    // Code for the UDF class:
    // public class UDF {
    //   public abstract static class MyDoubleFunction {
    //     private MyDoubleFunction() {
    //     }
    //
    //     public static int eval(int x) {
    //       System.out.println("x:" + x);
    //       return x * 2;
    //     }
    //   }
    // }
    // Define the row type for the books table
    RelDataTypeFactory.Builder bookType = new RelDataTypeFactory.Builder(typeFactory);
    bookType.add("id", SqlTypeName.INTEGER);
    bookType.add("title", SqlTypeName.VARCHAR);
    bookType.add("year", SqlTypeName.INTEGER);
    bookType.add("author", SqlTypeName.INTEGER);
    Table booksTable = new MPFTable2(bookType.build(), BOOK_DATA);
    schema.add("book", booksTable);

    // DQL
    executeSQL("SELECT b.id, b.author FROM Book b WHERE id > 2 AND author = 1", schema, typeFactory);
    // DML
    executeSQL("INSERT INTO Book (id, title, \"year\", author) VALUES (6, 'The Lord of the Rings', 1954, 1)", schema, typeFactory);
    executeSQL("SELECT b.id, b.author FROM Book b WHERE id > 2 AND author = 1", schema, typeFactory);
    // UDF
    executeSQL("SELECT MY_DOUBLE(b.id), b.author FROM Book b WHERE id > 2 AND author = 1", schema, typeFactory);
  }
  private static void executeSQL(String sql, SchemaPlus schema, RelDataTypeFactory typeFactory) throws SqlParseException {
    SqlParser parser = SqlParser.create(sql);
    // Parse the query into an AST
    SqlNode sqlNode = parser.parseQuery();

    // Configure and instantiate the validator
    Properties props = new Properties();
    props.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(), "false");
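    // Case-insensitive identifier matching lets "Book" in the SQL above resolve
    // to the "book" table registered on the schema.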
    CalciteConnectionConfig config = new CalciteConnectionConfigImpl(props);
    CalciteCatalogReader catalogReader = new CalciteCatalogReader(schema.unwrap(CalciteSchema.class),
        Collections.singletonList(""),
        typeFactory, config);

    // NOTE: newly added code, adapted from
    // org.apache.calcite.prepare.CalcitePrepareImpl.createSqlValidator
    final SqlOperatorTable opTab0 = SqlStdOperatorTable.instance();
    final List<SqlOperatorTable> list = new ArrayList<>();
    list.add(opTab0);
    list.add(catalogReader);
    final SqlOperatorTable opTab = SqlOperatorTables.chain(list);
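    // Chaining the catalog reader into the operator table makes schema-registered
    // functions such as MY_DOUBLE visible to the validator.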
    SqlValidator validator = SqlValidatorUtil.newValidator(opTab,
        catalogReader, typeFactory,
        SqlValidator.Config.DEFAULT);
    // Validate the initial AST
    SqlNode validNode = validator.validate(sqlNode);

    // Configure and instantiate the converter from the AST to a logical plan (requires an opt cluster)
    RelOptPlanner planner = new VolcanoPlanner();
    planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
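    // Registering ConventionTraitDef lets the planner convert between calling
    // conventions, e.g. from the logical convention to EnumerableConvention below.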
    RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));
    RelOptTable.ViewExpander NOOP_EXPANDER = (rowType, queryString, schemaPath, viewPath) -> null;
    SqlToRelConverter relConverter = new SqlToRelConverter(
        NOOP_EXPANDER,
        validator,
        catalogReader,
        cluster,
        StandardConvertletTable.INSTANCE,
        SqlToRelConverter.config());
    // Convert the valid AST into a logical plan
    RelNode logPlan = relConverter.convertQuery(validNode, false, true).rel;

    // Display the logical plan
    System.out.println(
        RelOptUtil.dumpPlan("[Logical plan]", logPlan, SqlExplainFormat.TEXT,
            SqlExplainLevel.EXPPLAN_ATTRIBUTES));

    // ======== OPTIMIZATION ========
    planner = cluster.getPlanner();
    planner.addRule(EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE);
    planner.addRule(CoreRules.PROJECT_TO_CALC);
    planner.addRule(CoreRules.FILTER_TO_CALC);
    planner.addRule(EnumerableRules.ENUMERABLE_CALC_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_JOIN_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_SORT_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_LIMIT_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_AGGREGATE_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_VALUES_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_UNION_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_MINUS_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_INTERSECT_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_MATCH_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_WINDOW_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_TABLE_MODIFICATION_RULE);

    // Define the type of the output plan (in this case we want a physical plan in
    // EnumerableConvention)
    logPlan = planner.changeTraits(logPlan,
        cluster.traitSet().replace(EnumerableConvention.INSTANCE));
    planner.setRoot(logPlan);
    // Start the optimization process to obtain the most efficient physical plan based on the
    // provided rule set.
    EnumerableRel phyPlan = (EnumerableRel) planner.findBestExp();

    // Display the physical plan
    System.out.println(
        RelOptUtil.dumpPlan("[Physical plan]", phyPlan, SqlExplainFormat.TEXT,
            SqlExplainLevel.NON_COST_ATTRIBUTES));

    // Obtain the executable plan
    Bindable executablePlan = EnumerableInterpretable.toBindable(
        new HashMap<>(),
        null,
        phyPlan,
        EnumerableRel.Prefer.ARRAY);
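    // toBindable generates Java code for the Enumerable physical plan and
    // compiles it in-memory (Calcite uses Janino for this).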
    // ======== EXECUTION ========
    // Run the executable plan using a context simply providing access to the schema
    for (Object row : executablePlan.bind(new SchemaOnlyDataContext(schema.unwrap(CalciteSchema.class)))) {
      if (row instanceof Object[]) {
        System.out.println(Arrays.toString((Object[]) row));
      } else {
        System.out.println(row);
      }
    }
  }

  private static final class SchemaOnlyDataContext implements DataContext {
    private final SchemaPlus schema;

    SchemaOnlyDataContext(CalciteSchema calciteSchema) {
      this.schema = calciteSchema.plus();
    }

    @Override
    public SchemaPlus getRootSchema() {
      return schema;
    }

    @Override
    public JavaTypeFactory getTypeFactory() {
      return new JavaTypeFactoryImpl();
    }

    @Override
    public QueryProvider getQueryProvider() {
      return null;
    }

    @Override
    public Object get(final String name) {
      return null;
    }
  }
}

class MPFTable2 extends AbstractTable implements ModifiableTable, ProjectableFilterableTable {
  private final List<Object[]> data;
  private final RelDataType rowType;

  MPFTable2(RelDataType rowType, List<Object[]> data) {
    this.data = data;
    this.rowType = rowType;
  }
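  // NOTE: this scan implementation ignores the filters/projects arguments and
  // always returns the full table; see the comment at the end of this gist for
  // how they could be consumed to implement pushdown.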
  @Override
  public Enumerable<Object[]> scan(DataContext root, List<RexNode> filters, int[] projects) {
    return new AbstractEnumerable<Object[]>() {
      public Enumerator<Object[]> enumerator() {
        return new Enumerator<Object[]>() {
          int row = -1;
          Object[] current;

          public Object[] current() {
            if (row < 0 || row >= data.size()) {
              return null;
            }
            current = data.get(row);
            return current;
          }

          public boolean moveNext() {
            return ++row < data.size();
          }

          public void reset() {
            row = -1;
          }

          public void close() {
          }
        };
      }
    };
  }
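  // DML support: Calcite's EnumerableTableModify applies INSERTs by adding rows
  // directly to the collection returned below.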
  @Override
  public Collection getModifiableCollection() {
    return data;
  }

  @Override
  public TableModify toModificationRel(RelOptCluster cluster, RelOptTable table, Prepare.CatalogReader catalogReader, RelNode child, TableModify.Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
    return LogicalTableModify.create(table, catalogReader, child, operation,
        updateColumnList, sourceExpressionList, flattened);
  }

  @Override
  public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
    return rowType;
  }

  @Override
  public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema, String tableName) {
    return new AbstractTableQueryable<T>(queryProvider, schema, this,
        tableName) {
      @Override public Enumerator<T> enumerator() {
        //noinspection unchecked
        return (Enumerator<T>) Linq4j.enumerator(data);
      }
    };
  }

  @Override
  public Type getElementType() {
    return Object[].class;
  }

  @Override
  public Expression getExpression(SchemaPlus schema, String tableName, Class clazz) {
    return Schemas.tableExpression(schema, getElementType(), tableName, clazz);
  }
}
@vicety commented Jul 10, 2023:
The optimization rules added above actually cause `asQueryable` to be called when `MPFTable2` is scanned. To implement predicate pushdown, we need Calcite to call the `scan` method instead. `scan` receives a mutable `List` of filters (and a `projects` array); if the table can evaluate some of those filters itself during the scan, it can simply remove them from the list, so they are not applied again in later execution phases.

To make Calcite use the `scan` method for table scans, I used a different set of optimization rules when the query is a DQL; see my pull request to the Portals project, where I left comments in Calcite.java.
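
For illustration, here is a minimal sketch of what a pushdown-aware `scan` could look like (not the code in this gist). It only absorbs filters of the shape `$col = <integer literal>`; anything else is left in the mutable `filters` list for Calcite to evaluate after the scan, and `projects` is still ignored. Besides the imports already in the gist, it assumes `org.apache.calcite.rex.RexCall`, `org.apache.calcite.rex.RexInputRef`, `org.apache.calcite.rex.RexLiteral`, and `org.apache.calcite.sql.SqlKind`.

```java
// Hypothetical pushdown-aware scan; assumes equality filters over INTEGER columns.
@Override
public Enumerable<Object[]> scan(DataContext root, List<RexNode> filters, int[] projects) {
  Map<Integer, Object> pushed = new HashMap<>(); // column index -> required value
  for (Iterator<RexNode> it = filters.iterator(); it.hasNext(); ) {
    RexNode filter = it.next();
    if (filter instanceof RexCall && filter.isA(SqlKind.EQUALS)) {
      List<RexNode> ops = ((RexCall) filter).getOperands();
      if (ops.get(0) instanceof RexInputRef && ops.get(1) instanceof RexLiteral) {
        int col = ((RexInputRef) ops.get(0)).getIndex();
        Object value = ((RexLiteral) ops.get(1)).getValueAs(Integer.class);
        pushed.put(col, value);
        it.remove(); // handled during the scan, so Calcite won't re-apply it
      }
    }
  }
  // Materialize only the rows matching all pushed-down predicates.
  List<Object[]> matching = new ArrayList<>();
  for (Object[] row : data) {
    boolean matches = true;
    for (Map.Entry<Integer, Object> e : pushed.entrySet()) {
      if (!Objects.equals(row[e.getKey()], e.getValue())) {
        matches = false;
        break;
      }
    }
    if (matches) {
      matching.add(row);
    }
  }
  return Linq4j.asEnumerable(matching);
}
```

Projection pushdown would work the same way: when `projects` is non-null, return row arrays containing only the requested column indexes.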
