Shixiong Zhu zsxwing

  • Databricks, Inc.
  • San Francisco
View gist:18ee545ac640d3009b793763a2e7ec8b
watermark = 1 hour
First batch (max event time = null):
2017-06-07 10:00:00.000
StateStore will store 2017-06-07 10:00:00.000
Second batch (max event time = 2017-06-07 10:00:00.000):
2017-06-07 11:00:00.000
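
The batch notes above can be traced with a small plain-Java sketch of the watermark arithmetic. It assumes the watermark is the maximum event time seen in earlier batches minus the 1 hour delay, and that no watermark exists while the maximum event time is still null. The class `WatermarkSketch` and its helpers `watermark` and `isLate` are hypothetical names for illustration; they are not part of Spark or of the gist.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Optional;

public class WatermarkSketch {
    // Hypothetical helper: watermark = max event time seen so far minus the allowed delay.
    // A null max event time (nothing seen yet) yields no watermark.
    static Optional<LocalDateTime> watermark(LocalDateTime maxEventTime, Duration delay) {
        return Optional.ofNullable(maxEventTime).map(t -> t.minus(delay));
    }

    // Assumption: a row counts as late only when its event time is strictly before the watermark.
    static boolean isLate(LocalDateTime eventTime, Optional<LocalDateTime> watermark) {
        return watermark.isPresent() && eventTime.isBefore(watermark.get());
    }

    public static void main(String[] args) {
        Duration delay = Duration.ofHours(1);
        LocalDateTime firstRow = LocalDateTime.parse("2017-06-07T10:00:00");
        LocalDateTime secondRow = LocalDateTime.parse("2017-06-07T11:00:00");

        // First batch: max event time is null, so there is no watermark yet.
        Optional<LocalDateTime> w1 = watermark(null, delay);
        System.out.println("batch 1 watermark=" + w1 + " late=" + isLate(firstRow, w1));

        // Second batch: max event time is the row from the first batch.
        Optional<LocalDateTime> w2 = watermark(firstRow, delay);
        System.out.println("batch 2 watermark=" + w2 + " late=" + isLate(secondRow, w2));
    }
}
```

Under these assumptions neither row is dropped: the first batch has no watermark to compare against, and the second row is newer than the watermark derived from the first.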
@zsxwing
zsxwing / StreamingApp.scala
Last active Feb 29, 2016
StreamingApp.scala
View StreamingApp.scala
package streaming.app
import java.util.UUID
import scala.util.Random
import scala.util.control.NonFatal
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.SQLContext
View doOnSubscribe.java
Observable.just(1)
.doOnSubscribe(() -> System.out.println("before 1st doOnSubscribe: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(() -> System.out.println("before 2nd doOnSubscribe: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.io())
.doOnSubscribe(() -> System.out.println("before 3rd doOnSubscribe: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.computation())
.doOnSubscribe(() -> System.out.println("before subscribe: " + Thread.currentThread().getName()))
.subscribe(new Subscriber<Integer>() {
@Override
View SkipBatches.java
final JavaStreamingContext jssc = new JavaStreamingContext(...);
final Time exitTime = new Time(12345L); // Need to set the correct exit time
jssc.addStreamingListener(new StreamingListener(){
@Override
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
if (batchCompleted.batchInfo().batchTime().greaterEq(exitTime)) {
new Thread() {
@Override
public void run() {
jssc.stop(true, true);
View StreamingApp1.java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.scheduler.*;
import scala.Tuple2;
@zsxwing
zsxwing / test2.scala
Last active Aug 29, 2015
This example works.
View test2.scala
scala> class Foo { def foo() = Array(1.0) }
defined class Foo
scala> var m: Array[Double] = null
m: Array[Double] = null
scala> {
| val t = new Foo
| m = t.foo
| }
@zsxwing
zsxwing / test.scala
Created Aug 14, 2014
Reports that Foo cannot be serialized.
View test.scala
scala> class Foo { def foo() = Array(1.0) }
defined class Foo
scala> val t = new Foo
t: Foo = $iwC$$iwC$$iwC$$iwC$Foo@5ef6a5b6
scala> val m = t.foo
m: Array[Double] = Array(1.0)
scala> val r1 = sc.parallelize(List(1, 2, 3))
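
The two previews above are truncated, so the exact failing expression is not visible. One plausible reading, offered only as an assumption, is that a closure fails to serialize when it drags a non-serializable `Foo` instance along with it, and succeeds when it holds only the resulting array. The plain-Java sketch below illustrates that general effect without Spark; `CaptureSketch`, `SerializableSupplier`, and `canSerialize` are hypothetical names, and this is an analogy rather than a reproduction of the Scala REPL behaviour.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

public class CaptureSketch {
    // Deliberately not Serializable, like Foo in the previews.
    static class Foo {
        double[] foo() { return new double[] {1.0}; }
    }

    // Hypothetical functional interface so that lambdas become serializable.
    interface SerializableSupplier extends Supplier<double[]>, Serializable {}

    // Returns true if Java serialization accepts the object graph.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The lambda captures the Foo instance itself.
    static SerializableSupplier capturingFoo() {
        Foo t = new Foo();
        return () -> t.foo();
    }

    // The lambda captures only the double[] produced by Foo.
    static SerializableSupplier capturingResult() {
        Foo t = new Foo();
        double[] m = t.foo();
        return () -> m;
    }

    public static void main(String[] args) {
        System.out.println("captures Foo:    " + canSerialize(capturingFoo()));
        System.out.println("captures result: " + canSerialize(capturingResult()));
    }
}
```

Keeping the non-serializable object in a local scope and letting the closure see only its serializable result is the design choice this sketch highlights.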
@zsxwing
zsxwing / hbase-spark.scala
Created Aug 11, 2014
hbase-spark.scala
View hbase-spark.scala
import java.io.{DataOutputStream, ByteArrayOutputStream}
import java.lang.String
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64
def convertScanToString(scan: Scan): String = {
View AndroidObservable.java
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
View RxBroadcastReceiver.java
package rx.android.observables;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.android.subscriptions.AndroidSubscriptions;
import rx.functions.Action0;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;