Skip to content

Instantly share code, notes, and snippets.

View benjchristensen's full-sized avatar

Ben Christensen benjchristensen

View GitHub Profile
@benjchristensen
benjchristensen / MulticastColdFiniteBackpressureExample.java
Created August 4, 2014 17:20
Multicasting a cold, finite Observable and using onBackpressureBuffer/Drop to handle overflow
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Subscriber;
import rx.observables.ConnectableObservable;
import rx.schedulers.Schedulers;
@benjchristensen
benjchristensen / ReactivePullHotOnBackpressureDrop.java
Last active February 22, 2017 17:23
Handling a hot Observable producing faster than the Subscriber with onBackpressureDrop
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
/**
* This demonstrates how to use onBackpressureDrop when a hot stream doesn't itself handle "reactive pull"
*
*/
public class ReactivePullHotOnBackpressureDrop {
@benjchristensen
benjchristensen / ReactivePullColdNonConformant.java
Created August 4, 2014 16:57
Handling an Observable Iterable without Reactive Pull using onBackpressureBuffer
import java.util.ArrayList;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
/**
* This demonstrates a "cold" Observable that does not use "reactive pull" and how to handle it.
*/
public class ReactivePullColdNonConformant {
@benjchristensen
benjchristensen / ReactivePullCold.java
Created August 4, 2014 16:56
ReactivePull Iterable Example
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
/**
* Example of a "cold Observable" using "reactive pull" to emit only as many items as requested by Subscriber.
@benjchristensen
benjchristensen / ThrottleExample.java
Last active August 29, 2015 14:04
ThrottleExample
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ThrottleExample {
public static void main(String args[]) {
// first item emitted in each time window
@benjchristensen
benjchristensen / SampleExample.java
Last active August 29, 2015 14:04
SampleExample
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class SampleExample {
public static void main(String args[]) {
hotStream().sample(500, TimeUnit.MILLISECONDS).toBlocking().forEach(System.out::println);
@benjchristensen
benjchristensen / WindowExample.java
Last active August 29, 2015 14:04
WindowExample
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class WindowExample {
public static void main(String args[]) {
// buffer every 500ms (using 999999999 to mark start of output)
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class BufferExample {
public static void main(String args[]) {
// buffer every 500ms
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class DebounceExample {
public static void main(String args[]) {
@benjchristensen
benjchristensen / EventBus.java
Last active February 24, 2022 03:02
EventBus.java
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
/**
* Simple pass-thru event bus with error handling and reconnect.
*/
public class EventBus {