Spark SQL Joins: Code Snippets (gist by @sujithjay)

The snippets below join the FoodMart sales_fact_1998 fact table with its customer dimension over JDBC and inspect the join's physical plan, its full plan lineage, and the generated whole-stage code. The DataFrame and temp-view setup appears in the :paste block further down.
scala> widetable.explain()
== Physical Plan ==
*SortMergeJoin [customer_id#2], [customer_id#17], Inner
:- *Sort [customer_id#2 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(customer_id#2, 200)
:     +- *Scan JDBCRelation(foodmart.sales_fact_1998) [numPartitions=1] [product_id#0,time_id#1,customer_id#2,promotion_id#3,store_id#4,store_sales#5,store_cost#6,unit_sales#7] ReadSchema: struct<product_id:int,time_id:int,customer_id:int,promotion_id:int,store_id:int,store_sales:decim...
+- *Sort [customer_id#17 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(customer_id#17, 200)
      +- *Scan JDBCRelation(foodmart.customer) [numPartitions=1] [customer_id#17,fullname#45] ReadSchema: struct<customer_id:int,fullname:string>

scala> widetable.explain(true)  // command inferred; extended = true prints all four plans below
== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, ('s.customer_id = 'c.customer_id)
   :- 'SubqueryAlias s
   :  +- 'UnresolvedRelation `sales_fact_1998`
   +- 'SubqueryAlias c
      +- 'UnresolvedRelation `customer`

== Analyzed Logical Plan ==
product_id: int, time_id: int, customer_id: int, promotion_id: int, store_id: int, store_sales: decimal(10,4), store_cost: decimal(10,4), unit_sales: decimal(10,4), customer_id: int, fullname: string
Project [product_id#0, time_id#1, customer_id#2, promotion_id#3, store_id#4, store_sales#5, store_cost#6, unit_sales#7, customer_id#17, fullname#45]
+- Join Inner, (customer_id#2 = customer_id#17)
   :- SubqueryAlias s
   :  +- SubqueryAlias sales_fact_1998
   :     +- Relation[product_id#0,time_id#1,customer_id#2,promotion_id#3,store_id#4,store_sales#5,store_cost#6,unit_sales#7] JDBCRelation(foodmart.sales_fact_1998) [numPartitions=1]
   +- SubqueryAlias c
      +- SubqueryAlias customer
         +- Project [customer_id#17, fullname#45]
            +- Relation[customer_id#17,account_num#18L,lname#19,fname#20,mi#21,address1#22,address2#23,address3#24,address4#25,city#26,state_province#27,postal_code#28,country#29,customer_region_id#30,phone1#31,phone2#32,birthdate#33,marital_status#34,yearly_income#35,gender#36,total_children#37,num_children_at_home#38,education#39,date_accnt_opened#40,member_card#41,occupation#42,houseowner#43,num_cars_owned#44,fullname#45] JDBCRelation(foodmart.customer) [numPartitions=1]

== Optimized Logical Plan ==
Join Inner, (customer_id#2 = customer_id#17)
:- Relation[product_id#0,time_id#1,customer_id#2,promotion_id#3,store_id#4,store_sales#5,store_cost#6,unit_sales#7] JDBCRelation(foodmart.sales_fact_1998) [numPartitions=1]
+- Project [customer_id#17, fullname#45]
   +- Relation[customer_id#17,account_num#18L,lname#19,fname#20,mi#21,address1#22,address2#23,address3#24,address4#25,city#26,state_province#27,postal_code#28,country#29,customer_region_id#30,phone1#31,phone2#32,birthdate#33,marital_status#34,yearly_income#35,gender#36,total_children#37,num_children_at_home#38,education#39,date_accnt_opened#40,member_card#41,occupation#42,houseowner#43,num_cars_owned#44,fullname#45] JDBCRelation(foodmart.customer) [numPartitions=1]

== Physical Plan ==
*SortMergeJoin [customer_id#2], [customer_id#17], Inner
:- *Sort [customer_id#2 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(customer_id#2, 200)
:     +- *Scan JDBCRelation(foodmart.sales_fact_1998) [numPartitions=1] [product_id#0,time_id#1,customer_id#2,promotion_id#3,store_id#4,store_sales#5,store_cost#6,unit_sales#7] ReadSchema: struct<product_id:int,time_id:int,customer_id:int,promotion_id:int,store_id:int,store_sales:decim...
+- *Sort [customer_id#17 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(customer_id#17, 200)
      +- *Scan JDBCRelation(foodmart.customer) [numPartitions=1] [customer_id#17,fullname#45] ReadSchema: struct<customer_id:int,fullname:string>
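
In the plan above, both join inputs are shuffled (Exchange hashpartitioning(..., 200)) and sorted before the SortMergeJoin. A minimal sketch, assuming foodmart.customer is small enough to fit in executor memory (salesFact and customer are the DataFrames defined in the :paste block below): broadcasting the dimension table replaces the shuffle-and-sort on both sides with a BroadcastHashJoin.

// Sketch, not from the original session: broadcast the small dimension table.
import org.apache.spark.sql.functions.broadcast

val broadcastWidetable = salesFact.join(broadcast(customer), Seq("customer_id"))
broadcastWidetable.explain()
// Expected shape (assumption): *BroadcastHashJoin [customer_id#...], [customer_id#...], Inner, BuildRight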
scala> :paste
// Entering paste mode (ctrl-D to finish)
val salesFact = spark
  .read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/foodmart")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "foodmart.sales_fact_1998")
  .load()
// Exiting paste mode, now interpreting.
salesFact: org.apache.spark.sql.DataFrame = [product_id: int, time_id: int ... 6 more fields]

scala> val customer = spark
  .read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/foodmart")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "foodmart.customer")
  .load()
  .select("customer_id", "fullname")
customer: org.apache.spark.sql.DataFrame = [customer_id: int, fullname: string]
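
Both scans in the plans above report [numPartitions=1], so each table is pulled through a single JDBC connection. Below is a sketch of a parallel read using the JDBC source's standard partitioning options; the customer_id bounds are illustrative values, not taken from the FoodMart data.

// Sketch: partitionColumn/lowerBound/upperBound/numPartitions split the scan
// into numPartitions range queries. The bound values 1 and 10000 are made up.
val salesFactParallel = spark
  .read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/foodmart")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "foodmart.sales_fact_1998")
  .option("partitionColumn", "customer_id")
  .option("lowerBound", "1")
  .option("upperBound", "10000")
  .option("numPartitions", "4")
  .load()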

scala> salesFact.createOrReplaceTempView("sales_fact_1998")

scala> customer.createOrReplaceTempView("customer")

scala> val widetable = spark.sql("select * from sales_fact_1998 s join customer c on s.customer_id = c.customer_id")
widetable: org.apache.spark.sql.DataFrame = [product_id: int, time_id: int ... 8 more fields]
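
For reference, the same inner join expressed through the DataFrame API rather than a SQL string (a sketch; the session itself goes through the temp views):

// Equivalent DataFrame-API join. Like the SQL select * above, the expression
// form keeps both sides' customer_id columns in the output.
val widetableDF = salesFact.join(customer, salesFact("customer_id") === customer("customer_id"), "inner")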

scala> import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.execution.debug._

scala> widetable.debugCodegen()  // command inferred: explain() does not print generated Java; this dump matches debugCodegen()
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator smj_leftInput;
/* 009 */   private scala.collection.Iterator smj_rightInput;
/* 010 */   private InternalRow smj_leftRow;
/* 011 */   private InternalRow smj_rightRow;
/* 012 */   private int smj_value2;
/* 013 */   private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches;
/* 014 */   private int smj_value3;
/* 015 */   private int smj_value4;
/* 016 */   private int smj_value5;
/* 017 */   private int smj_value6;
/* 018 */   private int smj_value7;
/* 019 */   private int smj_value8;
/* 020 */   private Decimal smj_value9;
/* 021 */   private Decimal smj_value10;
/* 022 */   private Decimal smj_value11;
/* 023 */   private org.apache.spark.sql.execution.metric.SQLMetric smj_numOutputRows;
/* 024 */   private UnsafeRow smj_result;
/* 025 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder smj_holder;
/* 026 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter;
/* 027 */
/* 028 */   public GeneratedIterator(Object[] references) {
/* 029 */     this.references = references;
/* 030 */   }
/* 031 */
/* 032 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 033 */     partitionIndex = index;
/* 034 */     this.inputs = inputs;
/* 035 */     smj_leftInput = inputs[0];
/* 036 */     smj_rightInput = inputs[1];
/* 037 */
/* 038 */     smj_rightRow = null;
/* 039 */
/* 040 */     smj_matches = new org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647);
/* 041 */
/* 042 */     this.smj_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 043 */     smj_result = new UnsafeRow(10);
/* 044 */     this.smj_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_result, 32);
/* 045 */     this.smj_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_holder, 10);
/* 046 */
/* 047 */   }
/* 048 */
/* 049 */   private boolean findNextInnerJoinRows(
/* 050 */     scala.collection.Iterator leftIter,
/* 051 */     scala.collection.Iterator rightIter) {
/* 052 */     smj_leftRow = null;
/* 053 */     int comp = 0;
/* 054 */     while (smj_leftRow == null) {
/* 055 */       if (!leftIter.hasNext()) return false;
/* 056 */       smj_leftRow = (InternalRow) leftIter.next();
/* 057 */
/* 058 */       int smj_value = smj_leftRow.getInt(2);
/* 059 */       if (false) {
/* 060 */         smj_leftRow = null;
/* 061 */         continue;
/* 062 */       }
/* 063 */       if (!smj_matches.isEmpty()) {
/* 064 */         comp = 0;
/* 065 */         if (comp == 0) {
/* 066 */           comp = (smj_value > smj_value3 ? 1 : smj_value < smj_value3 ? -1 : 0);
/* 067 */         }
/* 068 */
/* 069 */         if (comp == 0) {
/* 070 */           return true;
/* 071 */         }
/* 072 */         smj_matches.clear();
/* 073 */       }
/* 074 */
/* 075 */       do {
/* 076 */         if (smj_rightRow == null) {
/* 077 */           if (!rightIter.hasNext()) {
/* 078 */             smj_value3 = smj_value;
/* 079 */             return !smj_matches.isEmpty();
/* 080 */           }
/* 081 */           smj_rightRow = (InternalRow) rightIter.next();
/* 082 */
/* 083 */           int smj_value1 = smj_rightRow.getInt(0);
/* 084 */           if (false) {
/* 085 */             smj_rightRow = null;
/* 086 */             continue;
/* 087 */           }
/* 088 */           smj_value2 = smj_value1;
/* 089 */         }
/* 090 */
/* 091 */         comp = 0;
/* 092 */         if (comp == 0) {
/* 093 */           comp = (smj_value > smj_value2 ? 1 : smj_value < smj_value2 ? -1 : 0);
/* 094 */         }
/* 095 */
/* 096 */         if (comp > 0) {
/* 097 */           smj_rightRow = null;
/* 098 */         } else if (comp < 0) {
/* 099 */           if (!smj_matches.isEmpty()) {
/* 100 */             smj_value3 = smj_value;
/* 101 */             return true;
/* 102 */           }
/* 103 */           smj_leftRow = null;
/* 104 */         } else {
/* 105 */           smj_matches.add((UnsafeRow) smj_rightRow);
/* 106 */           smj_rightRow = null;;
/* 107 */         }
/* 108 */       } while (smj_leftRow != null);
/* 109 */     }
/* 110 */     return false; // unreachable
/* 111 */   }
/* 112 */
/* 113 */   protected void processNext() throws java.io.IOException {
/* 114 */     while (findNextInnerJoinRows(smj_leftInput, smj_rightInput)) {
/* 115 */       smj_value4 = smj_leftRow.getInt(0);
/* 116 */       smj_value5 = smj_leftRow.getInt(1);
/* 117 */       smj_value6 = smj_leftRow.getInt(2);
/* 118 */       smj_value7 = smj_leftRow.getInt(3);
/* 119 */       smj_value8 = smj_leftRow.getInt(4);
/* 120 */       smj_value9 = smj_leftRow.getDecimal(5, 10, 4);
/* 121 */       smj_value10 = smj_leftRow.getDecimal(6, 10, 4);
/* 122 */       smj_value11 = smj_leftRow.getDecimal(7, 10, 4);
/* 123 */       scala.collection.Iterator<UnsafeRow> smj_iterator = smj_matches.generateIterator();
/* 124 */       while (smj_iterator.hasNext()) {
/* 125 */         InternalRow smj_rightRow1 = (InternalRow) smj_iterator.next();
/* 126 */
/* 127 */         smj_numOutputRows.add(1);
/* 128 */
/* 129 */         int smj_value12 = smj_rightRow1.getInt(0);
/* 130 */         UTF8String smj_value13 = smj_rightRow1.getUTF8String(1);
/* 131 */         smj_holder.reset();
/* 132 */
/* 133 */         smj_rowWriter.write(0, smj_value4);
/* 134 */
/* 135 */         smj_rowWriter.write(1, smj_value5);
/* 136 */
/* 137 */         smj_rowWriter.write(2, smj_value6);
/* 138 */
/* 139 */         smj_rowWriter.write(3, smj_value7);
/* 140 */
/* 141 */         smj_rowWriter.write(4, smj_value8);
/* 142 */
/* 143 */         smj_rowWriter.write(5, smj_value9, 10, 4);
/* 144 */
/* 145 */         smj_rowWriter.write(6, smj_value10, 10, 4);
/* 146 */
/* 147 */         smj_rowWriter.write(7, smj_value11, 10, 4);
/* 148 */
/* 149 */         smj_rowWriter.write(8, smj_value12);
/* 150 */
/* 151 */         smj_rowWriter.write(9, smj_value13);
/* 152 */         smj_result.setTotalSize(smj_holder.totalSize());
/* 153 */         append(smj_result.copy());
/* 154 */
/* 155 */       }
/* 156 */       if (shouldStop()) return;
/* 157 */     }
/* 158 */   }
/* 159 */ }
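
/* A second whole-stage-codegen subtree follows: the Sort stage that feeds the
   SortMergeJoin above. debugCodegen emits one generated class per subtree; the
   dump's subtree separator lines are not reproduced in the gist. */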
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean sort_needToSort;
/* 009 */   private org.apache.spark.sql.execution.SortExec sort_plan;
/* 010 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 011 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 012 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 013 */   private scala.collection.Iterator inputadapter_input;
/* 014 */   private org.apache.spark.sql.execution.metric.SQLMetric sort_peakMemory;
/* 015 */   private org.apache.spark.sql.execution.metric.SQLMetric sort_spillSize;
/* 016 */   private org.apache.spark.sql.execution.metric.SQLMetric sort_sortTime;
/* 017 */
/* 018 */   public GeneratedIterator(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */     sort_needToSort = true;
/* 026 */     this.sort_plan = (org.apache.spark.sql.execution.SortExec) references[0];
/* 027 */     sort_sorter = sort_plan.createSorter();
/* 028 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 029 */
/* 030 */     inputadapter_input = inputs[0];
/* 031 */     this.sort_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 032 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 033 */     this.sort_sortTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 034 */
/* 035 */   }
/* 036 */
/* 037 */   private void sort_addToSorter() throws java.io.IOException {
/* 038 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 039 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 040 */       sort_sorter.insertRow((UnsafeRow)inputadapter_row);
/* 041 */       if (shouldStop()) return;
/* 042 */     }
/* 043 */
/* 044 */   }
/* 045 */
/* 046 */   protected void processNext() throws java.io.IOException {
/* 047 */     if (sort_needToSort) {
/* 048 */       long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 049 */       sort_addToSorter();
/* 050 */       sort_sortedIter = sort_sorter.sort();
/* 051 */       sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
/* 052 */       sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
/* 053 */       sort_spillSize.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 054 */       sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 055 */       sort_needToSort = false;
/* 056 */     }
/* 057 */
/* 058 */     while (sort_sortedIter.hasNext()) {
/* 059 */       UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 060 */
/* 061 */       append(sort_outputRow);
/* 062 */
/* 063 */       if (shouldStop()) return;
/* 064 */     }
/* 065 */   }
/* 066 */ }
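
The 200 in Exchange hashpartitioning(customer_id, 200) above is the default value of spark.sql.shuffle.partitions. A sketch, assuming a dataset this small benefits from fewer shuffle tasks; the value 8 is illustrative.

// Lower the shuffle partition count before re-running the join.
spark.conf.set("spark.sql.shuffle.partitions", "8")
spark.sql("select * from sales_fact_1998 s join customer c on s.customer_id = c.customer_id").explain()
// Expected shape (assumption): Exchange hashpartitioning(customer_id#..., 8)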