Chendi.Xue (xuechendi): public gists
@xuechendi
xuechendi / run.sh
Created June 10, 2020 09:58
Use contiguous memory for ArrayItemIndex
gcc -std=c++11 -Wno-deprecated-declarations -I/mnt/nvme2/chendi/intel-bigdata/OAP/oap-native-sql/cpp/build/releases/include/ -L/mnt/nvme2/chendi/intel-bigdata/OAP/oap-native-sql/cpp/build/releases /mnt/nvme2/chendi/intel-bigdata/OAP/oap-native-sql/cpp/build/releases/tmp//spark-columnar-plugin-codegen-d45dbc9bec5aba67.cc -o /mnt/nvme2/chendi/intel-bigdata/OAP/oap-native-sql/cpp/build/releases/tmp//spark-columnar-plugin-codegen-d45dbc9bec5aba67.so -O3 -shared -fPIC -larrow -lspark_columnar_jni 2> /mnt/nvme2/chendi/intel-bigdata/OAP/oap-native-sql/cpp/build/releases/tmp//spark-columnar-plugin-codegen-d45dbc9bec5aba67.log
/mnt/nvme2/chendi/intel-bigdata/OAP/oap-native-sql/cpp/build/src/benchmarks/BenchmarkArrowComputeJoin --gtest_filter=BenchmarkArrowComputeJoin.JoinBenchmark
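The description names the technique: replace a nested per-batch index structure with one flat allocation so lookups stay in contiguous memory. The real ArrayItemIndex lives in oap-native-sql's C++ sources; the Scala sketch below only illustrates the idea, and the packed (array_id, row_id) layout is an assumption, not the gist's code.

final class ArrayItemIndexBuffer(capacity: Int) {
  // One contiguous Int array instead of nested per-batch vectors;
  // high 16 bits hold array_id, low 16 bits hold the row id (assumed layout).
  private val packed = new Array[Int](capacity)
  private var count = 0
  def append(arrayId: Int, rowId: Int): Unit = {
    packed(count) = (arrayId << 16) | (rowId & 0xFFFF)
    count += 1
  }
  def arrayId(i: Int): Int = packed(i) >>> 16
  def rowId(i: Int): Int = packed(i) & 0xFFFF
}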
package org.apache.spark.columnarPlugin.expression;
import java.util.Random;
public class ColumnarArithmeticOptimizer {
  // For each row i, accumulate the i-th value of every input column into result[i];
  // input is column-oriented: input[j] holds column j's values for all rows.
  public static void columnarBatchAdd(int numRows, int[][] input, int[] result) {
    for (int i = 0; i < numRows; i++) {
      for (int j = 0; j < input.length; j++) {
        result[i] += input[j][i];
      }
    }
  }
}
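A quick driver for the class above, as a hypothetical usage sketch (it assumes the Java class is compiled and on the classpath): input(j) is column j, and each result(i) accumulates row i across all columns.

object ColumnarAddDemo {
  def main(args: Array[String]): Unit = {
    val numRows = 4
    val input = Array(Array(1, 2, 3, 4), Array(10, 20, 30, 40)) // two columns of four rows
    val result = new Array[Int](numRows)
    org.apache.spark.columnarPlugin.expression.ColumnarArithmeticOptimizer
      .columnarBatchAdd(numRows, input, result)
    println(result.mkString(", ")) // prints: 11, 22, 33, 44
  }
}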
@xuechendi
xuechendi / plugin.scala
Created June 21, 2019 00:25
Example plugin for spark.session.extension to run on CPU
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
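The preview shows only the license header, so here is a minimal sketch of the shape such a session-extension plugin takes; the class and rule names are illustrative, not the gist's own.

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative no-op rule; a real CPU plugin would rewrite the plan here.
case object PassThroughRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

class ExamplePlugin extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit =
    extensions.injectOptimizerRule(_ => PassThroughRule)
}

The plugin is activated by building the session with spark.sql.extensions=ExamplePlugin.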
@xuechendi
xuechendi / filter_project.java
Created June 13, 2019 23:16
filter_project.java
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private scala.collection.Iterator scan_input;
/* 009 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 010 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
@xuechendi
xuechendi / filter.java
Created June 13, 2019 22:52
filter.java
/* 084 */ protected void processNext() throws java.io.IOException {
/* 085 */ if (scan_batch == null) {
/* 086 */ scan_nextBatch();
/* 087 */ }
/* 088 */ while (scan_batch != null) {
/* 089 */ int numRows = scan_batch.numRows();
/* 090 */ while (scan_batchIdx < numRows) {
/* 091 */ int scan_rowIdx = scan_batchIdx++;
/* 092 */ boolean scan_isNull2 = scan_colInstance2.isNullAt(scan_rowIdx);
/* 093 */ UTF8String scan_value2 = scan_isNull2 ? null : (scan_colInstance2.getUTF8String(scan_rowIdx));
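Dumps like filter_project.java and filter.java above are what Spark's whole-stage codegen emits; the /* NNN */ prefixes are added by Spark's CodeFormatter. They can be reproduced from any spark-shell session (the query below is just an example):

import org.apache.spark.sql.execution.debug._

// Prints the generated Java source for each WholeStageCodegen subtree.
val df = spark.range(1000).filter("id % 2 = 0").selectExpr("id + 1 AS x")
df.debugCodegen()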
@xuechendi
xuechendi / Others
Last active June 13, 2019 02:01
nativesql codegen
static inline int appendRowBuffer(rowBuffer* buffer, UnsafeRow* row) {
  assert(buffer->cursor + row->sizeInBytes + 8 <= buffer->size);
  // 8-byte header in front of each row: total length, then field count.
  *(int*)(buffer->buffer + buffer->cursor) = row->sizeInBytes;
  *(int*)(buffer->buffer + buffer->cursor + 4) = row->numFields;
  // Copy the row payload after the header; the `data` field name is an
  // assumption, since the gist preview cuts off before the copy.
  memcpy(buffer->buffer + buffer->cursor + 8, row->data, row->sizeInBytes);
  buffer->numRows++;
  buffer->cursor += row->sizeInBytes + 8;
  // Mark the buffer full for safety once it exceeds the threshold output capacity.
  return 0;
}
@xuechendi
xuechendi / prepare-tpcds.scala
Created June 10, 2019 15:36
Spark-Sql-Perf scripts
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
val conf = new SparkConf().setAppName("tpcds-prepare")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// Set:
// codegenStageId=1
final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
private Object[] references;
private scala.collection.Iterator[] inputs;
private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] project_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
private scala.collection.Iterator[] batchscan_mutableStateArray_0 = new scala.collection.Iterator[1];
public GeneratedIteratorForCodegenStage1(Object[] references) {
this.references = references;
}
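Returning to prepare-tpcds.scala: its preview stops right after the SQLContext setup. Below is a sketch of how a Spark-Sql-Perf preparation script typically continues, assuming the TPCDSTables API of that era; the dsdgen path, scale factor, locations, and database name are placeholders, not values from the gist.

val tables = new TPCDSTables(sqlContext,
  dsdgenDir = "/opt/tpcds-kit/tools", // placeholder dsdgen install path
  scaleFactor = "1")
tables.genData("/tpcds/sf1", format = "parquet", overwrite = true,
  partitionTables = false, clusterByPartitionColumns = false,
  filterOutNullPartitionValues = false)
tables.createExternalTables("/tpcds/sf1", "parquet", "tpcds_sf1",
  overwrite = true, discoverPartitions = false)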
@xuechendi
xuechendi / pyspark_expression.py
Created April 24, 2019 01:40
Apache_Arrow_PySpark_Verification
from pyspark.sql import SparkSession

def plus_one_func(v):
    # Applied to a Column, this builds the native expression `count + 1`.
    return v + 1

spark = SparkSession.builder.appName("pyspark_expression").getOrCreate()
df = spark.read.load("/HiBench/DataFrame/Input")
df = df.withColumn('count', plus_one_func(df["count"]))
df.write.format("parquet").save("/HiBench/DataFrame/Output")
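Worth noting: as written, plus_one_func builds a native Column expression, so no Python execution is involved. Exercising Arrow, as the gist title suggests, would mean declaring the function as a pandas_udf and enabling spark.sql.execution.arrow.enabled (renamed spark.sql.execution.arrow.pyspark.enabled in Spark 3.0).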
@xuechendi
xuechendi / spark.diff
Created March 28, 2019 06:46
Add read time latency log
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 94def4d31c..e71faba54f 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -100,7 +100,7 @@ final class ShuffleBlockFetcherIterator(
* A queue to hold our results. This turns the asynchronous model provided by
* [[org.apache.spark.network.BlockTransferService]] into a synchronous model (iterator).
*/
- private[this] val results = new LinkedBlockingQueue[FetchResult]
+ private[this] val results = new LinkedBlockingQueue[(FetchResult, Long)]
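The diff widens the queue element from FetchResult to (FetchResult, Long) so a timestamp can travel with each result. Below is a self-contained sketch of the latency-logging pattern this enables; the names and the nanoTime choice are illustrative, not the patch's own code.

import java.util.concurrent.LinkedBlockingQueue

object FetchLatencyDemo {
  final case class FetchResult(blockId: String)

  def main(args: Array[String]): Unit = {
    val results = new LinkedBlockingQueue[(FetchResult, Long)]
    // Producer side: enqueue each result together with its arrival time.
    results.put((FetchResult("shuffle_0_0_0"), System.nanoTime()))
    // Consumer side: log how long the result waited before being read.
    val (res, enqueuedAt) = results.take()
    val latencyMs = (System.nanoTime() - enqueuedAt) / 1e6
    println(f"read time latency for ${res.blockId}: $latencyMs%.3f ms")
  }
}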