Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save fedor-malyshkin/e1b460959d91d37664dc329af3c5fc17 to your computer and use it in GitHub Desktop.
Save fedor-malyshkin/e1b460959d91d37664dc329af3c5fc17 to your computer and use it in GitHub Desktop.
FlowableTransformer creating buffer operator with time period and buffer size trigger in RxJava with backpressure supported
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
/**
* Based on idea from: https://stackoverflow.com/questions/50040510/rxjava-buffer-with-time-that-honours-backpressure
*/
public class BufferTransformer<T> implements FlowableTransformer<T, List<T>> {
private final Integer timeSpan;
private final Integer bufferSize;
private final TimeUnit timeUnit;
public BufferTransformer(int timespan, TimeUnit timeUnit, int bufferSize) {
this.timeSpan = timespan;
this.timeUnit = timeUnit;
this.bufferSize = bufferSize;
}
@Override
public Publisher<List<T>> apply(Flowable<T> upstream) {
return s -> upstream.subscribe(new BufferSubscriber<>(s));
}
static class SubscriptionCounter implements Subscription {
@Getter
private final AtomicLong counter = new AtomicLong(0);
@Getter
private volatile boolean cancel = false;
@Override
public void request(long n) {
counter.addAndGet(n);
}
@Override
public void cancel() {
cancel = true;
}
}
class BufferSubscriber<UpstreamType> implements Subscriber<UpstreamType> {
private final BufferSubscriber self = this;
private final SubscriptionCounter subscriptionCounter = new SubscriptionCounter();
private final Subscriber<? super List<UpstreamType>> downstream;
private volatile Subscription subscription;
private final AtomicLong timeNanos = new AtomicLong();
private final List<UpstreamType> buffer = new LinkedList<>();
private Optional<Disposable> timeoutTask = Optional.empty();
BufferSubscriber(Subscriber<? super List<UpstreamType>> actualSubscriber) {
this.downstream = actualSubscriber;
}
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
resetTimeWindow();
subscription.request(bufferSize);
this.downstream.onSubscribe(subscriptionCounter);
timeoutTask = Optional.of(Schedulers.computation().schedulePeriodicallyDirect(() -> {
synchronized (self) {
tryToFlushBuffer(true);
resetTimeWindow();
}
}, timeSpan, timeSpan, timeUnit));
}
@Override
public synchronized void onNext(UpstreamType t) {
buffer.add(t);
if (buffer.size() >= bufferSize) {
tryToFlushBuffer(true);
resetTimeWindow();
} else if (System.nanoTime() - timeNanos.get() >= timeUnit.toNanos(timeSpan)) {
tryToFlushBuffer(true);
resetTimeWindow();
}
}
private void tryToFlushBuffer(boolean query) {
if (subscriptionCounter.getCounter().get() > 0) {
subscriptionCounter.getCounter().decrementAndGet(); // one sent batch decrement by requested count by 1
downstream.onNext(new ArrayList<>(buffer));
buffer.clear();
if (query) {
subscription.request(bufferSize);
}
}
}
@Override
public synchronized void onError(Throwable t) {
timeoutTask.ifPresent(Disposable::dispose);
this.downstream.onError(t);
}
@Override
public synchronized void onComplete() {
timeoutTask.ifPresent(Disposable::dispose);
tryToFlushBuffer(false);
this.downstream.onComplete();
}
private void resetTimeWindow() {
timeNanos.set(System.nanoTime());
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment