Skip to content

Instantly share code, notes, and snippets.

@elandau
Created October 14, 2013 20:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elandau/6982077 to your computer and use it in GitHub Desktop.
Save elandau/6982077 to your computer and use it in GitHub Desktop.
RxJava-based concurrent thread model for polling data from a synchronous source
package com.netflix.schlep.rx;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.Observable.OnSubscribeFunc;
import rx.concurrency.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Func1;
/**
* Specialized operation used to drive the starting point of a stream reader with built
* in concurrency for polling a provided function for the next batch from the stream.
* This module will scale up to the provided number of concurrent operations. On error,
* it will scale down to 1 thread until a successful read. onError will not propagate
* down to the observer.
*
* @author elandau
*
*/
public class OperationConcurrent {

    /**
     * Creates a concurrent polling operation backed by a dedicated scheduled thread pool.
     *
     * <p>Every call creates a new scheduled thread pool with {@code maxThreads} daemon threads.
     * This class never shuts that pool down.
     *
     * @param source     Function providing the input. It is called with {@code batchSize} and
     *                   returns the next batch. A batch whose size is at least
     *                   {@code batchSize} is treated as full.
     * @param delayFunc  Policy for deciding how long to wait before polling the source again.
     *                   The argument is the batch fullness ratio (0.0 for an empty batch or an
     *                   error, 1.0 for a full batch). The return value is the delay in msec;
     *                   {@code null} or 0 reschedules immediately.
     * @param batchSize  Number of elements to request from the source per call; must be >= 1
     * @param maxThreads Maximum number of concurrent pollers, which is also the thread pool
     *                   size; must be >= 1
     * @param name       Name format for the pool's threads, passed to Guava's
     *                   {@code ThreadFactoryBuilder.setNameFormat} (must contain a %d for the
     *                   thread id)
     * @return an {@link OnSubscribeFunc} that starts polling when it is subscribed
     * @throws IllegalArgumentException if {@code batchSize} or {@code maxThreads} is less than 1
     */
    public static <T> OnSubscribeFunc<List<T>> concurrent(
            Func1<Integer, List<T>> source,
            Func1<Double, Long> delayFunc,
            final int batchSize,
            final int maxThreads,
            final String name) {
        // Validate before building the pool so a bad argument does not leave an unused
        // executor behind.
        checkSizes(batchSize, maxThreads);
        return concurrent(source,
                delayFunc,
                maxThreads,
                batchSize,
                Schedulers.executor(Executors.newScheduledThreadPool(
                        maxThreads,
                        new ThreadFactoryBuilder()
                                .setNameFormat(name)
                                .setDaemon(true)
                                .build())));
    }

    /**
     * Creates a concurrent polling operation that runs its pollers on the provided scheduler.
     *
     * <p>The pool adapts its size as it reads:
     * <ul>
     *   <li>A full batch starts one more poller, up to {@code maxThreads}.</li>
     *   <li>A partial batch, or any error, retires the poller that read it, unless it is the
     *       last poller.</li>
     *   <li>The last poller is always rescheduled after the delay returned by
     *       {@code delayFunc}.</li>
     * </ul>
     * Errors are swallowed: {@code onError} is never called on the observer, and neither is
     * {@code onCompleted}.
     *
     * @param source     Function providing the input. It is called with {@code batchSize} and
     *                   returns the next batch.
     * @param delayFunc  Policy for the retry delay in msec. Its argument is the batch fullness
     *                   ratio (0.0 for empty/error, 1.0 for full). {@code null} or 0 means
     *                   reschedule immediately.
     * @param maxThreads Maximum number of concurrent pollers; must be >= 1
     * @param batchSize  Number of elements to request from the source per call; must be >= 1
     * @param scheduler  Scheduler on which every poll is executed
     * @return an {@link OnSubscribeFunc} that starts polling when it is subscribed
     * @throws IllegalArgumentException if {@code batchSize} or {@code maxThreads} is less than 1
     */
    public static <T> OnSubscribeFunc<List<T>> concurrent(
            final Func1<Integer, List<T>> source,
            final Func1<Double, Long> delayFunc,
            final int maxThreads,
            final int batchSize,
            final Scheduler scheduler) {
        checkSizes(batchSize, maxThreads);
        return new OnSubscribeFunc<List<T>>() {
            @Override
            public Subscription onSubscribe(final Observer<? super List<T>> observer) {
                // The constructor takes batchSize BEFORE maxThreads, which is the reverse of
                // this method's parameter order.
                return new ScheduledSubscription<T>(
                        source, delayFunc, observer, scheduler, batchSize, maxThreads);
            }
        };
    }

    /**
     * Rejects sizes that the polling logic cannot handle.
     *
     * <p>A {@code batchSize} below 1 makes the fullness ratio NaN or negative and makes every
     * batch count as full. At least one poller is always started, so {@code maxThreads} must
     * also be at least 1.
     */
    private static void checkSizes(int batchSize, int maxThreads) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1, was " + batchSize);
        }
        if (maxThreads < 1) {
            throw new IllegalArgumentException("maxThreads must be >= 1, was " + maxThreads);
        }
    }

    /** Source of diagnostic ids for pollers; the ids are not read by the polling logic. */
    private static final AtomicInteger idCounter = new AtomicInteger(0);

    /**
     * Subscription that owns a dynamically sized set of pollers ({@code ObserverAction}s)
     * running on {@code scheduler}.
     *
     * <p>Each poller's pending scheduled run is held in a shared {@link CompositeSubscription}.
     * Unsubscribing unsubscribes that composite.
     */
    private static class ScheduledSubscription<T> implements Subscription {
        private final Func1<Integer, List<T>> sourceFunc;
        private final Func1<Double, Long> delayFunc;
        private final Scheduler scheduler;
        private final Observer<? super List<T>> observer;
        private final int batchSize;
        private final int maxThreads;
        /** Number of active pollers; read and written only while holding {@code lock}. */
        private int counter;
        /** Holds the currently scheduled run of every active poller. */
        private final CompositeSubscription subs = new CompositeSubscription();
        private final Object lock = new Object();

        /**
         * One poller. Each run reads one batch and then does one of three things: reschedules
         * itself, starts an extra poller and reschedules itself, or retires.
         */
        class ObserverAction implements Action0 {
            /**
             * This poller's currently scheduled run.
             *
             * <p>{@code rescheduleSub} writes this field without holding {@code lock}. Every
             * path through {@code call()} acquires {@code lock} (in {@code removeSub} or
             * {@code addSub}) before reaching {@code rescheduleSub}. That ordering makes the
             * constructor's write visible to the worker thread.
             */
            private Subscription scheduledSub;
            /** Diagnostic id; not read by the polling logic. */
            private final int id;

            /**
             * Registers a new poller and schedules its first run immediately.
             *
             * <p>May be called while {@code lock} is already held (from {@code addSub}); the
             * monitor is re-entrant.
             */
            public ObserverAction() {
                id = idCounter.incrementAndGet();
                synchronized (lock) {
                    counter++;
                    // 'this' escapes to the scheduler here. The run blocks on 'lock' before it
                    // can touch scheduledSub, so it waits until this assignment is done.
                    scheduledSub = scheduler.schedule(this);
                    subs.add(scheduledSub);
                }
            }

            /**
             * Retires this poller unless it is the last one.
             *
             * @return {@code true} if the poller was removed (counter decremented and its
             *         scheduled run removed from {@code subs}); {@code false} if it is the only
             *         remaining poller and must keep running
             */
            public boolean removeSub() {
                synchronized (lock) {
                    if (counter > 1) {
                        counter--;
                        subs.remove(scheduledSub);
                        return true;
                    }
                    return false;
                }
            }

            /** Starts one additional poller if fewer than {@code maxThreads} are active. */
            public void addSub() {
                synchronized (lock) {
                    if (counter < maxThreads) {
                        new ObserverAction();
                    }
                }
            }

            /**
             * Replaces this poller's scheduled run with a new run of itself.
             *
             * <p>The delay is {@code delayFunc} applied to the fullness ratio of the last batch.
             *
             * @param count number of elements in the last batch (0 after an error)
             */
            public void rescheduleSub(int count) {
                subs.remove(scheduledSub);
                Long delay = delayFunc.call((double) count / (double) batchSize);
                if (delay == null || delay == 0) {
                    scheduledSub = scheduler.schedule(this);
                } else {
                    scheduledSub = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
                }
                subs.add(scheduledSub);
            }

            /**
             * Reads one batch, forwards it to the observer, then adapts the pool size.
             *
             * <ul>
             *   <li>A full batch adds a poller (up to {@code maxThreads}) and reschedules this
             *       one.</li>
             *   <li>A partial batch, including an empty one, is still forwarded to
             *       {@code onNext}. This poller then retires unless it is the last.</li>
             * </ul>
             *
             * <p>Any Throwable raised inside the try block is swallowed and treated like an
             * empty batch; it is never passed to {@code observer.onError}. This covers the
             * source, {@code onNext}, and the scaling/rescheduling calls. A {@code null} batch
             * reaches {@code onNext}, then fails on {@code size()} and is handled the same way.
             * The catch block's own {@code rescheduleSub(0)} is not guarded.
             */
            @Override
            public void call() {
                try {
                    // Get the next batch and send it to the observer
                    List<T> t1 = sourceFunc.call(batchSize);
                    observer.onNext(t1);
                    if (t1.size() < batchSize) {
                        // Partial batch: shrink, but always keep one poller alive
                        if (!removeSub()) {
                            rescheduleSub(t1.size());
                        }
                    } else {
                        // Full batch: grow the pool (if not at max) and keep this poller going
                        addSub();
                        rescheduleSub(t1.size());
                    }
                } catch (Throwable t) {
                    // Deliberately swallowed (see method doc): shrink towards a single poller
                    if (!removeSub()) {
                        rescheduleSub(0);
                    }
                }
            }
        }

        /** Stores the configuration and starts the first poller immediately. */
        private ScheduledSubscription(
                Func1<Integer, List<T>> sourceFunc,
                Func1<Double, Long> delayFunc,
                Observer<? super List<T>> observer,
                Scheduler scheduler,
                int batchSize,
                int maxThreads) {
            this.sourceFunc = sourceFunc;
            this.delayFunc = delayFunc;
            this.scheduler = scheduler;
            this.observer = observer;
            this.batchSize = batchSize;
            this.maxThreads = maxThreads;
            this.counter = 0;
            new ObserverAction();
        }

        /** Unsubscribes the composite that holds every poller's scheduled run. */
        @Override
        public void unsubscribe() {
            subs.unsubscribe();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment