Skip to content

Instantly share code, notes, and snippets.

Created October 14, 2013 20:57
Show Gist options
  • Save elandau/6982077 to your computer and use it in GitHub Desktop.
Save elandau/6982077 to your computer and use it in GitHub Desktop.
RxJava-based concurrent thread model for polling data from a synchronous source
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.Observable.OnSubscribeFunc;
import rx.concurrency.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Func1;
/**
 * Specialized operation used to drive the starting point of a stream reader with
 * built-in concurrency for polling a provided function for the next batch from the
 * stream. This module will scale up to the provided number of concurrent operations.
 * On error, it will scale down to 1 thread until a successful read. onError will not
 * propagate down to the observer.
 *
 * @author elandau
 */
public class OperationConcurrent {
/**
 * @param source Function providing the input
 * @param delayFunc Policy for deciding how long to delay retrying the source function
 *                  when it is empty or after an error. The parameter is the ratio
 *                  of batch fullness (0.0 none or error, 1.0 full). Return value is
 *                  the delay in milliseconds.
 * @param batchSize Number of elements to read from the function
 * @param maxThreads Maximum number of concurrent threads
 * @param name Name to give threads (must have a %d for the thread id)
 * @return the subscribe function producing {@code List<T>} batches
 */
public static <T> OnSubscribeFunc<List<T>> concurrent(
Func1<Integer, List<T>> source,
Func1<Double, Long> delayFunc,
final int batchSize,
final int maxThreads,
final String name) {
// NOTE(review): in this overload batchSize precedes maxThreads, whereas the
// Scheduler-based overload of concurrent(...) declares maxThreads before
// batchSize. Both are ints, so a transposition is not caught by the compiler.
// NOTE(review): the delegating call below is cut off in this copy of the file;
// the remaining arguments and the rest of the builder chain are missing.
return concurrent(source,
new ThreadFactoryBuilder()
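/**
 * Create a concurrent operation with a provided scheduler
 * @param source Function providing the input
 * @param delayFunc Policy for deciding how long to delay retrying the source function
 * @param maxThreads Maximum number of concurrent threads
 * @param batchSize Number of elements to read from the function
 * @param scheduler Scheduler on which the polling actions are scheduled
 * @return the subscribe function producing {@code List<T>} batches
 */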
public static <T> OnSubscribeFunc<List<T>> concurrent(
final Func1<Integer, List<T>> source,
final Func1<Double, Long> delayFunc,
final int maxThreads,
final int batchSize,
final Scheduler scheduler
) {
return new OnSubscribeFunc<List<T>>() {
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
return new ScheduledSubscription<T>(source, delayFunc, observer, scheduler, maxThreads, batchSize);
// Class-wide counter that hands out the ids assigned in ObserverAction's constructor.
private static final AtomicInteger idCounter = new AtomicInteger();
// Subscription returned from onSubscribe; owns the state shared by every
// ObserverAction it creates.
private static class ScheduledSubscription<T> implements Subscription {
// Function providing the input batches.
private final Func1<Integer, List<T>> sourceFunc;
// Policy deciding how long to wait before an action is rescheduled.
private final Func1<Double, Long> delayFunc;
// Scheduler on which every ObserverAction is scheduled.
private final Scheduler scheduler;
// Observer supplied by the subscriber.
private final Observer<? super List<T>> observer;
// Size a batch is compared against in ObserverAction.call().
private final int batchSize;
// Upper bound checked by addSub() before another ObserverAction is created.
private final int maxThreads;
// Read under `lock` by removeSub() and addSub().
// NOTE(review): no update of this field other than the reset to 0 in the
// constructor is visible in this copy; confirm where it is incremented/decremented.
private int counter;
// NOTE(review): not referenced anywhere in the visible code; presumably collects the
// scheduled subscriptions so unsubscribe() can cancel them; confirm.
private final CompositeSubscription subs = new CompositeSubscription();
// Monitor guarding `counter` and the initial scheduling of each ObserverAction.
private final Object lock = new Object();
// One polling task; it schedules itself on the enclosing subscription's scheduler
// as soon as it is constructed.
class ObserverAction implements Action0 {
// Handle returned by the most recent scheduler.schedule(...) call for this action.
private Subscription scheduledSub;
// Id drawn from the class-wide idCounter.
private final int id;
public ObserverAction() {
id = idCounter.incrementAndGet();
// The first scheduling happens while holding the shared lock.
// NOTE(review): statements may be missing from this block in this copy (for
// example an update of `counter`); confirm against the original.
synchronized (lock) {
scheduledSub = scheduler.schedule(this);
// Reports, while holding the lock, whether more than one action is currently
// counted: true when counter > 1, false otherwise.
// NOTE(review): the name and the call sites suggest a true result means "this action
// has been retired", but no decrement of `counter` is visible in this copy; confirm.
public boolean removeSub() {
synchronized(lock) {
if (counter > 1) {
return true;
else {
return false;
// Creates one more ObserverAction if `counter` is still below maxThreads. The check
// and the construction both happen while holding the lock.
public void addSub() {
synchronized (lock) {
if (counter < maxThreads) {
new ObserverAction();
public void rescheduleSub(int count) {
Long delay = / (double)batchSize);
if (delay == null || delay == 0) {
scheduledSub = scheduler.schedule(this);
else {
scheduledSub = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
// Body executed each time the scheduler runs this action.
// NOTE(review): several statements of this method are missing in this copy (the
// source read, the hand-off to the observer and the reschedule/add calls inside the
// branches); only the branch skeleton is left. Restore from the original before use.
public void call() {
try {
// Get the next batch and send to the observer
// NOTE(review): right-hand side lost; presumably a call to sourceFunc. Confirm.
List<T> t1 =;
// Can downsize
// Taken when the batch came back shorter than batchSize.
if (t1.size() < batchSize) {
if (!removeSub()) {
// Increase reader pool (if not at max) and reschedule this one
else {
// Any Throwable raised in the try block is caught here.
catch (Throwable t) {
// Remove all threads except for one
if (!removeSub()) {
// Stores the collaborators and limits, then starts the first polling action.
//
// Note the parameter order: batchSize comes BEFORE maxThreads. Both are ints, so
// callers must take care not to transpose them.
private ScheduledSubscription(
        Func1<Integer, List<T>> sourceFunc,
        Func1<Double, Long> delayFunc,
        Observer<? super List<T>> observer,
        Scheduler scheduler,
        int batchSize,
        int maxThreads) {
    this.sourceFunc = sourceFunc;
    this.delayFunc = delayFunc;
    this.scheduler = scheduler;
    // The final `observer` field must be assigned here; the garbled text left it unset.
    this.observer = observer;
    this.batchSize = batchSize;
    this.maxThreads = maxThreads;
    this.counter = 0;
    // Every field is assigned before the first action is created, because the
    // ObserverAction constructor immediately schedules work that reads them.
    new ObserverAction();
}
public void unsubscribe() {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment