Skip to content

Instantly share code, notes, and snippets.

Ben Christensen benjchristensen

Block or report user

Report or block benjchristensen

Hide content and notifications from this user.

Learn more about blocking users

Contact Support about this user’s behavior.

Learn more about reporting abuse

Report abuse
View GitHub Profile
@benjchristensen
benjchristensen / MulticastColdFiniteBackpressureExample.java
Created Aug 4, 2014
Multicasting a cold, finite Observable and using onBackpressureBuffer/Drop to handle overflow
View MulticastColdFiniteBackpressureExample.java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Subscriber;
import rx.observables.ConnectableObservable;
import rx.schedulers.Schedulers;
@benjchristensen
benjchristensen / ReactivePullHotOnBackpressureDrop.java
Last active Feb 22, 2017
Handling a hot Observable producing faster than the Subscriber with onBackpressureDrop
View ReactivePullHotOnBackpressureDrop.java
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
/**
* This demonstrates how to use onBackpressureDrop when a hot stream doesn't itself handle "reactive pull"
*
*/
public class ReactivePullHotOnBackpressureDrop {
@benjchristensen
benjchristensen / ReactivePullColdNonConformant.java
Created Aug 4, 2014
Handling an Observable Iterable without Reactive Pull using onBackpressureBuffer
View ReactivePullColdNonConformant.java
import java.util.ArrayList;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
/**
* This demonstrates a "cold" Observable that does not use "reactive pull" and how to handle it.
*/
public class ReactivePullColdNonConformant {
@benjchristensen
benjchristensen / ReactivePullCold.java
Created Aug 4, 2014
ReactivePull Iterable Example
View ReactivePullCold.java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
/**
* Example of a "cold Observable" using "reactive pull" to emit only as many items as requested by Subscriber.
View ThrottleExample.java
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ThrottleExample {
public static void main(String args[]) {
// first item emitted in each time window
View SampleExample.java
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class SampleExample {
public static void main(String args[]) {
hotStream().sample(500, TimeUnit.MILLISECONDS).toBlocking().forEach(System.out::println);
View WindowExample.java
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class WindowExample {
public static void main(String args[]) {
// buffer every 500ms (using 999999999 to mark start of output)
View BufferExample.java
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class BufferExample {
public static void main(String args[]) {
// buffer every 500ms
View DebounceExample.java
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class DebounceExample {
public static void main(String args[]) {
View EventBus.java
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
/**
* Simple pass-thru event bus with error handling and reconnect.
*/
public class EventBus {
You can’t perform that action at this time.