Created
January 3, 2019 16:42
-
-
Save artur-jablonski/5eb2bb470868d9eeeb3c9ee247110d4a to your computer and use it in GitHub Desktop.
backpressured buffer with timeout
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* backpressured buffer with timeout, alternative implementation | |
* to get the right behaviour | |
* 1. Fully Backpressured with no unbounded consumption of streams | |
* 2. First element in a buffer will kick off a timer, if it sets off | |
* before the buffer reaches it's max size then the buffer will be released | |
* and new buffer will be started (with new timer) | |
* 3. Buffer is released if it reaches max size | |
* | |
* @return | |
*/ | |
public static <T> FlowableTransformer<T, List<T>> buffer2( | |
int n, long period, TimeUnit unit) | |
{ | |
AsyncAPI<C<T>> asyncAPI = new AsyncAPI<>(); | |
return | |
f -> | |
f.map(C::of) | |
.doOnComplete(asyncAPI::complete) | |
.mergeWith(generator(asyncAPI) | |
.switchMap( | |
c -> Flowable.timer(period, unit) | |
.map(__ -> c) | |
) | |
) | |
.compose( | |
//todo do I need the Atomics here, can I get away with regular Integer? | |
//I could, but I would need to store the context every time. | |
FlowableTransformers.partialCollect( | |
(Consumer<PartialCollectEmitter<C<T>, AbstractMap.SimpleEntry<AtomicInteger, C<T>>, List<C<T>>, List<C<T>>>>) emitter -> { | |
AbstractMap.SimpleEntry<AtomicInteger, C<T>> ctx = | |
emitter.getIndex(); | |
if (ctx == null) { | |
ctx = new AbstractMap.SimpleEntry<>( | |
new AtomicInteger(), null); | |
emitter.setIndex(ctx); | |
} | |
AtomicInteger counter = ctx.getKey(); | |
List<C<T>> buf = emitter.getAccumulator(); | |
if (buf == null) { | |
buf = new ArrayList<>(); | |
emitter.setAccumulator(buf); | |
} | |
if (emitter.demand() != 0) { | |
boolean d = emitter.isComplete(); | |
if (emitter.size() != 0) { | |
C<T> item = emitter.getItem(0); | |
if (item.get() == null) { | |
logger | |
.info( | |
"Time out signal captured {}", | |
item); | |
if (item != ctx.getValue()) { | |
logger.info( | |
"But it's the old one! Too late bro!"); | |
emitter.dropItems(1); | |
return; | |
} else if (counter.get() == 0) { | |
logger.info( | |
"Late, i've just released full buffer!"); | |
emitter.dropItems(1); | |
return; | |
} else { | |
emitter.dropItems(1); | |
emitter.next(buf); | |
emitter | |
.setAccumulator( | |
new ArrayList<>()); | |
counter.set(0); | |
return; | |
} | |
} | |
buf.add(item); | |
emitter.dropItems(1); | |
int modulo = counter | |
.getAndUpdate(v -> (v + 1) % n); | |
if (modulo == 0) { | |
ctx.setValue(C.of(null)); | |
asyncAPI | |
.kickoffTimer(ctx.getValue()); | |
} | |
if (modulo == n - 1) { | |
logger.info("buffer reached max size"); | |
emitter.next(buf); | |
emitter | |
.setAccumulator(new ArrayList<>()); | |
} | |
} else if (d) { | |
if (buf.size() != 0) | |
emitter.next(buf); | |
emitter.complete(); | |
} | |
} | |
}, Functions.emptyConsumer(), 128)) | |
.concatMap( | |
l -> Flowable.fromIterable(l) | |
.doOnNext(c -> { | |
if (c.get() == null) | |
logger.error( | |
"ALERT! ALERT! Null crocodiles ahead!"); | |
}) | |
.map(C::get) | |
.toList() | |
.toFlowable() | |
); | |
} | |
private static class AsyncAPI<T> | |
{ | |
private CompletableFuture<Void> future = new CompletableFuture<>(); | |
private AtomicReference<CompletableFuture<T>> value = | |
new AtomicReference<>(); | |
{ | |
value.set(new CompletableFuture<>()); | |
} | |
public CompletableFuture<Void> nextValue(Consumer<? super T> onValue) | |
{ | |
value.get() | |
.whenComplete( | |
(t, throwable) -> { | |
if (throwable != null) | |
future.completeExceptionally(throwable); | |
else | |
try { | |
onValue.accept(t); | |
} catch (Exception e) { | |
future.completeExceptionally(e); | |
} | |
} | |
); | |
return future; | |
} | |
public void kickoffTimer(T item) | |
{ | |
value.getAndSet(new CompletableFuture<>()).complete(item); | |
} | |
public void complete() | |
{ | |
logger.info("completed asyncApi"); | |
future.complete(null); | |
} | |
} | |
private static class C<T> | |
{ | |
T t; | |
private C(T t) {this.t = t;} | |
public static <T> C<T> of(T t) | |
{ | |
return new C<>(t); | |
} | |
public T get() { return this.t; } | |
} | |
private static <T> Flowable<T> generator(AsyncAPI<T> api) | |
{ | |
return | |
Flowables.generateAsync( | |
// create a fresh API instance for each individual Subscriber | |
() -> api, | |
// this BiFunction will be called once the operator is ready to receive the next item | |
// and will invoke it again only when that item is delivered via emitter.onNext() | |
(state, emitter) -> { | |
// issue the async API call | |
CompletableFuture<Void> f = state.nextValue( | |
// handle the value received | |
emitter::onNext | |
); | |
// This API call may not produce further items or fail | |
f.whenComplete((done, error) -> { | |
// As per the CompletableFuture API, error != null is the error outcome, | |
// done is always null due to the Void type | |
if (error != null) { | |
emitter.onError(error); | |
} else { | |
emitter.onComplete(); | |
} | |
}); | |
// In case the downstream cancels, the current API call | |
// should be cancelled as well | |
emitter.replaceCancellable(() -> f.cancel(true)); | |
// some sources may want to create a fresh state object | |
// after each invocation of this generator | |
return state; | |
}, | |
// cleanup the state object | |
state -> { } | |
); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment