Public gists of Jungtaek Lim (HeartSaVioR)
@HeartSaVioR
HeartSaVioR / ClassInvestigator.java
Created October 1, 2018 04:11
A tiny class to investigate why a NoSuchMethodException occurs on your classpath
package net.heartsavior;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;

public class ClassInvestigator {
  public static void main(String[] args) {
    if (args.length < 1) {
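The preview stops at the argument check. A hedged sketch of the idea in Scala (ClassInvestigatorSketch and its output format are illustrative, not the gist's actual code): resolve the class named on the command line, report which jar it was loaded from, and list its declared members, which is usually enough to explain a NoSuchMethodException.

object ClassInvestigatorSketch {
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      System.err.println("Usage: ClassInvestigatorSketch <fully.qualified.ClassName>")
      sys.exit(1)
    }
    val clazz = Class.forName(args(0))
    // The code source reveals which jar the class was actually loaded from,
    // often the real culprit behind a NoSuchMethodException.
    Option(clazz.getProtectionDomain.getCodeSource)
      .foreach(src => println(s"Loaded from: ${src.getLocation}"))
    clazz.getDeclaredConstructors.foreach(c => println(s"ctor:   $c"))
    clazz.getDeclaredMethods.foreach(m => println(s"method: $m"))
    clazz.getDeclaredFields.foreach(f => println(s"field:  $f"))
  }
}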
@HeartSaVioR
HeartSaVioR / a-bit-tricky-spark-sql.scala
Last active August 20, 2018 22:00
A somewhat tricky Spark SQL query result (tested with 2.3.0)
/////////////////////////////////////////////////////////////////////////////////////////////
// 1. select with swapped columns, then apply where
/////////////////////////////////////////////////////////////////////////////////////////////
import spark.implicits._
import org.apache.spark.sql.{DataFrame, Dataset}
case class Hello(id: Int, name1: String, name2: String)
val ds = List(Hello(1, "Alice", "Bob"), Hello(2, "Bob", "Alice")).toDS
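The preview ends at the dataset definition. A hedged continuation of the kind of query the title points at; the alias swap and the filter value are assumptions about the gist's intent.

// Swap name1/name2 via aliases, then filter. The where() resolves "name1"
// against the swapped schema, i.e. against the original name2 values.
val swapped = ds.select($"id", $"name2".as("name1"), $"name1".as("name2"))
swapped.where($"name1" === "Alice").show()
// With standard name resolution this keeps only (2, Alice, Bob): the filter
// matches what was originally the name2 column.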
@HeartSaVioR
HeartSaVioR / StreamingAnalyticsTruckingRefApp.scala
Created June 27, 2018 09:51
Structured Streaming version of HDF IoT Trucking: StreamingAnalyticsTruckingRefApp
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{from_json, struct, to_json}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener, Trigger}
import org.apache.spark.sql.types._
import org.apache.commons.logging.LogFactory
object StreamingAnalyticsTruckingRefApp {
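Only the imports and the object declaration survive in the preview. A minimal hedged sketch of the pipeline those imports suggest, reusing the imports shown above; the broker address, topic names, event schema, and the speeding rule are all assumptions.

object TruckingRefAppSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("StreamingAnalyticsTruckingRefApp").getOrCreate()
    import spark.implicits._

    val eventSchema = new StructType()
      .add("driverId", IntegerType)
      .add("speed", IntegerType)

    // Read trucking events from Kafka and parse the JSON payload.
    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "truck_events")
      .load()
      .select(from_json($"value".cast("string"), eventSchema).as("event"))
      .select("event.*")

    // Flag speeding drivers and publish the alerts back to Kafka as JSON.
    val query = events
      .filter($"speed" > 80)
      .select(to_json(struct($"driverId", $"speed")).as("value"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "alerts")
      .option("checkpointLocation", "/tmp/trucking-checkpoint")
      .outputMode(OutputMode.Append())
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()
    query.awaitTermination()
  }
}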
@HeartSaVioR
HeartSaVioR / analysis_app_query_progress.scala
Last active June 27, 2018 07:49
Analyzing streaming query progress events ingested into a Kafka topic
// assuming we paste the code into `spark-shell`
// spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1
import org.apache.spark.sql.functions.{from_json, struct, to_json}
import org.apache.spark.sql.types._
import spark.implicits._
val df = spark
  .read
  .format("kafka")
@HeartSaVioR
HeartSaVioR / SparkTrialCalculateUnsafeRowSize.scala
Last active June 5, 2018 09:30
Calculating the size of various kinds of UnsafeRow
package net.heartsavior.spark.trial
import java.nio.charset.StandardCharsets
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SizeEstimator
object SparkTrialCalculateUnsafeRowSize {
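The preview ends at the object declaration. A hedged sketch of one measurement the imports suggest (the schema and values are illustrative): build an UnsafeRow through an UnsafeProjection and compare its self-reported size against SizeEstimator's estimate.

import org.apache.spark.sql.types.{IntegerType, StringType}

object UnsafeRowSizeSketch {
  def main(args: Array[String]): Unit = {
    val schema = new StructType()
      .add("id", IntegerType)
      .add("name", StringType)
    // Fill a mutable row, then project it into the UnsafeRow binary format.
    val row = new SpecificInternalRow(schema)
    row.setInt(0, 1)
    row.update(1, UTF8String.fromString("hello"))
    val unsafeRow: UnsafeRow = UnsafeProjection.create(schema).apply(row)
    println(s"UnsafeRow.getSizeInBytes = ${unsafeRow.getSizeInBytes}")
    println(s"SizeEstimator.estimate   = ${SizeEstimator.estimate(unsafeRow)}")
  }
}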
@HeartSaVioR
HeartSaVioR / StructuredStreamingTrial3.scala
Last active April 23, 2018 02:56
Structured Streaming: selecting the row with the maximum value of a specific field from each group (requires aggregation)
// This is to answer the question regarding structured streaming on the Apache Spark mailing list
// http://apache-spark-user-list.1001560.n3.nabble.com/can-we-use-mapGroupsWithState-in-raw-sql-tp31885p31893.html
/*
Input:
id | amount | my_timestamp
-------------------------------------------
1 | 5 | 2018-04-01T01:00:00.000Z
1 | 10 | 2018-04-01T01:10:00.000Z
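The comment block is truncated at the second input row. One standard aggregation that matches the description is the max-of-struct trick; whether the gist uses exactly this form is an assumption. Shown in batch form for brevity:

import org.apache.spark.sql.functions.{max, struct}
import spark.implicits._  // assumes a spark-shell-like environment

val events = Seq(
  (1, 5,  "2018-04-01T01:00:00.000Z"),
  (1, 10, "2018-04-01T01:10:00.000Z"))
  .toDF("id", "amount", "my_timestamp")

// Structs compare field by field in declaration order, so max() on
// struct(amount, my_timestamp) keeps the whole row with the largest amount.
events.groupBy($"id")
  .agg(max(struct($"amount", $"my_timestamp")).as("m"))
  .select($"id", $"m.amount", $"m.my_timestamp")
  .show()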
@HeartSaVioR
HeartSaVioR / SparkTrial2.scala
Created April 6, 2018 13:19
Another practice on Structured Streaming: ingesting from Kafka and pushing to Kafka using continuous mode
package net.heartsavior.spark.trial
import org.apache.spark.sql.catalyst.expressions.StructsToJson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types._
object SparkTrial2 {
  def main(args: Array[String]): Unit = {
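The preview cuts off right after main's signature. A hedged standalone sketch of a continuous-mode Kafka-to-Kafka pass-through, reusing the imports above; broker and topic names are assumptions. Continuous processing in 2.3 supports only map-like operations, so the job reshapes records without aggregating.

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("SparkTrial2").getOrCreate()

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input_topic")
  .load()
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output_topic")
  .option("checkpointLocation", "/tmp/spark-trial2-checkpoint")
  .trigger(Trigger.Continuous("1 second"))  // epoch checkpoint every second
  .start()
query.awaitTermination()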
@HeartSaVioR
HeartSaVioR / SparkTrial.scala
Created April 4, 2018 08:43
Practice on Structured Streaming: ingesting from Kafka and pushing to the console
package net.heartsavior.spark.trial
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types._
object SparkTrial {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
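The preview ends just inside main. A hedged sketch of the remainder; broker, topic, and trigger interval are assumptions.

val spark = SparkSession.builder().appName("SparkTrial").getOrCreate()

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input_topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .writeStream
  .format("console")
  .option("truncate", "false")
  .outputMode(OutputMode.Append())
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
query.awaitTermination()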
@HeartSaVioR
HeartSaVioR / test_runner_tvl_2.0.0_worker_1.sh
Created February 14, 2018 10:24
TVL 2.0.0 test runner (for 1 worker)
#!/bin/bash
# Run the TVL test test_count times; the preview is truncated before the run loop.
current_test=1
test_count=3
# Split the current Storm directory into its parent path and final component.
storm_directory="$(pwd)"
part1="$(dirname "${storm_directory}")"
part2="$(basename "${storm_directory}")"
@HeartSaVioR
HeartSaVioR / test_runner_tvl_2.0.0.sh
Created February 14, 2018 10:23
TVL 2.0.0 test runner (for 4 workers)
#!/bin/bash
# Same setup as the 1-worker runner; the preview is truncated before the run loop.
current_test=1
test_count=3
# Split the current Storm directory into its parent path and final component.
storm_directory="$(pwd)"
part1="$(dirname "${storm_directory}")"
part2="$(basename "${storm_directory}")"