@sujithjay
Last active February 18, 2018 07:21
Spark SQL Joins: Code Snippets
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private org.apache.spark.sql.execution.metric.SQLMetric filter_numOutputRows;
/* 010 */   private UnsafeRow filter_result;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 012 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
/* 013 */   private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
/* 014 */   private org.apache.spark.sql.execution.joins.UnsafeHashedRelation bhj_relation;
/* 015 */   private UnsafeRow bhj_result;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric bhj_numOutputRows;
/* 019 */   private UnsafeRow bhj_result1;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder1;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter1;
/* 022 */
/* 023 */   public GeneratedIterator(Object[] references) {
/* 024 */     this.references = references;
/* 025 */   }
/* 026 */
/* 027 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 028 */     partitionIndex = index; /* This shows that the match happens per partition of the larger Dataset */
/* 029 */     this.inputs = inputs;
/* 030 */     wholestagecodegen_init_0();
/* 031 */     wholestagecodegen_init_1();
/* 032 */
/* 033 */   }
/* 034 */
/* 035 */   private void wholestagecodegen_init_0() {
/* 036 */     inputadapter_input = inputs[0];
/* 037 */     this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 038 */     filter_result = new UnsafeRow(3);
/* 039 */     this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 96);
/* 040 */     this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 3);
/* 041 */     this.bhj_broadcast = (org.apache.spark.broadcast.TorrentBroadcast) references[1];
/* 042 */     /* The broadcast table is read as an instance of UnsafeHashedRelation. It happens to be our Department table, since it has the smaller estimated physical size */
/* 043 */     bhj_relation = ((org.apache.spark.sql.execution.joins.UnsafeHashedRelation) bhj_broadcast.value()).asReadOnlyCopy();
/* 044 */     incPeakExecutionMemory(bhj_relation.estimatedSize());
/* 045 */
/* 046 */     bhj_result = new UnsafeRow(1);
/* 047 */     this.bhj_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(bhj_result, 32);
/* 048 */     this.bhj_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(bhj_holder, 1);
/* 049 */     this.bhj_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 050 */     bhj_result1 = new UnsafeRow(5);
/* 051 */     this.bhj_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(bhj_result1, 160);
/* 052 */
/* 053 */   }
/* 054 */
/* 055 */   private void wholestagecodegen_init_1() {
/* 056 */     this.bhj_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(bhj_holder1, 5);
/* 057 */
/* 058 */   }
/* 059 */
/* 060 */   protected void processNext() throws java.io.IOException {
/* 061 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 062 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 063 */       boolean inputadapter_isNull2 = inputadapter_row.isNullAt(2);
/* 064 */       UTF8String inputadapter_value2 = inputadapter_isNull2 ? null : (inputadapter_row.getUTF8String(2));
/* 065 */
/* 066 */       if (!(!(inputadapter_isNull2))) continue;
/* 067 */
/* 068 */       filter_numOutputRows.add(1);
/* 069 */
/* 070 */       /* generate join key for the stream side, i.e. the larger Dataset */
/* 071 */
/* 072 */       bhj_holder.reset();
/* 073 */
/* 074 */       bhj_rowWriter.write(0, inputadapter_value2);
/* 075 */       bhj_result.setTotalSize(bhj_holder.totalSize());
/* 076 */
/* 077 */       /* find matches from HashedRelation */
/* 078 */       UnsafeRow bhj_matched = bhj_result.anyNull() ? null: (UnsafeRow)bhj_relation.getValue(bhj_result);
/* 079 */       if (bhj_matched == null) continue;
/* 080 */
/* 081 */       bhj_numOutputRows.add(1);
/* 082 */
/* 083 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 084 */       UTF8String inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getUTF8String(0));
/* 085 */       boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 086 */       UTF8String inputadapter_value1 = inputadapter_isNull1 ? null : (inputadapter_row.getUTF8String(1));
/* 087 */       UTF8String bhj_value1 = bhj_matched.getUTF8String(0);
/* 088 */       boolean bhj_isNull2 = bhj_matched.isNullAt(1);
/* 089 */       UTF8String bhj_value2 = bhj_isNull2 ? null : (bhj_matched.getUTF8String(1));
/* 090 */       bhj_holder1.reset();
/* 091 */
/* 092 */       bhj_rowWriter1.zeroOutNullBytes();
/* 093 */
/* 094 */       if (inputadapter_isNull) {
/* 095 */         bhj_rowWriter1.setNullAt(0);
/* 096 */       } else {
/* 097 */         bhj_rowWriter1.write(0, inputadapter_value);
/* 098 */       }
/* 099 */
/* 100 */       if (inputadapter_isNull1) {
/* 101 */         bhj_rowWriter1.setNullAt(1);
/* 102 */       } else {
/* 103 */         bhj_rowWriter1.write(1, inputadapter_value1);
/* 104 */       }
/* 105 */
/* 106 */       bhj_rowWriter1.write(2, inputadapter_value2);
/* 107 */
/* 108 */       bhj_rowWriter1.write(3, bhj_value1);
/* 109 */
/* 110 */       if (bhj_isNull2) {
/* 111 */         bhj_rowWriter1.setNullAt(4);
/* 112 */       } else {
/* 113 */         bhj_rowWriter1.write(4, bhj_value2);
/* 114 */       }
/* 115 */       bhj_result1.setTotalSize(bhj_holder1.totalSize());
/* 116 */       append(bhj_result1);
/* 117 */       if (shouldStop()) return;
/* 118 */     }
/* 119 */   }
/* 120 */ }
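
The class above is the whole-stage code generation output for the physical plan of the join defined in the Scala session below. A minimal sketch of how such a dump can be reproduced, assuming Spark 2.x, where debugCodegen is exposed through the org.apache.spark.sql.execution.debug package:

// Sketch: inspecting the plan and dumping the generated Java for the join.
// debugCodegen() prints the generated source for each WholeStageCodegen
// subtree of the physical plan; explain(true) shows the logical and physical
// plans, where a BroadcastHashJoin node should appear for this query.
import org.apache.spark.sql.execution.debug._

widetable.explain(true)
widetable.debugCodegen()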
scala> case class Employee(id: String, name: String, did: String)
defined class Employee

scala> case class Department(id: String, name: String)
defined class Department

scala> val joe = Employee("1", "Joe", "1")
joe: Employee = Employee(1,Joe,1)

scala> val sam = Employee("2", "Sam", "2")
sam: Employee = Employee(2,Sam,2)

scala> val it = Department("1", "IT")
it: Department = Department(1,IT)

scala> val sales = Department("2", "Sales")
sales: Department = Department(2,Sales)

scala> val employees = spark.createDataset(List(joe, sam))
employees: org.apache.spark.sql.Dataset[Employee] = [id: string, name: string ... 1 more field]

scala> val departments = spark.createDataset(List(it, sales))
departments: org.apache.spark.sql.Dataset[Department] = [id: string, name: string]

scala> val widetable = employees.join(departments, employees("did")===departments("id"))

scala> sc.setLogLevel("TRACE")

scala> widetable.show()
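
Spark picks a BroadcastHashJoin here because the departments Dataset falls well under spark.sql.autoBroadcastJoinThreshold (10 MB by default), so it is shipped to every executor as the UnsafeHashedRelation seen in the generated code. A rough sketch of how that decision can be steered, assuming the same session as above:

// Force the smaller side to be broadcast explicitly with the broadcast() hint.
import org.apache.spark.sql.functions.broadcast
val hinted = employees.join(broadcast(departments), employees("did") === departments("id"))
hinted.explain()

// Or disable automatic broadcasting; the planner then falls back to a
// SortMergeJoin for an equi-join such as this one.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
val merged = employees.join(departments, employees("did") === departments("id"))
merged.explain()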