Skip to content

Instantly share code, notes, and snippets.

View HeartSaVioR's full-sized avatar
🏠
Working from home

Jungtaek Lim HeartSaVioR

🏠
Working from home
View GitHub Profile
@HeartSaVioR
HeartSaVioR / gist:3e07476f8f9ddeb783428dbe9dfc5aff
Last active October 13, 2020 19:25
experimenting iceberg column types with Spark
import org.apache.iceberg.{PartitionSpec, Schema, Table, TableProperties}
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.types.Types
// Iceberg catalog backed by the Hadoop filesystem, rooted at a local warehouse path.
// NOTE(review): `spark` is presumably the spark-shell's SparkSession — confirm this
// snippet is run inside ./bin/spark-shell (path suggests Spark 3.0.1).
val hadoopCatalog = new HadoopCatalog(spark.sparkContext.hadoopConfiguration, "/Playground/iceberg-trial/warehouse-spark3.0.1")
// A struct type with two optional fields: "a" (string) and "b" (int).
// NOTE(review): the numeric arguments 161/162 are Iceberg field IDs — presumably
// chosen to be unique within the enclosing table schema; verify against the
// rest of the schema definition (not visible here).
val structSchema = Types.StructType.of(
Types.NestedField.optional(161, "a", Types.StringType.get()),
Types.NestedField.optional(162, "b", Types.IntegerType.get()))
@HeartSaVioR
HeartSaVioR / ProblematicApp.java
Created May 8, 2020 15:30
Mismatched pair of getter/setter in Encoders.bean
package net.heartsavior.spark.trial;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
@HeartSaVioR
HeartSaVioR / CustomSessionWindowExample.scala
Created July 26, 2019 13:50
[Spark Structured Streaming] The example of custom type of session window (allow logout event to close session, along with allowing inactivity)
package net.heartsavior.spark
import java.sql.Timestamp
import java.util.Calendar
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import scala.util.Random
@HeartSaVioR
HeartSaVioR / CustomSessionWindowExample.scala
Last active November 6, 2022 20:47
[Flink DataStream API] The example of custom type of session window (allow logout event to close session, along with allowing inactivity)
package net.heartsavior.flink
import java.util
import java.util.{Calendar, Collections}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
import org.apache.flink.api.scala._
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
@HeartSaVioR
HeartSaVioR / spark-kafka-consumer-pool-test-query-concurrent-access-v2.scala
Last active March 13, 2019 09:05
Performance test code on Spark Kafka Consumer Pool (concurrent access on topic partition) - v2
import java.io.{File, PrintWriter}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.functions.{from_json, struct, to_json}
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQueryListener
import spark.implicits._
@HeartSaVioR
HeartSaVioR / spark-kafka-consumer-pool-test-query-concurrent-access.scala
Last active March 1, 2019 23:16
Performance test code on Spark Kafka Consumer Pool (concurrent access on topic partition)
import java.io.{File, PrintWriter}
import org.apache.commons.logging.LogFactory
import org.apache.spark.sql.functions.{from_json, struct, to_json}
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQueryListener
@HeartSaVioR
HeartSaVioR / spark-kafka-consumer-pool-test-query.scala
Created March 1, 2019 14:37
Performance test code on Spark Kafka Consumer Pool
// Performance-test harness setup for the Spark Kafka consumer pool.
// Run this code in spark-shell, e.g.:
//   ./bin/spark-shell --master "local[3]" --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-SNAPSHOT
// These two labels are only used to tag/identify test runs (e.g. in output paths
// or logs produced by the rest of the script, not visible here).
val branch = "master" // change this when changing the version of "spark-sql-kafka"
val attempt = "1" // change this according to the attempt number of the run
// :paste
import java.io.{File, PrintWriter}
/**
 * Immutable snapshot of a single session's accumulated state.
 *
 * @param sessionStartTimestampMs epoch millis of the session's first event
 * @param sessionEndTimestampMs   epoch millis of the session's last event plus the session gap
 * @param numEvents               number of events folded into this session so far
 */
case class SessionInfo(
    sessionStartTimestampMs: Long,
    sessionEndTimestampMs: Long,
    numEvents: Int) {

  /** Duration of the session, between the first and last events + session gap. */
  def durationMs: Long = {
    val start = sessionStartTimestampMs
    val end = sessionEndTimestampMs
    end - start
  }
}
case class SessionUpdate(id: String,
sessionStartTimestampSecs: Long,
@HeartSaVioR
HeartSaVioR / EventTimeSessionWindowImplementationViaFlatMapGroupsWithState.scala
Last active November 15, 2020 09:15
Implementation of session window with event time and watermark via flatMapGroupsWithState, and SPARK-10816
/**
 * Per-session state kept while grouping streaming events into sessions.
 *
 * @param sessionStartTimestampMs timestamp (ms) of the earliest event in the session
 * @param sessionEndTimestampMs   timestamp (ms) of the latest event plus the session gap
 * @param numEvents               count of events belonging to the session
 */
case class SessionInfo(sessionStartTimestampMs: Long,
                       sessionEndTimestampMs: Long,
                       numEvents: Int) {
  /** Duration of the session, between the first and last events + session gap. */
  def durationMs: Long = sessionEndTimestampMs - sessionStartTimestampMs
}
case class SessionUpdate(id: String,
sessionStartTimestampSecs: Long,
@HeartSaVioR
HeartSaVioR / SparkDStreamTrial.scala
Created October 15, 2018 06:52
Simple Spark DStream with Kafka
package net.heartsavior.spark.trial
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe