Last active
October 22, 2019 16:27
-
-
Save fedor-malyshkin/e1b460959d91d37664dc329af3c5fc17 to your computer and use it in GitHub Desktop.
A FlowableTransformer for RxJava that implements a buffer operator triggered by either a time period or a buffer size, with backpressure support.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.Flowable; | |
import io.reactivex.FlowableTransformer; | |
import io.reactivex.disposables.Disposable; | |
import io.reactivex.schedulers.Schedulers; | |
import java.util.ArrayList; | |
import java.util.LinkedList; | |
import java.util.List; | |
import java.util.Optional; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicLong; | |
import lombok.Getter; | |
import org.reactivestreams.Publisher; | |
import org.reactivestreams.Subscriber; | |
import org.reactivestreams.Subscription; | |
/** | |
* Based on idea from: https://stackoverflow.com/questions/50040510/rxjava-buffer-with-time-that-honours-backpressure | |
*/ | |
public class BufferTransformer<T> implements FlowableTransformer<T, List<T>> { | |
private final Integer timeSpan; | |
private final Integer bufferSize; | |
private final TimeUnit timeUnit; | |
public BufferTransformer(int timespan, TimeUnit timeUnit, int bufferSize) { | |
this.timeSpan = timespan; | |
this.timeUnit = timeUnit; | |
this.bufferSize = bufferSize; | |
} | |
@Override | |
public Publisher<List<T>> apply(Flowable<T> upstream) { | |
return s -> upstream.subscribe(new BufferSubscriber<>(s)); | |
} | |
static class SubscriptionCounter implements Subscription { | |
@Getter | |
private final AtomicLong counter = new AtomicLong(0); | |
@Getter | |
private volatile boolean cancel = false; | |
@Override | |
public void request(long n) { | |
counter.addAndGet(n); | |
} | |
@Override | |
public void cancel() { | |
cancel = true; | |
} | |
} | |
class BufferSubscriber<UpstreamType> implements Subscriber<UpstreamType> { | |
private final BufferSubscriber self = this; | |
private final SubscriptionCounter subscriptionCounter = new SubscriptionCounter(); | |
private final Subscriber<? super List<UpstreamType>> downstream; | |
private volatile Subscription subscription; | |
private final AtomicLong timeNanos = new AtomicLong(); | |
private final List<UpstreamType> buffer = new LinkedList<>(); | |
private Optional<Disposable> timeoutTask = Optional.empty(); | |
BufferSubscriber(Subscriber<? super List<UpstreamType>> actualSubscriber) { | |
this.downstream = actualSubscriber; | |
} | |
@Override | |
public void onSubscribe(Subscription s) { | |
this.subscription = s; | |
resetTimeWindow(); | |
subscription.request(bufferSize); | |
this.downstream.onSubscribe(subscriptionCounter); | |
timeoutTask = Optional.of(Schedulers.computation().schedulePeriodicallyDirect(() -> { | |
synchronized (self) { | |
tryToFlushBuffer(true); | |
resetTimeWindow(); | |
} | |
}, timeSpan, timeSpan, timeUnit)); | |
} | |
@Override | |
public synchronized void onNext(UpstreamType t) { | |
buffer.add(t); | |
if (buffer.size() >= bufferSize) { | |
tryToFlushBuffer(true); | |
resetTimeWindow(); | |
} else if (System.nanoTime() - timeNanos.get() >= timeUnit.toNanos(timeSpan)) { | |
tryToFlushBuffer(true); | |
resetTimeWindow(); | |
} | |
} | |
private void tryToFlushBuffer(boolean query) { | |
if (subscriptionCounter.getCounter().get() > 0) { | |
subscriptionCounter.getCounter().decrementAndGet(); // one sent batch decrement by requested count by 1 | |
downstream.onNext(new ArrayList<>(buffer)); | |
buffer.clear(); | |
if (query) { | |
subscription.request(bufferSize); | |
} | |
} | |
} | |
@Override | |
public synchronized void onError(Throwable t) { | |
timeoutTask.ifPresent(Disposable::dispose); | |
this.downstream.onError(t); | |
} | |
@Override | |
public synchronized void onComplete() { | |
timeoutTask.ifPresent(Disposable::dispose); | |
tryToFlushBuffer(false); | |
this.downstream.onComplete(); | |
} | |
private void resetTimeWindow() { | |
timeNanos.set(System.nanoTime()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment