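Whole-stage codegen output for a grouped average over the Parquet table default.agg1 (ReadSchema: struct<key:int,value:int>), captured with Spark's codegen debugger. Below is a minimal sketch of the kind of session that produces this dump; the table name, column types, aggregate, and the 5 shuffle partitions (see the Exchange node in subtree 2) come from the plans themselves, while the sample data and session setup are assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._   // provides debugCodegen()
import org.apache.spark.sql.functions.avg

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "5")  // matches hashpartitioning(key#57, 5) below
  .getOrCreate()
import spark.implicits._

// Assumed sample data; only the schema (key: int, value: int) is known from the plan.
spark.range(0, 1000)
  .selectExpr("cast(id % 10 as int) as key", "cast(id as int) as value")
  .write.mode("overwrite").format("parquet").saveAsTable("agg1")

// Prints everything below, starting with "Found 2 WholeStageCodegen subtrees."
spark.table("agg1").groupBy($"key").agg(avg($"value")).debugCodegen()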
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 (maxMethodCodeSize:266; maxConstantPoolSize:309(0.47% used); numInnerClasses:2) ==
*(1) HashAggregate(keys=[key#57], functions=[partial_avg(value#58)], output=[key#57, sum#65, count#66L])
+- *(1) ColumnarToRow
   +- FileScan parquet default.agg1[key#57,value#58] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/y5/hnsw8mz93vs57ngcd30y6y9c0000gn/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,value:int>
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean agg_initAgg_0;
/* 010 */ private boolean agg_bufIsNull_0;
/* 011 */ private double agg_bufValue_0;
/* 012 */ private boolean agg_bufIsNull_1;
/* 013 */ private long agg_bufValue_1;
/* 014 */ private agg_FastHashMap_0 agg_fastHashMap_0;
/* 015 */ private org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> agg_fastHashMapIter_0;
/* 016 */ private org.apache.spark.unsafe.KVIterator agg_mapIter_0;
/* 017 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap_0;
/* 018 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter_0;
/* 019 */ private int columnartorow_batchIdx_0;
/* 020 */ private boolean agg_agg_isNull_8_0;
/* 021 */ private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] columnartorow_mutableStateArray_2 = new org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[2];
/* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] columnartorow_mutableStateArray_3 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4];
/* 023 */ private org.apache.spark.sql.vectorized.ColumnarBatch[] columnartorow_mutableStateArray_1 = new org.apache.spark.sql.vectorized.ColumnarBatch[1];
/* 024 */ private scala.collection.Iterator[] columnartorow_mutableStateArray_0 = new scala.collection.Iterator[1];
/* 025 */
/* 026 */ public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 027 */ this.references = references;
/* 028 */ }
/* 029 */
/* 030 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 031 */ partitionIndex = index;
/* 032 */ this.inputs = inputs;
/* 033 */
/* 034 */ columnartorow_mutableStateArray_0[0] = inputs[0];
/* 035 */ columnartorow_mutableStateArray_3[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 036 */ columnartorow_mutableStateArray_3[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 037 */ columnartorow_mutableStateArray_3[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 038 */ columnartorow_mutableStateArray_3[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 0);
/* 039 */
/* 040 */ }
/* 041 */
/* 042 */ public class agg_FastHashMap_0 {
/* 043 */ private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
/* 044 */ private int[] buckets;
/* 045 */ private int capacity = 1 << 16;
/* 046 */ private double loadFactor = 0.5;
/* 047 */ private int numBuckets = (int) (capacity / loadFactor);
/* 048 */ private int maxSteps = 2;
/* 049 */ private int numRows = 0;
/* 050 */ private Object emptyVBase;
/* 051 */ private long emptyVOff;
/* 052 */ private int emptyVLen;
/* 053 */ private boolean isBatchFull = false;
/* 054 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 055 */
/* 056 */ public agg_FastHashMap_0(
/* 057 */ org.apache.spark.memory.TaskMemoryManager taskMemoryManager,
/* 058 */ InternalRow emptyAggregationBuffer) {
/* 059 */ batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
/* 060 */ .allocate(((org.apache.spark.sql.types.StructType) references[1] /* keySchemaTerm */), ((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */), taskMemoryManager, capacity);
/* 061 */
/* 062 */ final UnsafeProjection valueProjection = UnsafeProjection.create(((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */));
/* 063 */ final byte[] emptyBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();
/* 064 */
/* 065 */ emptyVBase = emptyBuffer;
/* 066 */ emptyVOff = Platform.BYTE_ARRAY_OFFSET;
/* 067 */ emptyVLen = emptyBuffer.length;
/* 068 */
/* 069 */ agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
/* 070 */ 1, 0);
/* 071 */
/* 072 */ buckets = new int[numBuckets];
/* 073 */ java.util.Arrays.fill(buckets, -1);
/* 074 */ }
/* 075 */
/* 076 */ public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(int agg_key_0) {
/* 077 */ long h = hash(agg_key_0);
/* 078 */ int step = 0;
/* 079 */ int idx = (int) h & (numBuckets - 1);
/* 080 */ while (step < maxSteps) {
/* 081 */ // Use an empty slot to insert the key, or return the existing value row if the slot already holds the key
/* 082 */ if (buckets[idx] == -1) {
/* 083 */ if (numRows < capacity && !isBatchFull) {
/* 084 */ agg_rowWriter.reset();
/* 085 */ agg_rowWriter.zeroOutNullBytes();
/* 086 */ agg_rowWriter.write(0, agg_key_0);
/* 087 */ org.apache.spark.sql.catalyst.expressions.UnsafeRow agg_result
/* 088 */ = agg_rowWriter.getRow();
/* 089 */ Object kbase = agg_result.getBaseObject();
/* 090 */ long koff = agg_result.getBaseOffset();
/* 091 */ int klen = agg_result.getSizeInBytes();
/* 092 */
/* 093 */ UnsafeRow vRow
/* 094 */ = batch.appendRow(kbase, koff, klen, emptyVBase, emptyVOff, emptyVLen);
/* 095 */ if (vRow == null) {
/* 096 */ isBatchFull = true;
/* 097 */ } else {
/* 098 */ buckets[idx] = numRows++;
/* 099 */ }
/* 100 */ return vRow;
/* 101 */ } else {
/* 102 */ // No more space
/* 103 */ return null;
/* 104 */ }
/* 105 */ } else if (equals(idx, agg_key_0)) {
/* 106 */ return batch.getValueRow(buckets[idx]);
/* 107 */ }
/* 108 */ idx = (idx + 1) & (numBuckets - 1);
/* 109 */ step++;
/* 110 */ }
/* 111 */ // Didn't find it
/* 112 */ return null;
/* 113 */ }
/* 114 */
/* 115 */ private boolean equals(int idx, int agg_key_0) {
/* 116 */ UnsafeRow row = batch.getKeyRow(buckets[idx]);
/* 117 */ return (row.getInt(0) == agg_key_0);
/* 118 */ }
/* 119 */
/* 120 */ private long hash(int agg_key_0) {
/* 121 */ long agg_hash_0 = 0;
/* 122 */
/* 123 */ int agg_result_0 = agg_key_0;
/* 124 */ agg_hash_0 = (agg_hash_0 ^ (0x9e3779b9)) + agg_result_0 + (agg_hash_0 << 6) + (agg_hash_0 >>> 2);
/* 125 */
/* 126 */ return agg_hash_0;
/* 127 */ }
/* 128 */
/* 129 */ public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
/* 130 */ return batch.rowIterator();
/* 131 */ }
/* 132 */
/* 133 */ public void close() {
/* 134 */ batch.close();
/* 135 */ }
/* 136 */
/* 137 */ }
/* 138 */
/* 139 */ private void agg_doAggregateWithKeysOutput_0(UnsafeRow agg_keyTerm_0, UnsafeRow agg_bufferTerm_0)
/* 140 */ throws java.io.IOException {
/* 141 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[9] /* numOutputRows */).add(1);
/* 142 */
/* 143 */ boolean agg_isNull_19 = agg_keyTerm_0.isNullAt(0);
/* 144 */ int agg_value_21 = agg_isNull_19 ?
/* 145 */ -1 : (agg_keyTerm_0.getInt(0));
/* 146 */ boolean agg_isNull_20 = agg_bufferTerm_0.isNullAt(0);
/* 147 */ double agg_value_22 = agg_isNull_20 ?
/* 148 */ -1.0 : (agg_bufferTerm_0.getDouble(0));
/* 149 */ boolean agg_isNull_21 = agg_bufferTerm_0.isNullAt(1);
/* 150 */ long agg_value_23 = agg_isNull_21 ?
/* 151 */ -1L : (agg_bufferTerm_0.getLong(1));
/* 152 */
/* 153 */ columnartorow_mutableStateArray_3[3].reset();
/* 154 */
/* 155 */ columnartorow_mutableStateArray_3[3].zeroOutNullBytes();
/* 156 */
/* 157 */ if (agg_isNull_19) {
/* 158 */ columnartorow_mutableStateArray_3[3].setNullAt(0);
/* 159 */ } else {
/* 160 */ columnartorow_mutableStateArray_3[3].write(0, agg_value_21);
/* 161 */ }
/* 162 */
/* 163 */ if (agg_isNull_20) {
/* 164 */ columnartorow_mutableStateArray_3[3].setNullAt(1);
/* 165 */ } else {
/* 166 */ columnartorow_mutableStateArray_3[3].write(1, agg_value_22);
/* 167 */ }
/* 168 */
/* 169 */ if (agg_isNull_21) {
/* 170 */ columnartorow_mutableStateArray_3[3].setNullAt(2);
/* 171 */ } else {
/* 172 */ columnartorow_mutableStateArray_3[3].write(2, agg_value_23);
/* 173 */ }
/* 174 */ append((columnartorow_mutableStateArray_3[3].getRow()));
/* 175 */
/* 176 */ }
/* 177 */
/* 178 */ private void columnartorow_nextBatch_0() throws java.io.IOException {
/* 179 */ if (columnartorow_mutableStateArray_0[0].hasNext()) {
/* 180 */ columnartorow_mutableStateArray_1[0] = (org.apache.spark.sql.vectorized.ColumnarBatch)columnartorow_mutableStateArray_0[0].next();
/* 181 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[8] /* numInputBatches */).add(1);
/* 182 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[7] /* numOutputRows */).add(columnartorow_mutableStateArray_1[0].numRows());
/* 183 */ columnartorow_batchIdx_0 = 0;
/* 184 */ columnartorow_mutableStateArray_2[0] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(0);
/* 185 */ columnartorow_mutableStateArray_2[1] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(1);
/* 186 */
/* 187 */ }
/* 188 */ }
/* 189 */
/* 190 */ private void agg_doConsume_0(int agg_expr_0_0, boolean agg_exprIsNull_0_0, int agg_expr_1_0, boolean agg_exprIsNull_1_0) throws java.io.IOException {
/* 191 */ UnsafeRow agg_unsafeRowAggBuffer_0 = null;
/* 192 */ UnsafeRow agg_fastAggBuffer_0 = null;
/* 193 */
/* 194 */ if (!agg_exprIsNull_0_0) {
/* 195 */ agg_fastAggBuffer_0 = agg_fastHashMap_0.findOrInsert(
/* 196 */ agg_expr_0_0);
/* 197 */ }
/* 198 */ // Key not found in the fast hash map (or the key is null); try the regular hash map.
/* 199 */ if (agg_fastAggBuffer_0 == null) {
/* 200 */ // generate grouping key
/* 201 */ columnartorow_mutableStateArray_3[2].reset();
/* 202 */
/* 203 */ columnartorow_mutableStateArray_3[2].zeroOutNullBytes();
/* 204 */
/* 205 */ if (agg_exprIsNull_0_0) {
/* 206 */ columnartorow_mutableStateArray_3[2].setNullAt(0);
/* 207 */ } else {
/* 208 */ columnartorow_mutableStateArray_3[2].write(0, agg_expr_0_0);
/* 209 */ }
/* 210 */ int agg_unsafeRowKeyHash_0 = (columnartorow_mutableStateArray_3[2].getRow()).hashCode();
/* 211 */ if (true) {
/* 212 */ // try to get the buffer from hash map
/* 213 */ agg_unsafeRowAggBuffer_0 =
/* 214 */ agg_hashMap_0.getAggregationBufferFromUnsafeRow((columnartorow_mutableStateArray_3[2].getRow()), agg_unsafeRowKeyHash_0);
/* 215 */ }
/* 216 */ // Can't allocate a buffer from the hash map. Spill the map and fall back to sort-based
/* 217 */ // aggregation after processing all input rows.
/* 218 */ if (agg_unsafeRowAggBuffer_0 == null) {
/* 219 */ if (agg_sorter_0 == null) {
/* 220 */ agg_sorter_0 = agg_hashMap_0.destructAndCreateExternalSorter();
/* 221 */ } else {
/* 222 */ agg_sorter_0.merge(agg_hashMap_0.destructAndCreateExternalSorter());
/* 223 */ }
/* 224 */
/* 225 */ // the hash map has been spilled, so it should have enough memory now;
/* 226 */ // try to allocate buffer again.
/* 227 */ agg_unsafeRowAggBuffer_0 = agg_hashMap_0.getAggregationBufferFromUnsafeRow(
/* 228 */ (columnartorow_mutableStateArray_3[2].getRow()), agg_unsafeRowKeyHash_0);
/* 229 */ if (agg_unsafeRowAggBuffer_0 == null) {
/* 230 */ // failed to allocate the first page
/* 231 */ throw new org.apache.spark.memory.SparkOutOfMemoryError("No enough memory for aggregation");
/* 232 */ }
/* 233 */ }
/* 234 */
/* 235 */ }
/* 236 */
/* 237 */ // Updates the proper row buffer
/* 238 */ if (agg_fastAggBuffer_0 != null) {
/* 239 */ agg_unsafeRowAggBuffer_0 = agg_fastAggBuffer_0;
/* 240 */ }
/* 241 */
/* 242 */ // common sub-expressions
/* 243 */
/* 244 */ // evaluate aggregate functions and update aggregation buffers
/* 245 */ agg_doAggregate_avg_0(agg_unsafeRowAggBuffer_0, agg_exprIsNull_1_0, agg_expr_1_0);
/* 246 */
/* 247 */ }
/* 248 */
/* 249 */ private void agg_doAggregate_avg_0(org.apache.spark.sql.catalyst.InternalRow agg_unsafeRowAggBuffer_0, boolean agg_exprIsNull_1_0, int agg_expr_1_0) throws java.io.IOException {
/* 250 */ boolean agg_isNull_6 = true;
/* 251 */ double agg_value_8 = -1.0;
/* 252 */ boolean agg_isNull_7 = agg_unsafeRowAggBuffer_0.isNullAt(0);
/* 253 */ double agg_value_9 = agg_isNull_7 ?
/* 254 */ -1.0 : (agg_unsafeRowAggBuffer_0.getDouble(0));
/* 255 */ if (!agg_isNull_7) {
/* 256 */ agg_agg_isNull_8_0 = true;
/* 257 */ double agg_value_10 = -1.0;
/* 258 */ do {
/* 259 */ boolean agg_isNull_9 = agg_exprIsNull_1_0;
/* 260 */ double agg_value_11 = -1.0;
/* 261 */ if (!agg_exprIsNull_1_0) {
/* 262 */ agg_value_11 = (double) agg_expr_1_0;
/* 263 */ }
/* 264 */ if (!agg_isNull_9) {
/* 265 */ agg_agg_isNull_8_0 = false;
/* 266 */ agg_value_10 = agg_value_11;
/* 267 */ continue;
/* 268 */ }
/* 269 */
/* 270 */ if (!false) {
/* 271 */ agg_agg_isNull_8_0 = false;
/* 272 */ agg_value_10 = 0.0D;
/* 273 */ continue;
/* 274 */ }
/* 275 */
/* 276 */ } while (false);
/* 277 */
/* 278 */ agg_isNull_6 = false; // resultCode could change nullability.
/* 279 */
/* 280 */ agg_value_8 = agg_value_9 + agg_value_10;
/* 281 */
/* 282 */ }
/* 283 */ boolean agg_isNull_12 = false;
/* 284 */ long agg_value_14 = -1L;
/* 285 */ if (!false && agg_exprIsNull_1_0) {
/* 286 */ boolean agg_isNull_15 = agg_unsafeRowAggBuffer_0.isNullAt(1);
/* 287 */ long agg_value_17 = agg_isNull_15 ?
/* 288 */ -1L : (agg_unsafeRowAggBuffer_0.getLong(1));
/* 289 */ agg_isNull_12 = agg_isNull_15;
/* 290 */ agg_value_14 = agg_value_17;
/* 291 */ } else {
/* 292 */ boolean agg_isNull_16 = true;
/* 293 */ long agg_value_18 = -1L;
/* 294 */ boolean agg_isNull_17 = agg_unsafeRowAggBuffer_0.isNullAt(1);
/* 295 */ long agg_value_19 = agg_isNull_17 ?
/* 296 */ -1L : (agg_unsafeRowAggBuffer_0.getLong(1));
/* 297 */ if (!agg_isNull_17) {
/* 298 */ agg_isNull_16 = false; // resultCode could change nullability.
/* 299 */
/* 300 */ agg_value_18 = agg_value_19 + 1L;
/* 301 */
/* 302 */ }
/* 303 */ agg_isNull_12 = agg_isNull_16;
/* 304 */ agg_value_14 = agg_value_18;
/* 305 */ }
/* 306 */
/* 307 */ if (!agg_isNull_6) {
/* 308 */ agg_unsafeRowAggBuffer_0.setDouble(0, agg_value_8);
/* 309 */ } else {
/* 310 */ agg_unsafeRowAggBuffer_0.setNullAt(0);
/* 311 */ }
/* 312 */
/* 313 */ if (!agg_isNull_12) {
/* 314 */ agg_unsafeRowAggBuffer_0.setLong(1, agg_value_14);
/* 315 */ } else {
/* 316 */ agg_unsafeRowAggBuffer_0.setNullAt(1);
/* 317 */ }
/* 318 */ }
/* 319 */
/* 320 */ private void agg_doAggregateWithKeys_0() throws java.io.IOException {
/* 321 */ if (columnartorow_mutableStateArray_1[0] == null) {
/* 322 */ columnartorow_nextBatch_0();
/* 323 */ }
/* 324 */ while ( columnartorow_mutableStateArray_1[0] != null) {
/* 325 */ int columnartorow_numRows_0 = columnartorow_mutableStateArray_1[0].numRows();
/* 326 */ int columnartorow_localEnd_0 = columnartorow_numRows_0 - columnartorow_batchIdx_0;
/* 327 */ for (int columnartorow_localIdx_0 = 0; columnartorow_localIdx_0 < columnartorow_localEnd_0; columnartorow_localIdx_0++) {
/* 328 */ int columnartorow_rowIdx_0 = columnartorow_batchIdx_0 + columnartorow_localIdx_0;
/* 329 */ boolean columnartorow_isNull_0 = columnartorow_mutableStateArray_2[0].isNullAt(columnartorow_rowIdx_0);
/* 330 */ int columnartorow_value_0 = columnartorow_isNull_0 ? -1 : (columnartorow_mutableStateArray_2[0].getInt(columnartorow_rowIdx_0));
/* 331 */ boolean columnartorow_isNull_1 = columnartorow_mutableStateArray_2[1].isNullAt(columnartorow_rowIdx_0);
/* 332 */ int columnartorow_value_1 = columnartorow_isNull_1 ? -1 : (columnartorow_mutableStateArray_2[1].getInt(columnartorow_rowIdx_0));
/* 333 */
/* 334 */ agg_doConsume_0(columnartorow_value_0, columnartorow_isNull_0, columnartorow_value_1, columnartorow_isNull_1);
/* 335 */ // shouldStop check is eliminated
/* 336 */ }
/* 337 */ columnartorow_batchIdx_0 = columnartorow_numRows_0;
/* 338 */ columnartorow_mutableStateArray_1[0] = null;
/* 339 */ columnartorow_nextBatch_0();
/* 340 */ }
/* 341 */
/* 342 */ agg_fastHashMapIter_0 = agg_fastHashMap_0.rowIterator();
/* 343 */ agg_mapIter_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).finishAggregate(agg_hashMap_0, agg_sorter_0, ((org.apache.spark.sql.execution.metric.SQLMetric) references[3] /* peakMemory */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* spillSize */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[5] /* avgHashProbe */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[6] /* numTasksFallBacked */));
/* 344 */
/* 345 */ }
/* 346 */
/* 347 */ protected void processNext() throws java.io.IOException {
/* 348 */ if (!agg_initAgg_0) {
/* 349 */ agg_initAgg_0 = true;
/* 350 */ agg_fastHashMap_0 = new agg_FastHashMap_0(((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getTaskContext().taskMemoryManager(), ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getEmptyAggregationBuffer());
/* 351 */
/* 352 */ ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getTaskContext().addTaskCompletionListener(
/* 353 */ new org.apache.spark.util.TaskCompletionListener() {
/* 354 */ @Override
/* 355 */ public void onTaskCompletion(org.apache.spark.TaskContext context) {
/* 356 */ agg_fastHashMap_0.close();
/* 357 */ }
/* 358 */ });
/* 359 */
/* 360 */ agg_hashMap_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).createHashMap();
/* 361 */ long wholestagecodegen_beforeAgg_0 = System.nanoTime();
/* 362 */ agg_doAggregateWithKeys_0();
/* 363 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[10] /* aggTime */).add((System.nanoTime() - wholestagecodegen_beforeAgg_0) / 1000000);
/* 364 */ }
/* 365 */ // output the result
/* 366 */
/* 367 */ while ( agg_fastHashMapIter_0.next()) {
/* 368 */ UnsafeRow agg_aggKey_0 = (UnsafeRow) agg_fastHashMapIter_0.getKey();
/* 369 */ UnsafeRow agg_aggBuffer_0 = (UnsafeRow) agg_fastHashMapIter_0.getValue();
/* 370 */ agg_doAggregateWithKeysOutput_0(agg_aggKey_0, agg_aggBuffer_0);
/* 371 */
/* 372 */ if (shouldStop()) return;
/* 373 */ }
/* 374 */ agg_fastHashMap_0.close();
/* 375 */
/* 376 */ while ( agg_mapIter_0.next()) {
/* 377 */ UnsafeRow agg_aggKey_0 = (UnsafeRow) agg_mapIter_0.getKey();
/* 378 */ UnsafeRow agg_aggBuffer_0 = (UnsafeRow) agg_mapIter_0.getValue();
/* 379 */ agg_doAggregateWithKeysOutput_0(agg_aggKey_0, agg_aggBuffer_0);
/* 380 */ if (shouldStop()) return;
/* 381 */ }
/* 382 */ agg_mapIter_0.close();
/* 383 */ if (agg_sorter_0 == null) {
/* 384 */ agg_hashMap_0.free();
/* 385 */ }
/* 386 */ }
/* 387 */
/* 388 */ }
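Stage 1 keys the aggregation on two maps: the generated agg_FastHashMap_0 above (open addressing with linear probing, giving up after maxSteps = 2 probes; null keys bypass it entirely) and the general-purpose UnsafeFixedWidthAggregationMap, which can spill to a sort-based fallback. A condensed sketch of the lookup cascade in agg_doConsume_0, using illustrative stand-in types rather than Spark internals:

// Stand-ins for the generated fast map and UnsafeFixedWidthAggregationMap.
trait AggBuffer
trait FastMap {
  def findOrInsert(key: Int): Option[AggBuffer]   // bounded probing; may give up
}
trait RegularMap {
  def getBuffer(key: Int): Option[AggBuffer]      // None when memory is exhausted
  def destructAndCreateExternalSorter(): Unit     // spill contents, freeing memory
}

def doConsume(key: Int, fast: FastMap, regular: RegularMap): AggBuffer =
  fast.findOrInsert(key).getOrElse {              // 1. probe at most maxSteps slots
    regular.getBuffer(key).getOrElse {            // 2. fall through to the regular map
      regular.destructAndCreateExternalSorter()   // 3. spill to a sorter, then retry once
      regular.getBuffer(key).getOrElse(
        throw new OutOfMemoryError("No enough memory for aggregation"))
    }
  }

After all input is consumed, agg_doAggregateWithKeys_0 iterates the fast map first, then lets HashAggregateExec.finishAggregate merge the regular map with any spilled sorter output.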
== Subtree 2 / 2 (maxMethodCodeSize:240; maxConstantPoolSize:229(0.35% used); numInnerClasses:0) ==
*(2) HashAggregate(keys=[key#57], functions=[avg(value#58)], output=[key#57, avg(value)#60])
+- Exchange hashpartitioning(key#57, 5), ENSURE_REQUIREMENTS, [id=#65]
   +- *(1) HashAggregate(keys=[key#57], functions=[partial_avg(value#58)], output=[key#57, sum#65, count#66L])
      +- *(1) ColumnarToRow
         +- FileScan parquet default.agg1[key#57,value#58] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/y5/hnsw8mz93vs57ngcd30y6y9c0000gn/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,value:int>
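This final stage merges the per-partition (sum#65, count#66L) buffers arriving from the Exchange and emits sum / count per key, producing null when the count is zero (the division guard in agg_doAggregateWithKeysOutput_0 below). A hand-written sketch of the two-phase average the generated code implements; the names are illustrative:

// Partial state produced by stage 1 for each key: (sum#65, count#66L) in the plan.
final case class AvgBuffer(sum: Double, count: Long)

// Stage 1 (partial_avg): fold raw values into the buffer, skipping nulls.
def updatePartial(buf: AvgBuffer, value: Int): AvgBuffer =
  AvgBuffer(buf.sum + value.toDouble, buf.count + 1L)

// Stage 2 (avg): merge buffers routed to the same partition by the Exchange ...
def merge(a: AvgBuffer, b: AvgBuffer): AvgBuffer =
  AvgBuffer(a.sum + b.sum, a.count + b.count)

// ... then divide, yielding null (None here) when no non-null values were seen.
def evaluate(buf: AvgBuffer): Option[Double] =
  if (buf.count == 0L) None else Some(buf.sum / buf.count)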
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean agg_initAgg_0;
/* 010 */ private org.apache.spark.unsafe.KVIterator agg_mapIter_0;
/* 011 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap_0;
/* 012 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter_0;
/* 013 */ private scala.collection.Iterator inputadapter_input_0;
/* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] agg_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 015 */
/* 016 */ public GeneratedIteratorForCodegenStage2(Object[] references) {
/* 017 */ this.references = references;
/* 018 */ }
/* 019 */
/* 020 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 021 */ partitionIndex = index;
/* 022 */ this.inputs = inputs;
/* 023 */
/* 024 */ inputadapter_input_0 = inputs[0];
/* 025 */ agg_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 026 */ agg_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 027 */
/* 028 */ }
/* 029 */
/* 030 */ private void agg_doAggregateWithKeysOutput_0(UnsafeRow agg_keyTerm_0, UnsafeRow agg_bufferTerm_0)
/* 031 */ throws java.io.IOException {
/* 032 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[5] /* numOutputRows */).add(1);
/* 033 */
/* 034 */ boolean agg_isNull_8 = agg_keyTerm_0.isNullAt(0);
/* 035 */ int agg_value_8 = agg_isNull_8 ?
/* 036 */ -1 : (agg_keyTerm_0.getInt(0));
/* 037 */ boolean agg_isNull_9 = agg_bufferTerm_0.isNullAt(0);
/* 038 */ double agg_value_9 = agg_isNull_9 ?
/* 039 */ -1.0 : (agg_bufferTerm_0.getDouble(0));
/* 040 */ boolean agg_isNull_10 = agg_bufferTerm_0.isNullAt(1);
/* 041 */ long agg_value_10 = agg_isNull_10 ?
/* 042 */ -1L : (agg_bufferTerm_0.getLong(1));
/* 043 */ boolean agg_isNull_13 = agg_isNull_10;
/* 044 */ double agg_value_13 = -1.0;
/* 045 */ if (!agg_isNull_10) {
/* 046 */ agg_value_13 = (double) agg_value_10;
/* 047 */ }
/* 048 */ boolean agg_isNull_11 = false;
/* 049 */ double agg_value_11 = -1.0;
/* 050 */ if (agg_isNull_13 || agg_value_13 == 0) {
/* 051 */ agg_isNull_11 = true;
/* 052 */ } else {
/* 053 */ if (agg_isNull_9) {
/* 054 */ agg_isNull_11 = true;
/* 055 */ } else {
/* 056 */ agg_value_11 = (double)(agg_value_9 / agg_value_13);
/* 057 */ }
/* 058 */ }
/* 059 */
/* 060 */ agg_mutableStateArray_0[1].reset();
/* 061 */
/* 062 */ agg_mutableStateArray_0[1].zeroOutNullBytes();
/* 063 */
/* 064 */ if (agg_isNull_8) {
/* 065 */ agg_mutableStateArray_0[1].setNullAt(0);
/* 066 */ } else {
/* 067 */ agg_mutableStateArray_0[1].write(0, agg_value_8);
/* 068 */ }
/* 069 */
/* 070 */ if (agg_isNull_11) {
/* 071 */ agg_mutableStateArray_0[1].setNullAt(1);
/* 072 */ } else {
/* 073 */ agg_mutableStateArray_0[1].write(1, agg_value_11);
/* 074 */ }
/* 075 */ append((agg_mutableStateArray_0[1].getRow()));
/* 076 */
/* 077 */ }
/* 078 */
/* 079 */ private void agg_doConsume_0(InternalRow inputadapter_row_0, int agg_expr_0_0, boolean agg_exprIsNull_0_0, double agg_expr_1_0, boolean agg_exprIsNull_1_0, long agg_expr_2_0, boolean agg_exprIsNull_2_0) throws java.io.IOException {
/* 080 */ UnsafeRow agg_unsafeRowAggBuffer_0 = null;
/* 081 */
/* 082 */ // generate grouping key
/* 083 */ agg_mutableStateArray_0[0].reset();
/* 084 */
/* 085 */ agg_mutableStateArray_0[0].zeroOutNullBytes();
/* 086 */
/* 087 */ if (agg_exprIsNull_0_0) {
/* 088 */ agg_mutableStateArray_0[0].setNullAt(0);
/* 089 */ } else {
/* 090 */ agg_mutableStateArray_0[0].write(0, agg_expr_0_0);
/* 091 */ }
/* 092 */ int agg_unsafeRowKeyHash_0 = (agg_mutableStateArray_0[0].getRow()).hashCode();
/* 093 */ if (true) {
/* 094 */ // try to get the buffer from hash map
/* 095 */ agg_unsafeRowAggBuffer_0 =
/* 096 */ agg_hashMap_0.getAggregationBufferFromUnsafeRow((agg_mutableStateArray_0[0].getRow()), agg_unsafeRowKeyHash_0);
/* 097 */ }
/* 098 */ // Can't allocate a buffer from the hash map. Spill the map and fall back to sort-based
/* 099 */ // aggregation after processing all input rows.
/* 100 */ if (agg_unsafeRowAggBuffer_0 == null) {
/* 101 */ if (agg_sorter_0 == null) {
/* 102 */ agg_sorter_0 = agg_hashMap_0.destructAndCreateExternalSorter();
/* 103 */ } else {
/* 104 */ agg_sorter_0.merge(agg_hashMap_0.destructAndCreateExternalSorter());
/* 105 */ }
/* 106 */
/* 107 */ // the hash map has been spilled, so it should have enough memory now;
/* 108 */ // try to allocate buffer again.
/* 109 */ agg_unsafeRowAggBuffer_0 = agg_hashMap_0.getAggregationBufferFromUnsafeRow(
/* 110 */ (agg_mutableStateArray_0[0].getRow()), agg_unsafeRowKeyHash_0);
/* 111 */ if (agg_unsafeRowAggBuffer_0 == null) {
/* 112 */ // failed to allocate the first page
/* 113 */ throw new org.apache.spark.memory.SparkOutOfMemoryError("No enough memory for aggregation");
/* 114 */ }
/* 115 */ }
/* 116 */
/* 117 */ // common sub-expressions
/* 118 */
/* 119 */ // evaluate aggregate functions and update aggregation buffers
/* 120 */ agg_doAggregate_avg_0(agg_exprIsNull_2_0, agg_expr_1_0, agg_exprIsNull_1_0, agg_unsafeRowAggBuffer_0, agg_expr_2_0);
/* 121 */
/* 122 */ }
/* 123 */
/* 124 */ private void agg_doAggregate_avg_0(boolean agg_exprIsNull_2_0, double agg_expr_1_0, boolean agg_exprIsNull_1_0, org.apache.spark.sql.catalyst.InternalRow agg_unsafeRowAggBuffer_0, long agg_expr_2_0) throws java.io.IOException {
/* 125 */ boolean agg_isNull_2 = true;
/* 126 */ double agg_value_2 = -1.0;
/* 127 */ boolean agg_isNull_3 = agg_unsafeRowAggBuffer_0.isNullAt(0);
/* 128 */ double agg_value_3 = agg_isNull_3 ?
/* 129 */ -1.0 : (agg_unsafeRowAggBuffer_0.getDouble(0));
/* 130 */ if (!agg_isNull_3) {
/* 131 */ if (!agg_exprIsNull_1_0) {
/* 132 */ agg_isNull_2 = false; // resultCode could change nullability.
/* 133 */
/* 134 */ agg_value_2 = agg_value_3 + agg_expr_1_0;
/* 135 */
/* 136 */ }
/* 137 */
/* 138 */ }
/* 139 */ boolean agg_isNull_5 = true;
/* 140 */ long agg_value_5 = -1L;
/* 141 */ boolean agg_isNull_6 = agg_unsafeRowAggBuffer_0.isNullAt(1);
/* 142 */ long agg_value_6 = agg_isNull_6 ?
/* 143 */ -1L : (agg_unsafeRowAggBuffer_0.getLong(1));
/* 144 */ if (!agg_isNull_6) {
/* 145 */ if (!agg_exprIsNull_2_0) {
/* 146 */ agg_isNull_5 = false; // resultCode could change nullability.
/* 147 */
/* 148 */ agg_value_5 = agg_value_6 + agg_expr_2_0;
/* 149 */
/* 150 */ }
/* 151 */
/* 152 */ }
/* 153 */
/* 154 */ if (!agg_isNull_2) {
/* 155 */ agg_unsafeRowAggBuffer_0.setDouble(0, agg_value_2);
/* 156 */ } else {
/* 157 */ agg_unsafeRowAggBuffer_0.setNullAt(0);
/* 158 */ }
/* 159 */
/* 160 */ if (!agg_isNull_5) {
/* 161 */ agg_unsafeRowAggBuffer_0.setLong(1, agg_value_5);
/* 162 */ } else {
/* 163 */ agg_unsafeRowAggBuffer_0.setNullAt(1);
/* 164 */ }
/* 165 */ }
/* 166 */
/* 167 */ private void agg_doAggregateWithKeys_0() throws java.io.IOException {
/* 168 */ while ( inputadapter_input_0.hasNext()) {
/* 169 */ InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
/* 170 */
/* 171 */ boolean inputadapter_isNull_0 = inputadapter_row_0.isNullAt(0);
/* 172 */ int inputadapter_value_0 = inputadapter_isNull_0 ?
/* 173 */ -1 : (inputadapter_row_0.getInt(0));
/* 174 */ boolean inputadapter_isNull_1 = inputadapter_row_0.isNullAt(1);
/* 175 */ double inputadapter_value_1 = inputadapter_isNull_1 ?
/* 176 */ -1.0 : (inputadapter_row_0.getDouble(1));
/* 177 */ boolean inputadapter_isNull_2 = inputadapter_row_0.isNullAt(2);
/* 178 */ long inputadapter_value_2 = inputadapter_isNull_2 ?
/* 179 */ -1L : (inputadapter_row_0.getLong(2));
/* 180 */
/* 181 */ agg_doConsume_0(inputadapter_row_0, inputadapter_value_0, inputadapter_isNull_0, inputadapter_value_1, inputadapter_isNull_1, inputadapter_value_2, inputadapter_isNull_2);
/* 182 */ // shouldStop check is eliminated
/* 183 */ }
/* 184 */
/* 185 */ agg_mapIter_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).finishAggregate(agg_hashMap_0, agg_sorter_0, ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* peakMemory */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* spillSize */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[3] /* avgHashProbe */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* numTasksFallBacked */));
/* 186 */ }
/* 187 */
/* 188 */ protected void processNext() throws java.io.IOException {
/* 189 */ if (!agg_initAgg_0) {
/* 190 */ agg_initAgg_0 = true;
/* 191 */
/* 192 */ agg_hashMap_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).createHashMap();
/* 193 */ long wholestagecodegen_beforeAgg_0 = System.nanoTime();
/* 194 */ agg_doAggregateWithKeys_0();
/* 195 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[6] /* aggTime */).add((System.nanoTime() - wholestagecodegen_beforeAgg_0) / 1000000);
/* 196 */ }
/* 197 */ // output the result
/* 198 */
/* 199 */ while ( agg_mapIter_0.next()) {
/* 200 */ UnsafeRow agg_aggKey_0 = (UnsafeRow) agg_mapIter_0.getKey();
/* 201 */ UnsafeRow agg_aggBuffer_0 = (UnsafeRow) agg_mapIter_0.getValue();
/* 202 */ agg_doAggregateWithKeysOutput_0(agg_aggKey_0, agg_aggBuffer_0);
/* 203 */ if (shouldStop()) return;
/* 204 */ }
/* 205 */ agg_mapIter_0.close();
/* 206 */ if (agg_sorter_0 == null) {
/* 207 */ agg_hashMap_0.free();
/* 208 */ }
/* 209 */ }
/* 210 */
/* 211 */ }
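Both generated classes extend org.apache.spark.sql.execution.BufferedRowIterator: processNext() pushes result rows into an internal buffer via append(...), and shouldStop() lets the consumer drain that buffer between rows. A rough sketch of that producer contract, as a stand-in rather than Spark's actual class:

import scala.collection.mutable

// Stand-in for BufferedRowIterator; Row is a placeholder for UnsafeRow.
abstract class SketchBufferedRowIterator[Row] {
  private val buffer = mutable.Queue.empty[Row]

  protected def append(row: Row): Unit = buffer.enqueue(row)
  protected def shouldStop(): Boolean = buffer.nonEmpty   // yield rows to the consumer
  protected def processNext(): Unit                       // the generated code above

  def hasNext: Boolean = { if (buffer.isEmpty) processNext(); buffer.nonEmpty }
  def next(): Row = { if (!hasNext) throw new NoSuchElementException("empty"); buffer.dequeue() }
}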