Last active
April 8, 2022 18:27
-
-
Save driventokill/c49f86fb0cc182994ef423a70e793a2d to your computer and use it in GitHub Desktop.
Buffer elements by time period or count in RxJava with backpressure supported
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.Flowable; | |
import io.reactivex.FlowableTransformer; | |
import io.reactivex.disposables.Disposable; | |
import io.reactivex.schedulers.Schedulers; | |
import org.reactivestreams.Publisher; | |
import org.reactivestreams.Subscriber; | |
import org.reactivestreams.Subscription; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.concurrent.atomic.AtomicReference; | |
/** | |
* Buffer elements by time period or count with backpressure supported | |
* | |
* @author snoop.fy at gmail.com | |
*/ | |
public class BufferTransformer<T> implements FlowableTransformer<T, List<T>> { | |
private final Integer timespan; | |
private final Integer count; | |
private final TimeUnit timeUnit; | |
public BufferTransformer(int timespan, TimeUnit timeUnit, int count) { | |
this.timespan = timespan; | |
this.timeUnit = timeUnit; | |
this.count = count; | |
} | |
@Override | |
public Publisher<List<T>> apply(Flowable<T> upstream) { | |
return s -> upstream.subscribe(new BufferSubscriber<>(s)); | |
} | |
class BufferSubscriber<T> implements Subscriber<T> { | |
private final AtomicInteger wip = new AtomicInteger(0); | |
private final Subscriber<? super List<T>> actual; | |
private volatile Subscription subscription; | |
private final AtomicLong timeNanos = new AtomicLong(); | |
private final AtomicReference<List<T>> buffer = new AtomicReference<>(); | |
private final AtomicReference<Disposable> timeoutWork = new AtomicReference<>(); | |
BufferSubscriber(Subscriber<? super List<T>> actual) { | |
this.actual = actual; | |
} | |
@Override | |
public void onSubscribe(Subscription s) { | |
this.subscription = s; | |
resetWindow(); | |
s.request(count); | |
wip.addAndGet(count); | |
} | |
@Override | |
public void onNext(T t) { | |
if (wip.decrementAndGet() == 0) { | |
this.subscription.request(count); | |
wip.addAndGet(count); | |
} | |
buffer.get().add(t); | |
synchronized (actual) { | |
if (buffer.get().size() == count) { | |
actual.onNext(buffer.get()); | |
resetWindow(); | |
} else if (System.nanoTime() - timeNanos.get() >= timeUnit.toNanos(timespan)) { | |
actual.onNext(buffer.get()); | |
resetWindow(); | |
} | |
} | |
} | |
@Override | |
public void onError(Throwable t) { | |
synchronized (this.actual) { | |
this.actual.onError(t); | |
} | |
} | |
@Override | |
public void onComplete() { | |
synchronized (this.actual) { | |
this.actual.onComplete(); | |
} | |
} | |
private void resetWindow() { | |
buffer.set(new ArrayList<>()); | |
timeNanos.set(System.nanoTime()); | |
timeoutWork.getAndUpdate(s -> { | |
if (s != null && !s.isDisposed()) { | |
s.dispose(); | |
} | |
return Schedulers.computation().scheduleDirect(() -> { | |
synchronized (actual) { | |
actual.onNext(buffer.get()); | |
resetWindow(); | |
} | |
}, timespan, timeUnit); | |
}); | |
} | |
} | |
public static void main(String[] args) throws IOException { | |
Flowable.interval(1, TimeUnit.MILLISECONDS) | |
.compose(new BufferTransformer<Long>(1, TimeUnit.MILLISECONDS, 8)) | |
.subscribe(System.out::println); | |
System.in.read(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment