Skip to content

Instantly share code, notes, and snippets.

@driventokill
Last active April 8, 2022 18:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save driventokill/c49f86fb0cc182994ef423a70e793a2d to your computer and use it in GitHub Desktop.
Save driventokill/c49f86fb0cc182994ef423a70e793a2d to your computer and use it in GitHub Desktop.
Buffer elements by time period or by count in RxJava, with backpressure support
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Buffers upstream elements into lists. A list is emitted downstream as soon as it holds
 * {@code count} elements or when {@code timespan} has elapsed since the current window was
 * opened, whichever happens first. A window that expires without any element is emitted as an
 * empty list.
 *
 * <p>Backpressure: a list is only emitted while the downstream has outstanding demand. The
 * upstream is asked for {@code count} elements initially and afterwards only for as many elements
 * as were just emitted, so at most {@code count} elements are ever held in the buffer.
 *
 * @author snoop.fy at gmail.com
 */
public class BufferTransformer<T> implements FlowableTransformer<T, List<T>> {
    private final int timespan;
    private final int count;
    private final TimeUnit timeUnit;

    /**
     * @param timespan maximum lifetime of one buffer window, in {@code timeUnit}; must be positive
     * @param timeUnit unit of {@code timespan}; must not be null
     * @param count    maximum number of elements per emitted list; must be positive
     * @throws IllegalArgumentException if {@code timespan} or {@code count} is not positive
     * @throws NullPointerException     if {@code timeUnit} is null
     */
    public BufferTransformer(int timespan, TimeUnit timeUnit, int count) {
        if (timespan <= 0) {
            throw new IllegalArgumentException("timespan must be positive but was " + timespan);
        }
        if (timeUnit == null) {
            throw new NullPointerException("timeUnit");
        }
        if (count <= 0) {
            // count is also the upstream request size, and request(0) is illegal.
            throw new IllegalArgumentException("count must be positive but was " + count);
        }
        this.timespan = timespan;
        this.timeUnit = timeUnit;
        this.count = count;
    }

    /**
     * Wraps {@code upstream} in a publisher that subscribes a fresh {@link BufferSubscriber} for
     * every downstream subscriber.
     */
    @Override
    public Publisher<List<T>> apply(Flowable<T> upstream) {
        return s -> upstream.subscribe(new BufferSubscriber<>(s));
    }

    /**
     * Sits between the upstream and one downstream subscriber. It is the upstream's
     * {@link Subscriber} and, at the same time, the {@link Subscription} handed to the downstream.
     *
     * <p>All mutable state is guarded by a private lock, and every downstream signal is delivered
     * while holding that lock, so emissions triggered by the upstream thread, the timer thread and
     * a requesting downstream thread never interleave.
     */
    class BufferSubscriber<E> implements Subscriber<E>, Subscription {
        private final Subscriber<? super List<E>> actual;
        /** Private monitor; avoids locking on the foreign {@code actual} object. */
        private final Object lock = new Object();
        /** Set once in {@link #onSubscribe}; volatile because it is also read outside the lock. */
        private volatile Subscription subscription;

        // ---- everything below is guarded by `lock` ----
        private List<E> buffer = new ArrayList<>();
        private Disposable timer;
        /** Generation counter that lets a late timer task detect that its window is already gone. */
        private long windowId;
        /** True once the current window's timer has fired and the buffer has not been emitted yet. */
        private boolean windowExpired;
        /** Outstanding downstream demand; {@code Long.MAX_VALUE} means unbounded. */
        private long demand;
        /** Upstream completed but a non-empty tail buffer is still waiting for demand. */
        private boolean upstreamDone;
        /** A terminal signal was delivered downstream, or the downstream cancelled. */
        private boolean terminated;

        BufferSubscriber(Subscriber<? super List<E>> actual) {
            this.actual = actual;
        }

        @Override
        public void onSubscribe(Subscription s) {
            synchronized (lock) {
                if (subscription != null) {
                    // A second subscription must be rejected.
                    s.cancel();
                    return;
                }
                subscription = s;
            }
            // The downstream has to see onSubscribe before any other signal.
            actual.onSubscribe(this);
            synchronized (lock) {
                if (terminated) {
                    // Downstream cancelled from inside its onSubscribe.
                    return;
                }
                openWindow();
            }
            s.request(count);
        }

        @Override
        public void onNext(E e) {
            int replenish;
            synchronized (lock) {
                if (terminated || upstreamDone) {
                    return;
                }
                // The add happens under the lock because the timer thread may swap the buffer.
                buffer.add(e);
                replenish = flushIfDue();
            }
            requestUpstream(replenish);
        }

        @Override
        public void onError(Throwable t) {
            synchronized (lock) {
                if (terminated) {
                    return;
                }
                terminated = true;
                stopTimer();
                actual.onError(t);
            }
        }

        @Override
        public void onComplete() {
            synchronized (lock) {
                if (terminated || upstreamDone) {
                    return;
                }
                upstreamDone = true;
                stopTimer();
                // A non-empty tail needs demand; otherwise it is delivered from request().
                if (buffer.isEmpty() || demand > 0) {
                    completeWithTail();
                }
            }
        }

        /** Downstream demand. Non-positive amounts are reported as an error and cancel upstream. */
        @Override
        public void request(long n) {
            if (n <= 0) {
                fail(new IllegalArgumentException("request amount must be positive but was " + n));
                return;
            }
            int replenish;
            synchronized (lock) {
                if (terminated) {
                    return;
                }
                long sum = demand + n;
                demand = sum < 0 ? Long.MAX_VALUE : sum; // cap on overflow
                if (upstreamDone) {
                    completeWithTail();
                    return;
                }
                replenish = flushIfDue();
            }
            requestUpstream(replenish);
        }

        /** Downstream cancellation: stops the timer and cancels the upstream. */
        @Override
        public void cancel() {
            synchronized (lock) {
                if (terminated) {
                    return;
                }
                terminated = true;
                stopTimer();
            }
            subscription.cancel();
        }

        /** Timer callback for the window identified by {@code id}. */
        private void onWindowExpired(long id) {
            int replenish;
            synchronized (lock) {
                if (id != windowId) {
                    // Stale task: its window was already emitted and replaced.
                    return;
                }
                windowExpired = true;
                replenish = flushIfDue();
            }
            requestUpstream(replenish);
        }

        /**
         * Emits the current buffer if it is full or its window has expired and the downstream has
         * demand. Must be called with {@code lock} held.
         *
         * @return number of elements emitted, i.e. how many the caller should request upstream
         */
        private int flushIfDue() {
            if (terminated || upstreamDone || demand == 0) {
                return 0;
            }
            if (buffer.size() < count && !windowExpired) {
                return 0;
            }
            List<E> window = buffer;
            buffer = new ArrayList<>();
            if (demand != Long.MAX_VALUE) {
                demand--;
            }
            // State is fully updated before calling out, so a re-entrant request()/cancel()
            // from inside onNext observes a consistent, freshly opened window.
            openWindow();
            actual.onNext(window);
            return terminated ? 0 : window.size();
        }

        /** Delivers the remaining elements (if any) and completes. Must hold {@code lock}. */
        private void completeWithTail() {
            terminated = true;
            if (!buffer.isEmpty()) {
                actual.onNext(buffer);
            }
            actual.onComplete();
        }

        /** Terminates the stream with {@code t} and cancels the upstream. */
        private void fail(Throwable t) {
            synchronized (lock) {
                if (terminated) {
                    return;
                }
                terminated = true;
                stopTimer();
                actual.onError(t);
            }
            subscription.cancel();
        }

        /** Starts a new window and (re)schedules its timer. Must hold {@code lock}. */
        private void openWindow() {
            stopTimer();
            windowExpired = false;
            final long id = ++windowId;
            timer = Schedulers.computation()
                    .scheduleDirect(() -> onWindowExpired(id), timespan, timeUnit);
        }

        /** Disposes the pending timer, if any. Must hold {@code lock}. */
        private void stopTimer() {
            if (timer != null) {
                timer.dispose();
                timer = null;
            }
        }

        /** Requests {@code n} more elements from upstream; called without holding the lock. */
        private void requestUpstream(int n) {
            if (n > 0) {
                subscription.request(n);
            }
        }
    }

    /**
     * Demo: buffers a 1 ms interval source into lists of at most 8 elements per 1 ms window and
     * prints every list. {@code System.in.read()} keeps the JVM alive until input is available.
     */
    public static void main(String[] args) throws IOException {
        Flowable.interval(1, TimeUnit.MILLISECONDS)
                .compose(new BufferTransformer<Long>(1, TimeUnit.MILLISECONDS, 8))
                .subscribe(System.out::println);
        System.in.read();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment