@c21 · Created October 13, 2021 08:07
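The dump below is Spark SQL's whole-stage-codegen debug output for a grouped average over a Parquet table. As a minimal sketch of how such a dump can be produced: the table name and column types are taken from the FileScan lines below, while the exact query, the sample data, and the shuffle-partition setting (the Exchange in subtree 2 shows 5 partitions) are assumptions.

// e.g. pasted into spark-shell started with --conf spark.sql.shuffle.partitions=5
import org.apache.spark.sql.execution.debug._   // adds debugCodegen() to Dataset
import spark.implicits._

// Assumed setup: a Parquet table matching the FileScan's ReadSchema struct<key:int,value:int>
Seq((1, 10), (1, 20), (2, 30)).toDF("key", "value")
  .write.format("parquet").saveAsTable("agg1")

// Subtree 1 below is the partial average; subtree 2 is the final average after the shuffle
val df = spark.sql("SELECT key, AVG(value) FROM agg1 GROUP BY key")
df.debugCodegen()   // prints "Found 2 WholeStageCodegen subtrees." plus the generated Java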
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 (maxMethodCodeSize:266; maxConstantPoolSize:309(0.47% used); numInnerClasses:2) ==
*(1) HashAggregate(keys=[key#57], functions=[partial_avg(value#58)], output=[key#57, sum#65, count#66L])
+- *(1) ColumnarToRow
+- FileScan parquet default.agg1[key#57,value#58] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/y5/hnsw8mz93vs57ngcd30y6y9c0000gn/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,value:int>
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean agg_initAgg_0;
/* 010 */ private boolean agg_bufIsNull_0;
/* 011 */ private double agg_bufValue_0;
/* 012 */ private boolean agg_bufIsNull_1;
/* 013 */ private long agg_bufValue_1;
/* 014 */ private agg_FastHashMap_0 agg_fastHashMap_0;
/* 015 */ private org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> agg_fastHashMapIter_0;
/* 016 */ private org.apache.spark.unsafe.KVIterator agg_mapIter_0;
/* 017 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap_0;
/* 018 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter_0;
/* 019 */ private int columnartorow_batchIdx_0;
/* 020 */ private boolean agg_agg_isNull_8_0;
/* 021 */ private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] columnartorow_mutableStateArray_2 = new org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[2];
/* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] columnartorow_mutableStateArray_3 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4];
/* 023 */ private org.apache.spark.sql.vectorized.ColumnarBatch[] columnartorow_mutableStateArray_1 = new org.apache.spark.sql.vectorized.ColumnarBatch[1];
/* 024 */ private scala.collection.Iterator[] columnartorow_mutableStateArray_0 = new scala.collection.Iterator[1];
/* 025 */
/* 026 */ public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 027 */ this.references = references;
/* 028 */ }
/* 029 */
/* 030 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 031 */ partitionIndex = index;
/* 032 */ this.inputs = inputs;
/* 033 */
/* 034 */ columnartorow_mutableStateArray_0[0] = inputs[0];
/* 035 */ columnartorow_mutableStateArray_3[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 036 */ columnartorow_mutableStateArray_3[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 037 */ columnartorow_mutableStateArray_3[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 038 */ columnartorow_mutableStateArray_3[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 0);
/* 039 */
/* 040 */ }
/* 041 */
/* 042 */ public class agg_FastHashMap_0 {
/* 043 */ private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
/* 044 */ private int[] buckets;
/* 045 */ private int capacity = 1 << 16;
/* 046 */ private double loadFactor = 0.5;
/* 047 */ private int numBuckets = (int) (capacity / loadFactor);
/* 048 */ private int maxSteps = 2;
/* 049 */ private int numRows = 0;
/* 050 */ private Object emptyVBase;
/* 051 */ private long emptyVOff;
/* 052 */ private int emptyVLen;
/* 053 */ private boolean isBatchFull = false;
/* 054 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 055 */
/* 056 */ public agg_FastHashMap_0(
/* 057 */ org.apache.spark.memory.TaskMemoryManager taskMemoryManager,
/* 058 */ InternalRow emptyAggregationBuffer) {
/* 059 */ batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
/* 060 */ .allocate(((org.apache.spark.sql.types.StructType) references[1] /* keySchemaTerm */), ((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */), taskMemoryManager, capacity);
/* 061 */
/* 062 */ final UnsafeProjection valueProjection = UnsafeProjection.create(((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */));
/* 063 */ final byte[] emptyBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();
/* 064 */
/* 065 */ emptyVBase = emptyBuffer;
/* 066 */ emptyVOff = Platform.BYTE_ARRAY_OFFSET;
/* 067 */ emptyVLen = emptyBuffer.length;
/* 068 */
/* 069 */ agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
/* 070 */ 1, 0);
/* 071 */
/* 072 */ buckets = new int[numBuckets];
/* 073 */ java.util.Arrays.fill(buckets, -1);
/* 074 */ }
/* 075 */
/* 076 */ public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(int agg_key_0) {
/* 077 */ long h = hash(agg_key_0);
/* 078 */ int step = 0;
/* 079 */ int idx = (int) h & (numBuckets - 1);
/* 080 */ while (step < maxSteps) {
/* 081 */ // Return bucket index if it's either an empty slot or already contains the key
/* 082 */ if (buckets[idx] == -1) {
/* 083 */ if (numRows < capacity && !isBatchFull) {
/* 084 */ agg_rowWriter.reset();
/* 085 */ agg_rowWriter.zeroOutNullBytes();
/* 086 */ agg_rowWriter.write(0, agg_key_0);
/* 087 */ org.apache.spark.sql.catalyst.expressions.UnsafeRow agg_result
/* 088 */ = agg_rowWriter.getRow();
/* 089 */ Object kbase = agg_result.getBaseObject();
/* 090 */ long koff = agg_result.getBaseOffset();
/* 091 */ int klen = agg_result.getSizeInBytes();
/* 092 */
/* 093 */ UnsafeRow vRow
/* 094 */ = batch.appendRow(kbase, koff, klen, emptyVBase, emptyVOff, emptyVLen);
/* 095 */ if (vRow == null) {
/* 096 */ isBatchFull = true;
/* 097 */ } else {
/* 098 */ buckets[idx] = numRows++;
/* 099 */ }
/* 100 */ return vRow;
/* 101 */ } else {
/* 102 */ // No more space
/* 103 */ return null;
/* 104 */ }
/* 105 */ } else if (equals(idx, agg_key_0)) {
/* 106 */ return batch.getValueRow(buckets[idx]);
/* 107 */ }
/* 108 */ idx = (idx + 1) & (numBuckets - 1);
/* 109 */ step++;
/* 110 */ }
/* 111 */ // Didn't find it
/* 112 */ return null;
/* 113 */ }
/* 114 */
/* 115 */ private boolean equals(int idx, int agg_key_0) {
/* 116 */ UnsafeRow row = batch.getKeyRow(buckets[idx]);
/* 117 */ return (row.getInt(0) == agg_key_0);
/* 118 */ }
/* 119 */
/* 120 */ private long hash(int agg_key_0) {
/* 121 */ long agg_hash_0 = 0;
/* 122 */
/* 123 */ int agg_result_0 = agg_key_0;
/* 124 */ agg_hash_0 = (agg_hash_0 ^ (0x9e3779b9)) + agg_result_0 + (agg_hash_0 << 6) + (agg_hash_0 >>> 2);
/* 125 */
/* 126 */ return agg_hash_0;
/* 127 */ }
/* 128 */
/* 129 */ public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
/* 130 */ return batch.rowIterator();
/* 131 */ }
/* 132 */
/* 133 */ public void close() {
/* 134 */ batch.close();
/* 135 */ }
/* 136 */
/* 137 */ }
/* 138 */
/* 139 */ private void agg_doAggregateWithKeysOutput_0(UnsafeRow agg_keyTerm_0, UnsafeRow agg_bufferTerm_0)
/* 140 */ throws java.io.IOException {
/* 141 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[9] /* numOutputRows */).add(1);
/* 142 */
/* 143 */ boolean agg_isNull_19 = agg_keyTerm_0.isNullAt(0);
/* 144 */ int agg_value_21 = agg_isNull_19 ?
/* 145 */ -1 : (agg_keyTerm_0.getInt(0));
/* 146 */ boolean agg_isNull_20 = agg_bufferTerm_0.isNullAt(0);
/* 147 */ double agg_value_22 = agg_isNull_20 ?
/* 148 */ -1.0 : (agg_bufferTerm_0.getDouble(0));
/* 149 */ boolean agg_isNull_21 = agg_bufferTerm_0.isNullAt(1);
/* 150 */ long agg_value_23 = agg_isNull_21 ?
/* 151 */ -1L : (agg_bufferTerm_0.getLong(1));
/* 152 */
/* 153 */ columnartorow_mutableStateArray_3[3].reset();
/* 154 */
/* 155 */ columnartorow_mutableStateArray_3[3].zeroOutNullBytes();
/* 156 */
/* 157 */ if (agg_isNull_19) {
/* 158 */ columnartorow_mutableStateArray_3[3].setNullAt(0);
/* 159 */ } else {
/* 160 */ columnartorow_mutableStateArray_3[3].write(0, agg_value_21);
/* 161 */ }
/* 162 */
/* 163 */ if (agg_isNull_20) {
/* 164 */ columnartorow_mutableStateArray_3[3].setNullAt(1);
/* 165 */ } else {
/* 166 */ columnartorow_mutableStateArray_3[3].write(1, agg_value_22);
/* 167 */ }
/* 168 */
/* 169 */ if (agg_isNull_21) {
/* 170 */ columnartorow_mutableStateArray_3[3].setNullAt(2);
/* 171 */ } else {
/* 172 */ columnartorow_mutableStateArray_3[3].write(2, agg_value_23);
/* 173 */ }
/* 174 */ append((columnartorow_mutableStateArray_3[3].getRow()));
/* 175 */
/* 176 */ }
/* 177 */
/* 178 */ private void columnartorow_nextBatch_0() throws java.io.IOException {
/* 179 */ if (columnartorow_mutableStateArray_0[0].hasNext()) {
/* 180 */ columnartorow_mutableStateArray_1[0] = (org.apache.spark.sql.vectorized.ColumnarBatch)columnartorow_mutableStateArray_0[0].next();
/* 181 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[8] /* numInputBatches */).add(1);
/* 182 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[7] /* numOutputRows */).add(columnartorow_mutableStateArray_1[0].numRows());
/* 183 */ columnartorow_batchIdx_0 = 0;
/* 184 */ columnartorow_mutableStateArray_2[0] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(0);
/* 185 */ columnartorow_mutableStateArray_2[1] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(1);
/* 186 */
/* 187 */ }
/* 188 */ }
/* 189 */
/* 190 */ private void agg_doConsume_0(int agg_expr_0_0, boolean agg_exprIsNull_0_0, int agg_expr_1_0, boolean agg_exprIsNull_1_0) throws java.io.IOException {
/* 191 */ UnsafeRow agg_unsafeRowAggBuffer_0 = null;
/* 192 */ UnsafeRow agg_fastAggBuffer_0 = null;
/* 193 */
/* 194 */ if (!agg_exprIsNull_0_0) {
/* 195 */ agg_fastAggBuffer_0 = agg_fastHashMap_0.findOrInsert(
/* 196 */ agg_expr_0_0);
/* 197 */ }
/* 198 */ // Cannot find the key in fast hash map, try regular hash map.
/* 199 */ if (agg_fastAggBuffer_0 == null) {
/* 200 */ // generate grouping key
/* 201 */ columnartorow_mutableStateArray_3[2].reset();
/* 202 */
/* 203 */ columnartorow_mutableStateArray_3[2].zeroOutNullBytes();
/* 204 */
/* 205 */ if (agg_exprIsNull_0_0) {
/* 206 */ columnartorow_mutableStateArray_3[2].setNullAt(0);
/* 207 */ } else {
/* 208 */ columnartorow_mutableStateArray_3[2].write(0, agg_expr_0_0);
/* 209 */ }
/* 210 */ int agg_unsafeRowKeyHash_0 = (columnartorow_mutableStateArray_3[2].getRow()).hashCode();
/* 211 */ if (true) {
/* 212 */ // try to get the buffer from hash map
/* 213 */ agg_unsafeRowAggBuffer_0 =
/* 214 */ agg_hashMap_0.getAggregationBufferFromUnsafeRow((columnartorow_mutableStateArray_3[2].getRow()), agg_unsafeRowKeyHash_0);
/* 215 */ }
/* 216 */ // Can't allocate a buffer from the hash map. Spill the map and fall back to sort-based
/* 217 */ // aggregation after processing all input rows.
/* 218 */ if (agg_unsafeRowAggBuffer_0 == null) {
/* 219 */ if (agg_sorter_0 == null) {
/* 220 */ agg_sorter_0 = agg_hashMap_0.destructAndCreateExternalSorter();
/* 221 */ } else {
/* 222 */ agg_sorter_0.merge(agg_hashMap_0.destructAndCreateExternalSorter());
/* 223 */ }
/* 224 */
/* 225 */ // the hash map has been spilled, it should have enough memory now,
/* 226 */ // try to allocate buffer again.
/* 227 */ agg_unsafeRowAggBuffer_0 = agg_hashMap_0.getAggregationBufferFromUnsafeRow(
/* 228 */ (columnartorow_mutableStateArray_3[2].getRow()), agg_unsafeRowKeyHash_0);
/* 229 */ if (agg_unsafeRowAggBuffer_0 == null) {
/* 230 */ // failed to allocate the first page
/* 231 */ throw new org.apache.spark.memory.SparkOutOfMemoryError("No enough memory for aggregation");
/* 232 */ }
/* 233 */ }
/* 234 */
/* 235 */ }
/* 236 */
/* 237 */ // Updates the proper row buffer
/* 238 */ if (agg_fastAggBuffer_0 != null) {
/* 239 */ agg_unsafeRowAggBuffer_0 = agg_fastAggBuffer_0;
/* 240 */ }
/* 241 */
/* 242 */ // common sub-expressions
/* 243 */
/* 244 */ // evaluate aggregate functions and update aggregation buffers
/* 245 */ agg_doAggregate_avg_0(agg_unsafeRowAggBuffer_0, agg_exprIsNull_1_0, agg_expr_1_0);
/* 246 */
/* 247 */ }
/* 248 */
/* 249 */ private void agg_doAggregate_avg_0(org.apache.spark.sql.catalyst.InternalRow agg_unsafeRowAggBuffer_0, boolean agg_exprIsNull_1_0, int agg_expr_1_0) throws java.io.IOException {
/* 250 */ boolean agg_isNull_6 = true;
/* 251 */ double agg_value_8 = -1.0;
/* 252 */ boolean agg_isNull_7 = agg_unsafeRowAggBuffer_0.isNullAt(0);
/* 253 */ double agg_value_9 = agg_isNull_7 ?
/* 254 */ -1.0 : (agg_unsafeRowAggBuffer_0.getDouble(0));
/* 255 */ if (!agg_isNull_7) {
/* 256 */ agg_agg_isNull_8_0 = true;
/* 257 */ double agg_value_10 = -1.0;
/* 258 */ do {
/* 259 */ boolean agg_isNull_9 = agg_exprIsNull_1_0;
/* 260 */ double agg_value_11 = -1.0;
/* 261 */ if (!agg_exprIsNull_1_0) {
/* 262 */ agg_value_11 = (double) agg_expr_1_0;
/* 263 */ }
/* 264 */ if (!agg_isNull_9) {
/* 265 */ agg_agg_isNull_8_0 = false;
/* 266 */ agg_value_10 = agg_value_11;
/* 267 */ continue;
/* 268 */ }
/* 269 */
/* 270 */ if (!false) {
/* 271 */ agg_agg_isNull_8_0 = false;
/* 272 */ agg_value_10 = 0.0D;
/* 273 */ continue;
/* 274 */ }
/* 275 */
/* 276 */ } while (false);
/* 277 */
/* 278 */ agg_isNull_6 = false; // resultCode could change nullability.
/* 279 */
/* 280 */ agg_value_8 = agg_value_9 + agg_value_10;
/* 281 */
/* 282 */ }
/* 283 */ boolean agg_isNull_12 = false;
/* 284 */ long agg_value_14 = -1L;
/* 285 */ if (!false && agg_exprIsNull_1_0) {
/* 286 */ boolean agg_isNull_15 = agg_unsafeRowAggBuffer_0.isNullAt(1);
/* 287 */ long agg_value_17 = agg_isNull_15 ?
/* 288 */ -1L : (agg_unsafeRowAggBuffer_0.getLong(1));
/* 289 */ agg_isNull_12 = agg_isNull_15;
/* 290 */ agg_value_14 = agg_value_17;
/* 291 */ } else {
/* 292 */ boolean agg_isNull_16 = true;
/* 293 */ long agg_value_18 = -1L;
/* 294 */ boolean agg_isNull_17 = agg_unsafeRowAggBuffer_0.isNullAt(1);
/* 295 */ long agg_value_19 = agg_isNull_17 ?
/* 296 */ -1L : (agg_unsafeRowAggBuffer_0.getLong(1));
/* 297 */ if (!agg_isNull_17) {
/* 298 */ agg_isNull_16 = false; // resultCode could change nullability.
/* 299 */
/* 300 */ agg_value_18 = agg_value_19 + 1L;
/* 301 */
/* 302 */ }
/* 303 */ agg_isNull_12 = agg_isNull_16;
/* 304 */ agg_value_14 = agg_value_18;
/* 305 */ }
/* 306 */
/* 307 */ if (!agg_isNull_6) {
/* 308 */ agg_unsafeRowAggBuffer_0.setDouble(0, agg_value_8);
/* 309 */ } else {
/* 310 */ agg_unsafeRowAggBuffer_0.setNullAt(0);
/* 311 */ }
/* 312 */
/* 313 */ if (!agg_isNull_12) {
/* 314 */ agg_unsafeRowAggBuffer_0.setLong(1, agg_value_14);
/* 315 */ } else {
/* 316 */ agg_unsafeRowAggBuffer_0.setNullAt(1);
/* 317 */ }
/* 318 */ }
/* 319 */
/* 320 */ private void agg_doAggregateWithKeys_0() throws java.io.IOException {
/* 321 */ if (columnartorow_mutableStateArray_1[0] == null) {
/* 322 */ columnartorow_nextBatch_0();
/* 323 */ }
/* 324 */ while ( columnartorow_mutableStateArray_1[0] != null) {
/* 325 */ int columnartorow_numRows_0 = columnartorow_mutableStateArray_1[0].numRows();
/* 326 */ int columnartorow_localEnd_0 = columnartorow_numRows_0 - columnartorow_batchIdx_0;
/* 327 */ for (int columnartorow_localIdx_0 = 0; columnartorow_localIdx_0 < columnartorow_localEnd_0; columnartorow_localIdx_0++) {
/* 328 */ int columnartorow_rowIdx_0 = columnartorow_batchIdx_0 + columnartorow_localIdx_0;
/* 329 */ boolean columnartorow_isNull_0 = columnartorow_mutableStateArray_2[0].isNullAt(columnartorow_rowIdx_0);
/* 330 */ int columnartorow_value_0 = columnartorow_isNull_0 ? -1 : (columnartorow_mutableStateArray_2[0].getInt(columnartorow_rowIdx_0));
/* 331 */ boolean columnartorow_isNull_1 = columnartorow_mutableStateArray_2[1].isNullAt(columnartorow_rowIdx_0);
/* 332 */ int columnartorow_value_1 = columnartorow_isNull_1 ? -1 : (columnartorow_mutableStateArray_2[1].getInt(columnartorow_rowIdx_0));
/* 333 */
/* 334 */ agg_doConsume_0(columnartorow_value_0, columnartorow_isNull_0, columnartorow_value_1, columnartorow_isNull_1);
/* 335 */ // shouldStop check is eliminated
/* 336 */ }
/* 337 */ columnartorow_batchIdx_0 = columnartorow_numRows_0;
/* 338 */ columnartorow_mutableStateArray_1[0] = null;
/* 339 */ columnartorow_nextBatch_0();
/* 340 */ }
/* 341 */
/* 342 */ agg_fastHashMapIter_0 = agg_fastHashMap_0.rowIterator();
/* 343 */ agg_mapIter_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).finishAggregate(agg_hashMap_0, agg_sorter_0, ((org.apache.spark.sql.execution.metric.SQLMetric) references[3] /* peakMemory */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* spillSize */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[5] /* avgHashProbe */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[6] /* numTasksFallBacked */));
/* 344 */
/* 345 */ }
/* 346 */
/* 347 */ protected void processNext() throws java.io.IOException {
/* 348 */ if (!agg_initAgg_0) {
/* 349 */ agg_initAgg_0 = true;
/* 350 */ agg_fastHashMap_0 = new agg_FastHashMap_0(((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getTaskContext().taskMemoryManager(), ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getEmptyAggregationBuffer());
/* 351 */
/* 352 */ ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getTaskContext().addTaskCompletionListener(
/* 353 */ new org.apache.spark.util.TaskCompletionListener() {
/* 354 */ @Override
/* 355 */ public void onTaskCompletion(org.apache.spark.TaskContext context) {
/* 356 */ agg_fastHashMap_0.close();
/* 357 */ }
/* 358 */ });
/* 359 */
/* 360 */ agg_hashMap_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).createHashMap();
/* 361 */ long wholestagecodegen_beforeAgg_0 = System.nanoTime();
/* 362 */ agg_doAggregateWithKeys_0();
/* 363 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[10] /* aggTime */).add((System.nanoTime() - wholestagecodegen_beforeAgg_0) / 1000000);
/* 364 */ }
/* 365 */ // output the result
/* 366 */
/* 367 */ while ( agg_fastHashMapIter_0.next()) {
/* 368 */ UnsafeRow agg_aggKey_0 = (UnsafeRow) agg_fastHashMapIter_0.getKey();
/* 369 */ UnsafeRow agg_aggBuffer_0 = (UnsafeRow) agg_fastHashMapIter_0.getValue();
/* 370 */ agg_doAggregateWithKeysOutput_0(agg_aggKey_0, agg_aggBuffer_0);
/* 371 */
/* 372 */ if (shouldStop()) return;
/* 373 */ }
/* 374 */ agg_fastHashMap_0.close();
/* 375 */
/* 376 */ while ( agg_mapIter_0.next()) {
/* 377 */ UnsafeRow agg_aggKey_0 = (UnsafeRow) agg_mapIter_0.getKey();
/* 378 */ UnsafeRow agg_aggBuffer_0 = (UnsafeRow) agg_mapIter_0.getValue();
/* 379 */ agg_doAggregateWithKeysOutput_0(agg_aggKey_0, agg_aggBuffer_0);
/* 380 */ if (shouldStop()) return;
/* 381 */ }
/* 382 */ agg_mapIter_0.close();
/* 383 */ if (agg_sorter_0 == null) {
/* 384 */ agg_hashMap_0.free();
/* 385 */ }
/* 386 */ }
/* 387 */
/* 388 */ }
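That closes codegen stage 1. Most of the class above is the generated agg_FastHashMap_0: an open-addressing hash map specialized to the int grouping key, with 2^17 buckets at a 0.5 load factor and at most maxSteps = 2 linear probes before it gives up and lets agg_doConsume_0 route the key to the regular UnsafeFixedWidthAggregationMap (which can in turn spill into an UnsafeKVExternalSorter). Below is a self-contained sketch of that probing scheme, with the RowBasedKeyValueBatch storage simplified to parallel arrays; all names here are illustrative, not Spark API.

// Illustrative sketch only: mirrors the control flow of the generated
// findOrInsert(), with the row batch replaced by parallel arrays.
final class FastHashMapSketch(capacity: Int = 1 << 16) {
  private val numBuckets = capacity * 2               // loadFactor = 0.5
  private val maxSteps   = 2                          // give up quickly; fall back instead
  private val buckets    = Array.fill(numBuckets)(-1) // -1 marks an empty bucket
  private val keys       = new Array[Int](capacity)
  private var numRows    = 0

  // Same single mixing round as the generated hash(): h starts at 0, then
  // h = (h ^ 0x9e3779b9) + key + (h << 6) + (h >>> 2)
  private def hash(key: Int): Long = {
    var h = 0L
    h = (h ^ 0x9e3779b9) + key + (h << 6) + (h >>> 2)
    h
  }

  /** Row id for `key`, inserting if absent; -1 tells the caller to use the regular map. */
  def findOrInsert(key: Int): Int = {
    var idx  = (hash(key) & (numBuckets - 1)).toInt   // numBuckets is a power of two
    var step = 0
    while (step < maxSteps) {
      if (buckets(idx) == -1) {            // empty slot: try to claim it
        if (numRows < capacity) {          // the generated code also tracks isBatchFull
          keys(numRows) = key
          buckets(idx) = numRows
          numRows += 1
          return buckets(idx)
        }
        return -1                          // no more space in the batch
      } else if (keys(buckets(idx)) == key) {
        return buckets(idx)                // key already present
      }
      idx = (idx + 1) & (numBuckets - 1)   // linear probing
      step += 1
    }
    -1                                     // didn't find it within maxSteps probes
  }
}

Capping the probe count at two keeps the fast path branch-light: a miss is cheap, and keys that collide simply fall through to the regular map, which handles growth and spilling.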
== Subtree 2 / 2 (maxMethodCodeSize:248; maxConstantPoolSize:279(0.43% used); numInnerClasses:2) ==
*(2) HashAggregate(keys=[key#57], functions=[avg(value#58)], output=[key#57, avg(value)#60])
+- Exchange hashpartitioning(key#57, 5), ENSURE_REQUIREMENTS, [id=#65]
+- *(1) HashAggregate(keys=[key#57], functions=[partial_avg(value#58)], output=[key#57, sum#65, count#66L])
+- *(1) ColumnarToRow
+- FileScan parquet default.agg1[key#57,value#58] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/y5/hnsw8mz93vs57ngcd30y6y9c0000gn/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,value:int>
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean agg_initAgg_0;
/* 010 */ private boolean agg_bufIsNull_0;
/* 011 */ private double agg_bufValue_0;
/* 012 */ private boolean agg_bufIsNull_1;
/* 013 */ private long agg_bufValue_1;
/* 014 */ private agg_FastHashMap_0 agg_fastHashMap_0;
/* 015 */ private org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> agg_fastHashMapIter_0;
/* 016 */ private org.apache.spark.unsafe.KVIterator agg_mapIter_0;
/* 017 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap_0;
/* 018 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter_0;
/* 019 */ private scala.collection.Iterator inputadapter_input_0;
/* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] agg_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 021 */
/* 022 */ public GeneratedIteratorForCodegenStage2(Object[] references) {
/* 023 */ this.references = references;
/* 024 */ }
/* 025 */
/* 026 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 027 */ partitionIndex = index;
/* 028 */ this.inputs = inputs;
/* 029 */
/* 030 */ inputadapter_input_0 = inputs[0];
/* 031 */ agg_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 032 */ agg_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 033 */
/* 034 */ }
/* 035 */
/* 036 */ public class agg_FastHashMap_0 {
/* 037 */ private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
/* 038 */ private int[] buckets;
/* 039 */ private int capacity = 1 << 16;
/* 040 */ private double loadFactor = 0.5;
/* 041 */ private int numBuckets = (int) (capacity / loadFactor);
/* 042 */ private int maxSteps = 2;
/* 043 */ private int numRows = 0;
/* 044 */ private Object emptyVBase;
/* 045 */ private long emptyVOff;
/* 046 */ private int emptyVLen;
/* 047 */ private boolean isBatchFull = false;
/* 048 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 049 */
/* 050 */ public agg_FastHashMap_0(
/* 051 */ org.apache.spark.memory.TaskMemoryManager taskMemoryManager,
/* 052 */ InternalRow emptyAggregationBuffer) {
/* 053 */ batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
/* 054 */ .allocate(((org.apache.spark.sql.types.StructType) references[1] /* keySchemaTerm */), ((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */), taskMemoryManager, capacity);
/* 055 */
/* 056 */ final UnsafeProjection valueProjection = UnsafeProjection.create(((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */));
/* 057 */ final byte[] emptyBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();
/* 058 */
/* 059 */ emptyVBase = emptyBuffer;
/* 060 */ emptyVOff = Platform.BYTE_ARRAY_OFFSET;
/* 061 */ emptyVLen = emptyBuffer.length;
/* 062 */
/* 063 */ agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
/* 064 */ 1, 0);
/* 065 */
/* 066 */ buckets = new int[numBuckets];
/* 067 */ java.util.Arrays.fill(buckets, -1);
/* 068 */ }
/* 069 */
/* 070 */ public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(int agg_key_0) {
/* 071 */ long h = hash(agg_key_0);
/* 072 */ int step = 0;
/* 073 */ int idx = (int) h & (numBuckets - 1);
/* 074 */ while (step < maxSteps) {
/* 075 */ // Return bucket index if it's either an empty slot or already contains the key
/* 076 */ if (buckets[idx] == -1) {
/* 077 */ if (numRows < capacity && !isBatchFull) {
/* 078 */ agg_rowWriter.reset();
/* 079 */ agg_rowWriter.zeroOutNullBytes();
/* 080 */ agg_rowWriter.write(0, agg_key_0);
/* 081 */ org.apache.spark.sql.catalyst.expressions.UnsafeRow agg_result
/* 082 */ = agg_rowWriter.getRow();
/* 083 */ Object kbase = agg_result.getBaseObject();
/* 084 */ long koff = agg_result.getBaseOffset();
/* 085 */ int klen = agg_result.getSizeInBytes();
/* 086 */
/* 087 */ UnsafeRow vRow
/* 088 */ = batch.appendRow(kbase, koff, klen, emptyVBase, emptyVOff, emptyVLen);
/* 089 */ if (vRow == null) {
/* 090 */ isBatchFull = true;
/* 091 */ } else {
/* 092 */ buckets[idx] = numRows++;
/* 093 */ }
/* 094 */ return vRow;
/* 095 */ } else {
/* 096 */ // No more space
/* 097 */ return null;
/* 098 */ }
/* 099 */ } else if (equals(idx, agg_key_0)) {
/* 100 */ return batch.getValueRow(buckets[idx]);
/* 101 */ }
/* 102 */ idx = (idx + 1) & (numBuckets - 1);
/* 103 */ step++;
/* 104 */ }
/* 105 */ // Didn't find it
/* 106 */ return null;
/* 107 */ }
/* 108 */
/* 109 */ private boolean equals(int idx, int agg_key_0) {
/* 110 */ UnsafeRow row = batch.getKeyRow(buckets[idx]);
/* 111 */ return (row.getInt(0) == agg_key_0);
/* 112 */ }
/* 113 */
/* 114 */ private long hash(int agg_key_0) {
/* 115 */ long agg_hash_0 = 0;
/* 116 */
/* 117 */ int agg_result_0 = agg_key_0;
/* 118 */ agg_hash_0 = (agg_hash_0 ^ (0x9e3779b9)) + agg_result_0 + (agg_hash_0 << 6) + (agg_hash_0 >>> 2);
/* 119 */
/* 120 */ return agg_hash_0;
/* 121 */ }
/* 122 */
/* 123 */ public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
/* 124 */ return batch.rowIterator();
/* 125 */ }
/* 126 */
/* 127 */ public void close() {
/* 128 */ batch.close();
/* 129 */ }
/* 130 */
/* 131 */ }
/* 132 */
/* 133 */ private void agg_doAggregateWithKeysOutput_0(UnsafeRow agg_keyTerm_0, UnsafeRow agg_bufferTerm_0)
/* 134 */ throws java.io.IOException {
/* 135 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[7] /* numOutputRows */).add(1);
/* 136 */
/* 137 */ boolean agg_isNull_10 = agg_keyTerm_0.isNullAt(0);
/* 138 */ int agg_value_12 = agg_isNull_10 ?
/* 139 */ -1 : (agg_keyTerm_0.getInt(0));
/* 140 */ boolean agg_isNull_11 = agg_bufferTerm_0.isNullAt(0);
/* 141 */ double agg_value_13 = agg_isNull_11 ?
/* 142 */ -1.0 : (agg_bufferTerm_0.getDouble(0));
/* 143 */ boolean agg_isNull_12 = agg_bufferTerm_0.isNullAt(1);
/* 144 */ long agg_value_14 = agg_isNull_12 ?
/* 145 */ -1L : (agg_bufferTerm_0.getLong(1));
/* 146 */ boolean agg_isNull_15 = agg_isNull_12;
/* 147 */ double agg_value_17 = -1.0;
/* 148 */ if (!agg_isNull_12) {
/* 149 */ agg_value_17 = (double) agg_value_14;
/* 150 */ }
/* 151 */ boolean agg_isNull_13 = false;
/* 152 */ double agg_value_15 = -1.0;
/* 153 */ if (agg_isNull_15 || agg_value_17 == 0) {
/* 154 */ agg_isNull_13 = true;
/* 155 */ } else {
/* 156 */ if (agg_isNull_11) {
/* 157 */ agg_isNull_13 = true;
/* 158 */ } else {
/* 159 */ agg_value_15 = (double)(agg_value_13 / agg_value_17);
/* 160 */ }
/* 161 */ }
/* 162 */
/* 163 */ agg_mutableStateArray_0[1].reset();
/* 164 */
/* 165 */ agg_mutableStateArray_0[1].zeroOutNullBytes();
/* 166 */
/* 167 */ if (agg_isNull_10) {
/* 168 */ agg_mutableStateArray_0[1].setNullAt(0);
/* 169 */ } else {
/* 170 */ agg_mutableStateArray_0[1].write(0, agg_value_12);
/* 171 */ }
/* 172 */
/* 173 */ if (agg_isNull_13) {
/* 174 */ agg_mutableStateArray_0[1].setNullAt(1);
/* 175 */ } else {
/* 176 */ agg_mutableStateArray_0[1].write(1, agg_value_15);
/* 177 */ }
/* 178 */ append((agg_mutableStateArray_0[1].getRow()));
/* 179 */
/* 180 */ }
/* 181 */
/* 182 */ private void agg_doConsume_0(InternalRow inputadapter_row_0, int agg_expr_0_0, boolean agg_exprIsNull_0_0, double agg_expr_1_0, boolean agg_exprIsNull_1_0, long agg_expr_2_0, boolean agg_exprIsNull_2_0) throws java.io.IOException {
/* 183 */ UnsafeRow agg_unsafeRowAggBuffer_0 = null;
/* 184 */ UnsafeRow agg_fastAggBuffer_0 = null;
/* 185 */
/* 186 */ if (!agg_exprIsNull_0_0) {
/* 187 */ agg_fastAggBuffer_0 = agg_fastHashMap_0.findOrInsert(
/* 188 */ agg_expr_0_0);
/* 189 */ }
/* 190 */ // Cannot find the key in fast hash map, try regular hash map.
/* 191 */ if (agg_fastAggBuffer_0 == null) {
/* 192 */ // generate grouping key
/* 193 */ agg_mutableStateArray_0[0].reset();
/* 194 */
/* 195 */ agg_mutableStateArray_0[0].zeroOutNullBytes();
/* 196 */
/* 197 */ if (agg_exprIsNull_0_0) {
/* 198 */ agg_mutableStateArray_0[0].setNullAt(0);
/* 199 */ } else {
/* 200 */ agg_mutableStateArray_0[0].write(0, agg_expr_0_0);
/* 201 */ }
/* 202 */ int agg_unsafeRowKeyHash_0 = (agg_mutableStateArray_0[0].getRow()).hashCode();
/* 203 */ if (true) {
/* 204 */ // try to get the buffer from hash map
/* 205 */ agg_unsafeRowAggBuffer_0 =
/* 206 */ agg_hashMap_0.getAggregationBufferFromUnsafeRow((agg_mutableStateArray_0[0].getRow()), agg_unsafeRowKeyHash_0);
/* 207 */ }
/* 208 */ // Can't allocate a buffer from the hash map. Spill the map and fall back to sort-based
/* 209 */ // aggregation after processing all input rows.
/* 210 */ if (agg_unsafeRowAggBuffer_0 == null) {
/* 211 */ if (agg_sorter_0 == null) {
/* 212 */ agg_sorter_0 = agg_hashMap_0.destructAndCreateExternalSorter();
/* 213 */ } else {
/* 214 */ agg_sorter_0.merge(agg_hashMap_0.destructAndCreateExternalSorter());
/* 215 */ }
/* 216 */
/* 217 */ // the hash map has been spilled, it should have enough memory now,
/* 218 */ // try to allocate buffer again.
/* 219 */ agg_unsafeRowAggBuffer_0 = agg_hashMap_0.getAggregationBufferFromUnsafeRow(
/* 220 */ (agg_mutableStateArray_0[0].getRow()), agg_unsafeRowKeyHash_0);
/* 221 */ if (agg_unsafeRowAggBuffer_0 == null) {
/* 222 */ // failed to allocate the first page
/* 223 */ throw new org.apache.spark.memory.SparkOutOfMemoryError("No enough memory for aggregation");
/* 224 */ }
/* 225 */ }
/* 226 */
/* 227 */ }
/* 228 */
/* 229 */ // Updates the proper row buffer
/* 230 */ if (agg_fastAggBuffer_0 != null) {
/* 231 */ agg_unsafeRowAggBuffer_0 = agg_fastAggBuffer_0;
/* 232 */ }
/* 233 */
/* 234 */ // common sub-expressions
/* 235 */
/* 236 */ // evaluate aggregate functions and update aggregation buffers
/* 237 */ agg_doAggregate_avg_0(agg_exprIsNull_2_0, agg_expr_1_0, agg_exprIsNull_1_0, agg_unsafeRowAggBuffer_0, agg_expr_2_0);
/* 238 */
/* 239 */ }
/* 240 */
/* 241 */ private void agg_doAggregate_avg_0(boolean agg_exprIsNull_2_0, double agg_expr_1_0, boolean agg_exprIsNull_1_0, org.apache.spark.sql.catalyst.InternalRow agg_unsafeRowAggBuffer_0, long agg_expr_2_0) throws java.io.IOException {
/* 242 */ boolean agg_isNull_4 = true;
/* 243 */ double agg_value_6 = -1.0;
/* 244 */ boolean agg_isNull_5 = agg_unsafeRowAggBuffer_0.isNullAt(0);
/* 245 */ double agg_value_7 = agg_isNull_5 ?
/* 246 */ -1.0 : (agg_unsafeRowAggBuffer_0.getDouble(0));
/* 247 */ if (!agg_isNull_5) {
/* 248 */ if (!agg_exprIsNull_1_0) {
/* 249 */ agg_isNull_4 = false; // resultCode could change nullability.
/* 250 */
/* 251 */ agg_value_6 = agg_value_7 + agg_expr_1_0;
/* 252 */
/* 253 */ }
/* 254 */
/* 255 */ }
/* 256 */ boolean agg_isNull_7 = true;
/* 257 */ long agg_value_9 = -1L;
/* 258 */ boolean agg_isNull_8 = agg_unsafeRowAggBuffer_0.isNullAt(1);
/* 259 */ long agg_value_10 = agg_isNull_8 ?
/* 260 */ -1L : (agg_unsafeRowAggBuffer_0.getLong(1));
/* 261 */ if (!agg_isNull_8) {
/* 262 */ if (!agg_exprIsNull_2_0) {
/* 263 */ agg_isNull_7 = false; // resultCode could change nullability.
/* 264 */
/* 265 */ agg_value_9 = agg_value_10 + agg_expr_2_0;
/* 266 */
/* 267 */ }
/* 268 */
/* 269 */ }
/* 270 */
/* 271 */ if (!agg_isNull_4) {
/* 272 */ agg_unsafeRowAggBuffer_0.setDouble(0, agg_value_6);
/* 273 */ } else {
/* 274 */ agg_unsafeRowAggBuffer_0.setNullAt(0);
/* 275 */ }
/* 276 */
/* 277 */ if (!agg_isNull_7) {
/* 278 */ agg_unsafeRowAggBuffer_0.setLong(1, agg_value_9);
/* 279 */ } else {
/* 280 */ agg_unsafeRowAggBuffer_0.setNullAt(1);
/* 281 */ }
/* 282 */ }
/* 283 */
/* 284 */ private void agg_doAggregateWithKeys_0() throws java.io.IOException {
/* 285 */ while ( inputadapter_input_0.hasNext()) {
/* 286 */ InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
/* 287 */
/* 288 */ boolean inputadapter_isNull_0 = inputadapter_row_0.isNullAt(0);
/* 289 */ int inputadapter_value_0 = inputadapter_isNull_0 ?
/* 290 */ -1 : (inputadapter_row_0.getInt(0));
/* 291 */ boolean inputadapter_isNull_1 = inputadapter_row_0.isNullAt(1);
/* 292 */ double inputadapter_value_1 = inputadapter_isNull_1 ?
/* 293 */ -1.0 : (inputadapter_row_0.getDouble(1));
/* 294 */ boolean inputadapter_isNull_2 = inputadapter_row_0.isNullAt(2);
/* 295 */ long inputadapter_value_2 = inputadapter_isNull_2 ?
/* 296 */ -1L : (inputadapter_row_0.getLong(2));
/* 297 */
/* 298 */ agg_doConsume_0(inputadapter_row_0, inputadapter_value_0, inputadapter_isNull_0, inputadapter_value_1, inputadapter_isNull_1, inputadapter_value_2, inputadapter_isNull_2);
/* 299 */ // shouldStop check is eliminated
/* 300 */ }
/* 301 */
/* 302 */ agg_fastHashMapIter_0 = agg_fastHashMap_0.rowIterator();
/* 303 */ agg_mapIter_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).finishAggregate(agg_hashMap_0, agg_sorter_0, ((org.apache.spark.sql.execution.metric.SQLMetric) references[3] /* peakMemory */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* spillSize */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[5] /* avgHashProbe */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[6] /* numTasksFallBacked */));
/* 304 */
/* 305 */ }
/* 306 */
/* 307 */ protected void processNext() throws java.io.IOException {
/* 308 */ if (!agg_initAgg_0) {
/* 309 */ agg_initAgg_0 = true;
/* 310 */ agg_fastHashMap_0 = new agg_FastHashMap_0(((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getTaskContext().taskMemoryManager(), ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getEmptyAggregationBuffer());
/* 311 */
/* 312 */ ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getTaskContext().addTaskCompletionListener(
/* 313 */ new org.apache.spark.util.TaskCompletionListener() {
/* 314 */ @Override
/* 315 */ public void onTaskCompletion(org.apache.spark.TaskContext context) {
/* 316 */ agg_fastHashMap_0.close();
/* 317 */ }
/* 318 */ });
/* 319 */
/* 320 */ agg_hashMap_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).createHashMap();
/* 321 */ long wholestagecodegen_beforeAgg_0 = System.nanoTime();
/* 322 */ agg_doAggregateWithKeys_0();
/* 323 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[8] /* aggTime */).add((System.nanoTime() - wholestagecodegen_beforeAgg_0) / 1000000);
/* 324 */ }
/* 325 */ // output the result
/* 326 */
/* 327 */ while ( agg_fastHashMapIter_0.next()) {
/* 328 */ UnsafeRow agg_aggKey_0 = (UnsafeRow) agg_fastHashMapIter_0.getKey();
/* 329 */ UnsafeRow agg_aggBuffer_0 = (UnsafeRow) agg_fastHashMapIter_0.getValue();
/* 330 */ agg_doAggregateWithKeysOutput_0(agg_aggKey_0, agg_aggBuffer_0);
/* 331 */
/* 332 */ if (shouldStop()) return;
/* 333 */ }
/* 334 */ agg_fastHashMap_0.close();
/* 335 */
/* 336 */ while ( agg_mapIter_0.next()) {
/* 337 */ UnsafeRow agg_aggKey_0 = (UnsafeRow) agg_mapIter_0.getKey();
/* 338 */ UnsafeRow agg_aggBuffer_0 = (UnsafeRow) agg_mapIter_0.getValue();
/* 339 */ agg_doAggregateWithKeysOutput_0(agg_aggKey_0, agg_aggBuffer_0);
/* 340 */ if (shouldStop()) return;
/* 341 */ }
/* 342 */ agg_mapIter_0.close();
/* 343 */ if (agg_sorter_0 == null) {
/* 344 */ agg_hashMap_0.free();
/* 345 */ }
/* 346 */ }
/* 347 */
/* 348 */ }
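That closes codegen stage 2. Taken together, the two stages implement the usual two-phase average: stage 1 keeps a (sum, count) buffer per key, and stage 2 merges the shuffled partial buffers and divides, emitting NULL for groups whose count is zero or whose buffer slots are NULL. Below is a compact sketch of the update/merge/evaluate semantics encoded in agg_doAggregate_avg_0 and agg_doAggregateWithKeysOutput_0 above, with SQL NULL modeled as Option; the names are illustrative, not Spark API.

object AvgSketch {
  // Buffer layout of partial_avg(value): (sum, count) per key.
  final case class AvgBuffer(var sum: Option[Double], var count: Option[Long])

  // Stage 1 update: sum += coalesce(value, 0.0); count grows only for non-null
  // inputs. A NULL buffer slot stays NULL (map propagates None).
  def update(buf: AvgBuffer, value: Option[Int]): Unit = {
    buf.sum = buf.sum.map(_ + value.fold(0.0)(_.toDouble))
    if (value.isDefined) buf.count = buf.count.map(_ + 1L)
  }

  // Stage 2 merge: NULL-propagating addition of the shuffled partial pairs.
  def merge(buf: AvgBuffer, pSum: Option[Double], pCount: Option[Long]): Unit = {
    buf.sum   = for (s <- buf.sum; p <- pSum) yield s + p
    buf.count = for (c <- buf.count; p <- pCount) yield c + p
  }

  // Final output: NULL when count is NULL or zero (or sum is NULL), else
  // sum / count -- the divide in agg_doAggregateWithKeysOutput_0 of subtree 2.
  def evaluate(buf: AvgBuffer): Option[Double] =
    for (s <- buf.sum; c <- buf.count if c != 0L) yield s / c
}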