Skip to content

Instantly share code, notes, and snippets.

View zsxwing's full-sized avatar
:octocat:

Shixiong Zhu zsxwing

:octocat:
  • Databricks, Inc.
  • San Francisco
View GitHub Profile
@zsxwing
zsxwing / gist:c06d42c98145d1b60efe04c7e3122bec
Created April 28, 2023 07:59
git diff rel/release-2.7.7 rel/release-2.8.1 -- hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 30551b989f1..b119bc7af67 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -50,10 +50,10 @@
<property>
<name>hadoop.http.filter.initializers</name>
<value>org.apache.hadoop.http.lib.StaticUserWebFilter</value>
- <description>A comma separated list of class names. Each class in the list
- must extend org.apache.hadoop.http.FilterInitializer. The corresponding
val version = getCurrentVersionFromConnector
val appId = getCurrentAppIdFromConnector
val txn = startTxn()
val versionInTxn = txn.txnVersion(appId)
if (version <= versionInTxn) {
// Skip the write that's done
return
}
// write files and commit
watermark = 1 hour
First batch (max event time = null):
2017-06-07 10:00:00.000
StateStore will store 2017-06-07 10:00:00.000
Second batch (max event time = 2017-06-07 10:00:00.000):
2017-06-07 11:00:00.000
@zsxwing
zsxwing / StreamingApp.scala
Last active February 29, 2016 20:51
StreamingApp.scala
package streaming.app
import java.util.UUID
import scala.util.Random
import scala.util.control.NonFatal
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.SQLContext
Observable.just(1)
.doOnSubscribe(() -> System.out.println("before 1st doOnSubscribe: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(() -> System.out.println("before 2nd doOnSubscribe: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.io())
.doOnSubscribe(() -> System.out.println("before 3rd doOnSubscribe: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.computation())
.doOnSubscribe(() -> System.out.println("before subscribe: " + Thread.currentThread().getName()))
.subscribe(new Subscriber<Integer>() {
@Override
final JavaStreamingContext jssc = new JavaStreamingContext(...);
final Time exitTime = new Time(12345L); // Need to set the correct exit time
jssc.addStreamingListener(new StreamingListener(){
@Override
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
if (batchCompleted.batchInfo().batchTime().greaterEq(exitTime)) {
new Thread() {
@Override
public void run() {
jssc.stop(true, true);
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.scheduler.*;
import scala.Tuple2;
@zsxwing
zsxwing / test2.scala
Last active August 29, 2015 14:05
This example works.
scala> class Foo { def foo() = Array(1.0) }
defined class Foo
scala> var m: Array[Double] = null
m: Array[Double] = null
scala> {
| val t = new Foo
| m = t.foo
| }
@zsxwing
zsxwing / test.scala
Created August 14, 2014 14:44
This example reports that Foo cannot be serialized.
scala> class Foo { def foo() = Array(1.0) }
defined class Foo
scala> val t = new Foo
t: Foo = $iwC$$iwC$$iwC$$iwC$Foo@5ef6a5b6
scala> val m = t.foo
m: Array[Double] = Array(1.0)
scala> val r1 = sc.parallelize(List(1, 2, 3))
@zsxwing
zsxwing / hbase-spark.scala
Created August 11, 2014 08:29
hbase-spark.scala
import java.io.{DataOutputStream, ByteArrayOutputStream}
import java.lang.String
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64
def convertScanToString(scan: Scan): String = {