Skip to content

Instantly share code, notes, and snippets.

@artur-jablonski
Created January 3, 2019 16:42
Show Gist options
  • Save artur-jablonski/5eb2bb470868d9eeeb3c9ee247110d4a to your computer and use it in GitHub Desktop.
Save artur-jablonski/5eb2bb470868d9eeeb3c9ee247110d4a to your computer and use it in GitHub Desktop.
backpressured buffer with timeout
/**
* backpressured buffer with timeout, alternative implementation
* to get the right behaviour
* 1. Fully Backpressured with no unbounded consumption of streams
* 2. First element in a buffer will kick off a timer, if it sets off
* before the buffer reaches it's max size then the buffer will be released
* and new buffer will be started (with new timer)
* 3. Buffer is released if it reaches max size
*
* @return
*/
public static <T> FlowableTransformer<T, List<T>> buffer2(
int n, long period, TimeUnit unit)
{
AsyncAPI<C<T>> asyncAPI = new AsyncAPI<>();
return
f ->
f.map(C::of)
.doOnComplete(asyncAPI::complete)
.mergeWith(generator(asyncAPI)
.switchMap(
c -> Flowable.timer(period, unit)
.map(__ -> c)
)
)
.compose(
//todo do I need the Atomics here, can I get away with regular Integer?
//I could, but I would need to store the context every time.
FlowableTransformers.partialCollect(
(Consumer<PartialCollectEmitter<C<T>, AbstractMap.SimpleEntry<AtomicInteger, C<T>>, List<C<T>>, List<C<T>>>>) emitter -> {
AbstractMap.SimpleEntry<AtomicInteger, C<T>> ctx =
emitter.getIndex();
if (ctx == null) {
ctx = new AbstractMap.SimpleEntry<>(
new AtomicInteger(), null);
emitter.setIndex(ctx);
}
AtomicInteger counter = ctx.getKey();
List<C<T>> buf = emitter.getAccumulator();
if (buf == null) {
buf = new ArrayList<>();
emitter.setAccumulator(buf);
}
if (emitter.demand() != 0) {
boolean d = emitter.isComplete();
if (emitter.size() != 0) {
C<T> item = emitter.getItem(0);
if (item.get() == null) {
logger
.info(
"Time out signal captured {}",
item);
if (item != ctx.getValue()) {
logger.info(
"But it's the old one! Too late bro!");
emitter.dropItems(1);
return;
} else if (counter.get() == 0) {
logger.info(
"Late, i've just released full buffer!");
emitter.dropItems(1);
return;
} else {
emitter.dropItems(1);
emitter.next(buf);
emitter
.setAccumulator(
new ArrayList<>());
counter.set(0);
return;
}
}
buf.add(item);
emitter.dropItems(1);
int modulo = counter
.getAndUpdate(v -> (v + 1) % n);
if (modulo == 0) {
ctx.setValue(C.of(null));
asyncAPI
.kickoffTimer(ctx.getValue());
}
if (modulo == n - 1) {
logger.info("buffer reached max size");
emitter.next(buf);
emitter
.setAccumulator(new ArrayList<>());
}
} else if (d) {
if (buf.size() != 0)
emitter.next(buf);
emitter.complete();
}
}
}, Functions.emptyConsumer(), 128))
.concatMap(
l -> Flowable.fromIterable(l)
.doOnNext(c -> {
if (c.get() == null)
logger.error(
"ALERT! ALERT! Null crocodiles ahead!");
})
.map(C::get)
.toList()
.toFlowable()
);
}
private static class AsyncAPI<T>
{
private CompletableFuture<Void> future = new CompletableFuture<>();
private AtomicReference<CompletableFuture<T>> value =
new AtomicReference<>();
{
value.set(new CompletableFuture<>());
}
public CompletableFuture<Void> nextValue(Consumer<? super T> onValue)
{
value.get()
.whenComplete(
(t, throwable) -> {
if (throwable != null)
future.completeExceptionally(throwable);
else
try {
onValue.accept(t);
} catch (Exception e) {
future.completeExceptionally(e);
}
}
);
return future;
}
public void kickoffTimer(T item)
{
value.getAndSet(new CompletableFuture<>()).complete(item);
}
public void complete()
{
logger.info("completed asyncApi");
future.complete(null);
}
}
private static class C<T>
{
T t;
private C(T t) {this.t = t;}
public static <T> C<T> of(T t)
{
return new C<>(t);
}
public T get() { return this.t; }
}
private static <T> Flowable<T> generator(AsyncAPI<T> api)
{
return
Flowables.generateAsync(
// create a fresh API instance for each individual Subscriber
() -> api,
// this BiFunction will be called once the operator is ready to receive the next item
// and will invoke it again only when that item is delivered via emitter.onNext()
(state, emitter) -> {
// issue the async API call
CompletableFuture<Void> f = state.nextValue(
// handle the value received
emitter::onNext
);
// This API call may not produce further items or fail
f.whenComplete((done, error) -> {
// As per the CompletableFuture API, error != null is the error outcome,
// done is always null due to the Void type
if (error != null) {
emitter.onError(error);
} else {
emitter.onComplete();
}
});
// In case the downstream cancels, the current API call
// should be cancelled as well
emitter.replaceCancellable(() -> f.cancel(true));
// some sources may want to create a fresh state object
// after each invocation of this generator
return state;
},
// cleanup the state object
state -> { }
);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment