Nick GrigorievNick
@GrigorievNick
GrigorievNick / TestObservationWithForeach.scala
Last active January 12, 2023 17:30
Reproduction of a Spark Observation API bug
import org.apache.spark.sql.Observation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.scalatest.FunSuite
class TestObservationWithForeach extends FunSuite {
test("observation") {
implicit val spark: SparkSession = SparkSession.builder()
GrigorievNick / SparkSQLGeneratePartitionOffset.scala
Last active September 7, 2021 15:13
Spark: create a unique sequential ID per Spark partition
import org.apache.spark.sql.Column
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.LeafExpression
import org.apache.spark.sql.catalyst.expressions.Stateful
import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
GrigorievNick / IcebergMergeOnRead.scala
Last active August 16, 2021 21:29
Iceberg read from executor
import com.google.common.collect.Maps
import org.apache.iceberg.TableProperties
import org.apache.iceberg.data.IcebergGenerics
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkSchemaUtil.convert
import org.apache.spark.TaskContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
GrigorievNick / MergeDataframe.scala
Last active August 4, 2021 09:49
Merge records in two DataFrames by ID columns
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite
import java.sql.Timestamp
GrigorievNick / GracefulStopOnShutdownListener.scala
Last active April 10, 2024 14:01
Spark Structured Streaming graceful shutdown on SIGTERM. SIGTERM will not interrupt the currently running batch, but because StreamingQueryListener.onQueryProgress is delivered asynchronously, the stop can interrupt the next batch during its first few moments.
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryManager
import org.slf4j.LoggerFactory
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.TimeUnit
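
The gist preview is cut off after its imports, so the hand-off described above is not visible here. What follows is a Spark-free toy model of that idea, not the author's implementation: `ToyQuery`, `GracefulStopSketch`, `register`, `batchDone` and `stopAfterCurrentBatch` are hypothetical names, and a plain thread stands in for the streaming query. It assumes the stopper waits on a per-query `SynchronousQueue` until a batch reports completion, and only then stops the query.

```scala
import java.util.UUID
import java.util.concurrent.{ConcurrentHashMap, SynchronousQueue, TimeUnit}

// Toy stand-in for a streaming query (hypothetical, not a Spark class).
// Each loop iteration plays the role of one micro-batch.
final class ToyQuery(val id: UUID, batchMillis: Long, onBatchDone: UUID => Unit) {
  @volatile var completedBatches = 0
  @volatile var interruptedBatches = 0

  private val worker = new Thread(() => {
    var running = true
    while (running) {
      try {
        Thread.sleep(batchMillis) // "process" one batch
        completedBatches += 1
        onBatchDone(id)           // progress notification after the batch
      } catch {
        case _: InterruptedException =>
          interruptedBatches += 1 // stop() landed on a batch that had not finished
          running = false
      }
    }
  })

  def start(): Unit = worker.start()

  // Hard stop: interrupts whatever the worker is doing right now.
  def stop(): Unit = { worker.interrupt(); worker.join() }
}

object GracefulStopSketch {
  // One hand-off queue per registered query id.
  private val handoffs = new ConcurrentHashMap[UUID, SynchronousQueue[UUID]]()

  def register(id: UUID): Unit = { handoffs.put(id, new SynchronousQueue[UUID]()); () }

  // Called after every batch. offer() never blocks: it succeeds only if a
  // stopper is already waiting in poll(), otherwise the signal is dropped.
  def batchDone(id: UUID): Unit = {
    val queue = handoffs.get(id)
    if (queue != null) { queue.offer(id); () }
  }

  // What a shutdown hook could call: wait for the batch in flight to finish
  // (or for the timeout), then stop. Returns true if a batch end was observed.
  def stopAfterCurrentBatch(query: ToyQuery, timeoutMs: Long): Boolean = {
    val signalled = handoffs.get(query.id).poll(timeoutMs, TimeUnit.MILLISECONDS) != null
    query.stop()
    signalled
  }

  def main(args: Array[String]): Unit = {
    val id = UUID.randomUUID()
    register(id)
    val query = new ToyQuery(id, batchMillis = 50L, onBatchDone = batchDone)
    query.start()
    val signalled = stopAfterCurrentBatch(query, timeoutMs = 2000L)
    println(s"signalled=$signalled completed=${query.completedBatches} interrupted=${query.interruptedBatches}")
  }
}
```

In this toy the worker starts its next batch as soon as it has reported progress, so the stop always lands on a batch that has only just begun while the batch that was in flight completes untouched. That mirrors the caveat in the description: the running batch is safe, but the following one may be cut off in its first moments.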
GrigorievNick / ForceRepartitionByRangeCoalescePartitions.scala
Created July 3, 2021 11:50
Custom Spark physical plan optimisation that forces AQE post-shuffle partition coalescing for repartitionByRange with a user-specified number of partitions.
package org.apache.spark.sql.execution.adaptive
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.CoalescedPartitionSpec
import org.apache.spark.sql.internal.SQLConf