Skip to content

Instantly share code, notes, and snippets.

@jyates
Last active June 2, 2016 07:20
Show Gist options
  • Save jyates/f11eb44a44af715b483859f497b9ea89 to your computer and use it in GitHub Desktop.
Save jyates/f11eb44a44af715b483859f497b9ea89 to your computer and use it in GitHub Desktop.
Drill User List - rewriting table for joining logical partitions
package io.fineo.read.drill.exec.store.rel;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.tools.RelBuilder;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.google.common.collect.Lists.newArrayList;
public class MyTable extends DynamicDrillTable implements TranslatableTable {
private final SchemaStore schema;
public MyTable(MyPlugin plugin,
String storageEngineName, String userName, Object selection, SchemaStore store) {
super(plugin, storageEngineName, userName, selection);
this.schema = store;
}
@Override
public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
LogicalScanBuilder builder = new LogicalScanBuilder(context, relOptTable);
for (String sourcePath : newArrayList("/drill/table1.json", "/drill/table2.json")) {
builder.scan("dfs", sourcePath);
}
return builder.build();
}
private static class LogicalScanBuilder {
private final RelBuilder builder;
private final RelOptTable relOptTable;
private final RelOptCluster cluster;
private int scanCount = 0;
public LogicalScanBuilder(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
this.cluster = context.getCluster();
this.relOptTable = relOptTable;
this.builder = RelBuilder.proto(cluster.getPlanner().getContext())
.create(cluster, relOptTable.getRelOptSchema());
}
/**
* Work around for {@link RelBuilder#scan(String)} not taking multiple String parts as in
* Calcite 1.8. Once Drill bumps up, we can replace with just using that
*/
public LogicalScanBuilder scan(String... schemaAndTable) {
RelOptTable table =
relOptTable.getRelOptSchema().getTableForMember(newArrayList(schemaAndTable));
LogicalTableScan scan =
new LogicalTableScan(cluster, cluster.traitSetOf(Convention.NONE), table);
builder.push(scan);
scanCount++;
return this;
}
public RelNode build() {
// join all the sub-tables together on the common keys
for (int i = 0; i < scanCount - 1; i++) {
// ideally do:
// builder.join(JoinRelType.FULL, "id")
// but seem to have to make our own version:
builder.join(JoinRelType.FULL, composeCondition("id"));
}
return builder.build();
}
private RexNode composeCondition(String... fieldNames) {
RelNode table1 = builder.peek(0);
RelNode table2 = builder.peek(1);
// build the rex node for the two tables
final List<RexNode> conditions = new ArrayList<>();
for (String fieldName : fieldNames) {
conditions.add(
builder.call(SqlStdOperatorTable.EQUALS,
field(table1, fieldName, 0),
field(table2, fieldName, 1)));
}
return RexUtil.composeConjunction(cluster.getRexBuilder(), conditions, false);
}
private RexNode field(RelNode table, String fieldName, int offset) {
RelDataType row = table.getRowType();
RelDataTypeField field = row.getField(fieldName, true, false);
int index = field.getIndex();
return cluster.getRexBuilder().makeInputRef(row, index + offset);
}
}
}
2016-06-02 00:02:00 DEBUG o.a.d.e.p.s.h.DefaultSqlHandler[140] - HEP_BOTTOM_UP:Directory Prune Planning (7ms):
LogicalProject(*=[$0]): rowcount = 1500.0, cumulative cost = {3200.0 rows, 1702.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 16
LogicalJoin(condition=[=($0, $1)], joinType=[inner]): rowcount = 1500.0, cumulative cost = {1700.0 rows, 202.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 14
LogicalTableScan(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-2-513a7a13-0950-42c6-8265-765472451ff4.json]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3
LogicalTableScan(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-1-f49716af-48c4-4338-9a0c-5155b6a0548a.json]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 2
2016-06-02 00:02:00 DEBUG o.a.d.e.p.s.h.DefaultSqlHandler[140] - HEP_BOTTOM_UP:LOPT Join Planning (17ms):
DrillProjectRel(*=[$0]): rowcount = 1.0, cumulative cost = {4.0 rows, 22.0 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 55
DrillJoinRel(condition=[=($0, $1)], joinType=[inner]): rowcount = 1.0, cumulative cost = {4.0 rows, 22.0 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 60
DrillScanRel(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-2-513a7a13-0950-42c6-8265-765472451ff4.json]], groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-2-513a7a13-0950-42c6-8265-765472451ff4.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-2-513a7a13-0950-42c6-8265-765472451ff4.json]]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 42
DrillScanRel(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-1-f49716af-48c4-4338-9a0c-5155b6a0548a.json]], groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-1-f49716af-48c4-4338-9a0c-5155b6a0548a.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-1-f49716af-48c4-4338-9a0c-5155b6a0548a.json]]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 41
2016-06-02 00:02:00 DEBUG o.a.d.e.p.s.h.DefaultSqlHandler[151] - Drill Physical:
00-00 Screen : rowType = RecordType(ANY *): rowcount = 1.0, cumulative cost = {6.1 rows, 10.1 cpu, 0.0 io, 0.0 network, 16.0 memory}, id = 624
00-01 Project(*=[$0]) : rowType = RecordType(ANY *): rowcount = 1.0, cumulative cost = {6.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 16.0 memory}, id = 623
00-02 MergeJoin(condition=[=($0, $1)], joinType=[inner]) : rowType = RecordType(ANY id, ANY id0): rowcount = 1.0, cumulative cost = {6.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 16.0 memory}, id = 622
00-04 SelectionVectorRemover : rowType = (DrillRecordRow[id]): rowcount = 1.0, cumulative cost = {2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 8.0 memory}, id = 617
00-06 Sort(sort0=[$0], dir0=[ASC]) : rowType = (DrillRecordRow[id]): rowcount = 1.0, cumulative cost = {1.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 8.0 memory}, id = 616
00-08 Scan(groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-2-513a7a13-0950-42c6-8265-765472451ff4.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-2-513a7a13-0950-42c6-8265-765472451ff4.json]]]) : rowType = (DrillRecordRow[id]): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 615
00-03 Project(id0=[$0]) : rowType = RecordType(ANY id0): rowcount = 1.0, cumulative cost = {2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 8.0 memory}, id = 621
00-05 SelectionVectorRemover : rowType = (DrillRecordRow[id]): rowcount = 1.0, cumulative cost = {2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 8.0 memory}, id = 620
00-07 Sort(sort0=[$0], dir0=[ASC]) : rowType = (DrillRecordRow[id]): rowcount = 1.0, cumulative cost = {1.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 8.0 memory}, id = 619
00-09 Scan(groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-1-f49716af-48c4-4338-9a0c-5155b6a0548a.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit4857731164319877248/drill/test-1-f49716af-48c4-4338-9a0c-5155b6a0548a.json]]]) : rowType = (DrillRecordRow[id]): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 618
2016-06-02 00:09:29 DEBUG o.a.d.e.p.s.h.DefaultSqlHandler[140] - HEP_BOTTOM_UP:Directory Prune Planning (6ms):
LogicalProject(*=[$0], *0=[$2]): rowcount = 1500.0, cumulative cost = {3200.0 rows, 3202.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 19
LogicalJoin(condition=[=($1, $3)], joinType=[full]): rowcount = 1500.0, cumulative cost = {1700.0 rows, 202.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 17
EnumerableTableScan(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-1-0728a834-81bd-48f1-9f2e-dc3594d46eda.json]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 5
EnumerableTableScan(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-2-e0b457ae-798a-4fe7-8774-4d7286d18822.json]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 6
2016-06-02 00:09:29 DEBUG o.a.d.e.p.s.h.DefaultSqlHandler[140] - HEP_BOTTOM_UP:LOPT Join Planning (14ms):
DrillProjectRel(*=[$0], *0=[$2]): rowcount = 1.0, cumulative cost = {4.0 rows, 20020.0 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 54
DrillJoinRel(condition=[=($1, $3)], joinType=[full]): rowcount = 1.0, cumulative cost = {4.0 rows, 20020.0 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 59
DrillScanRel(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-1-0728a834-81bd-48f1-9f2e-dc3594d46eda.json]], groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-1-0728a834-81bd-48f1-9f2e-dc3594d46eda.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-1-0728a834-81bd-48f1-9f2e-dc3594d46eda.json]]]): rowcount = 1.0, cumulative cost = {1.0 rows, 10000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 41
DrillScanRel(table=[[dfs, /var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-2-e0b457ae-798a-4fe7-8774-4d7286d18822.json]], groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-2-e0b457ae-798a-4fe7-8774-4d7286d18822.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit6653229530728832749/drill/test-2-e0b457ae-798a-4fe7-8774-4d7286d18822.json]]]): rowcount = 1.0, cumulative cost = {1.0 rows, 10000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 40
2016-06-02 00:03:50 DEBUG o.a.d.e.p.s.h.DefaultSqlHandler[151] - Drill Physical:
00-00 Screen : rowType = RecordType(ANY *, ANY *0): rowcount = 1.0, cumulative cost = {2.1 rows, 20.1 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 325
00-01 ProjectAllowDup(*=[$0], *0=[$1]) : rowType = RecordType(ANY *, ANY *0): rowcount = 1.0, cumulative cost = {2.0 rows, 20.0 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 324
00-02 Project(T0¦¦*=[$0], T1¦¦*=[$2]) : rowType = RecordType(ANY T0¦¦*, ANY T1¦¦*): rowcount = 1.0, cumulative cost = {2.0 rows, 20.0 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 323
00-03 HashJoin(condition=[=($1, $3)], joinType=[full]) : rowType = RecordType(ANY T0¦¦*, ANY companykey, ANY T1¦¦*, ANY companykey0): rowcount = 1.0, cumulative cost = {2.0 rows, 20.0 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 322
00-05 Project(T0¦¦*=[$0], companykey=[$1]) : rowType = RecordType(ANY T0¦¦*, ANY companykey): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 318
00-07 Scan(groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit7587719243224120904/drill/test-1-bbebfbe4-baf0-4dc2-ae79-fae61c0da3fc.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit7587719243224120904/drill/test-1-bbebfbe4-baf0-4dc2-ae79-fae61c0da3fc.json]]]) : rowType = (DrillRecordRow[*, companykey]): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 317
00-04 Project(T1¦¦*=[$0], companykey0=[$1]) : rowType = RecordType(ANY T1¦¦*, ANY companykey0): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 321
00-06 Project(T1¦¦*=[$0], companykey=[$1]) : rowType = RecordType(ANY T1¦¦*, ANY companykey): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 320
00-08 Scan(groupscan=[EasyGroupScan [selectionRoot=file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit7587719243224120904/drill/test-2-7ce87e1f-9709-4417-88a9-6475e3a2517e.json, numFiles=1, columns=[`*`], files=[file:/var/folders/43/tsp1ph8n5b96whkk0j_bkl540000gn/T/junit7587719243224120904/drill/test-2-7ce87e1f-9709-4417-88a9-6475e3a2517e.json]]]) : rowType = (DrillRecordRow[*, companykey]): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 319
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment