Skip to content

Instantly share code, notes, and snippets.

@vy
Last active April 16, 2019 14:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vy/627f99c7bd15a6fb0758f2d6b3b36c62 to your computer and use it in GitHub Desktop.
Save vy/627f99c7bd15a6fb0758f2d6b3b36c62 to your computer and use it in GitHub Desktop.
Reactor backpressure-aware pull-push streaming example
package com.experiment.pubsub.pubsubdriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
public enum PullPushAwareStreaming {;
private static final Logger LOGGER = LoggerFactory.getLogger(PullPushAwareStreaming.class);
private static final Random RANDOM = new Random(0);
private static final AtomicInteger PENDING_ITEM_COUNTER = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
Scheduler scheduler = Schedulers.newParallel("main", 1);
CountDownLatch latch = new CountDownLatch(1);
nonEmptyPull(scheduler)
.flatMapIterable(items -> items)
.doOnNext(ignored -> PENDING_ITEM_COUNTER.incrementAndGet())
.concatMap(item -> consume(scheduler, item))
.subscribeOn(scheduler)
.doOnTerminate(latch::countDown)
.subscribe(
item -> LOGGER.info("completed item: {}", item),
error -> LOGGER.error("failure", error),
() -> LOGGER.info("completed"));
LOGGER.info("waiting");
latch.await();
scheduler.dispose();
}
private static Flux<List<Integer>> nonEmptyPull(Scheduler scheduler) {
return Flux
.<Mono<List<Integer>>>create(sink -> sink.onRequest(size -> {
Mono<List<Integer>> output = pull(size)
.filter(items -> !items.isEmpty())
.switchIfEmpty(Mono
.just(Collections.<Integer>emptyList())
.delayElement(Duration.ofSeconds(1), scheduler)
.doOnSubscribe(ignored -> LOGGER.info("delaying")));
sink.next(output);
}))
.concatMap(monos -> monos, 1)
.filter(items -> !items.isEmpty())
.doOnNext(items -> LOGGER.info("got items: {}", items))
.take(40);
}
private static Mono<List<Integer>> pull(long size) {
boolean empty = RANDOM.nextBoolean();
if (empty) {
LOGGER.info("returning empty");
return Mono.empty();
}
long effectiveSize = Math.min(size, 10);
List<Integer> items = LongStream
.range(0, effectiveSize)
.mapToObj(ignored -> RANDOM.nextInt(100))
.collect(Collectors.toList());
LOGGER.info("size: {}, effective size: {}, items: {}", size, effectiveSize, items);
return Mono.just(items);
}
private static Mono<Void> consume(Scheduler scheduler, int item) {
return Mono
.delay(Duration.ofMillis(2_000), scheduler)
.then()
.doOnSubscribe(ignored -> LOGGER.info("consuming item: {}", item))
.doOnTerminate(() -> {
int pendingItemCount = PENDING_ITEM_COUNTER.decrementAndGet();
LOGGER.warn("consumed item: {}, pendingItemCount: {}", item, pendingItemCount);
});
}
}
16:32:44.107 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
16:32:44.239 [main] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - waiting
16:32:44.260 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:44.294 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:44.296 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:32:45.309 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [47]
16:32:45.314 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:32:46.315 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:46.317 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [47]
16:32:46.323 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 47
16:32:46.323 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [91]
16:32:46.324 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:32:47.324 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [19]
16:32:47.325 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [91]
16:32:47.325 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:47.326 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [19]
16:32:47.326 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [77]
16:32:47.326 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:32:48.324 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 47, pendingItemCount: 2
16:32:48.324 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 91
16:32:48.327 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [62]
16:32:48.327 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [77]
16:32:48.327 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:48.328 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [62]
16:32:48.328 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:48.328 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:32:49.329 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:49.329 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:32:50.325 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 91, pendingItemCount: 3
16:32:50.325 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 19
16:32:50.330 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [41]
16:32:50.330 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:32:51.331 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [43]
16:32:51.331 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [41]
16:32:51.332 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [24]
16:32:51.332 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [43]
16:32:51.333 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [52]
16:32:51.333 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [24]
16:32:51.333 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:51.333 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [52]
16:32:51.334 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [82]
16:32:51.334 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:32:52.326 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 19, pendingItemCount: 6
16:32:52.326 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 77
16:32:52.335 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:52.335 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [82]
16:32:52.335 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [45]
16:32:52.336 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:32:53.336 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:53.336 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [45]
16:32:53.336 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [87]
16:32:53.337 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:32:54.327 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 77, pendingItemCount: 7
16:32:54.327 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 62
16:32:54.337 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:54.337 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [87]
16:32:54.338 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [25]
16:32:54.338 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:32:55.339 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [38]
16:32:55.339 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [25]
16:32:55.339 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:55.340 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [38]
16:32:55.340 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:55.340 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:32:56.327 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 62, pendingItemCount: 9
16:32:56.328 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 41
16:32:56.340 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:56.341 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:32:57.341 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [30]
16:32:57.342 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:32:58.328 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 41, pendingItemCount: 8
16:32:58.329 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 43
16:32:58.342 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:58.343 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [30]
16:32:58.343 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:58.343 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:32:59.344 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:32:59.344 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:00.329 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 43, pendingItemCount: 8
16:33:00.330 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 24
16:33:00.345 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:00.345 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:01.346 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:01.347 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:02.330 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 24, pendingItemCount: 7
16:33:02.331 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 52
16:33:02.347 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [19]
16:33:02.348 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:03.348 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [16]
16:33:03.348 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [19]
16:33:03.348 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:03.349 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [16]
16:33:03.349 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:03.349 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:04.331 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 52, pendingItemCount: 8
16:33:04.332 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 82
16:33:04.349 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:04.350 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:05.350 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [37]
16:33:05.351 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:06.332 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 82, pendingItemCount: 7
16:33:06.332 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 45
16:33:06.351 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [47]
16:33:06.352 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [37]
16:33:06.352 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:06.352 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [47]
16:33:06.352 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [5]
16:33:06.353 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:07.353 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:07.354 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [5]
16:33:07.354 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:07.354 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:08.333 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 45, pendingItemCount: 9
16:33:08.333 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 87
16:33:08.355 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [51]
16:33:08.355 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:09.356 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [58]
16:33:09.356 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [51]
16:33:09.356 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [38]
16:33:09.356 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [58]
16:33:09.356 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:09.357 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [38]
16:33:09.357 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [51]
16:33:09.357 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:10.334 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 87, pendingItemCount: 11
16:33:10.334 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 25
16:33:10.358 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [80]
16:33:10.358 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [51]
16:33:10.359 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [62]
16:33:10.359 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [80]
16:33:10.359 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:10.360 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [62]
16:33:10.360 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:10.360 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:11.361 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:11.361 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:12.335 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 25, pendingItemCount: 13
16:33:12.335 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 38
16:33:12.361 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:12.362 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:13.362 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:13.363 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:14.336 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 38, pendingItemCount: 12
16:33:14.336 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 30
16:33:14.363 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:14.363 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:15.364 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:15.364 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:16.337 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 30, pendingItemCount: 11
16:33:16.337 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 19
16:33:16.365 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [89]
16:33:16.365 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:17.365 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [1]
16:33:17.366 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [89]
16:33:17.366 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [23]
16:33:17.366 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [1]
16:33:17.367 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [68]
16:33:17.367 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [23]
16:33:17.367 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [16]
16:33:17.368 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [68]
16:33:17.368 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [57]
16:33:17.368 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [16]
16:33:17.369 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:17.369 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [57]
16:33:17.369 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:17.370 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:18.337 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 19, pendingItemCount: 16
16:33:18.337 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 16
16:33:18.370 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:18.371 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:19.371 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [65]
16:33:19.372 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:20.338 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 16, pendingItemCount: 15
16:33:20.338 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 37
16:33:20.373 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:20.373 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [65]
16:33:20.373 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [67]
16:33:20.374 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:21.374 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:21.375 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [67]
16:33:21.375 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [97]
16:33:21.375 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:22.339 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 37, pendingItemCount: 14
16:33:22.339 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 47
16:33:22.376 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [63]
16:33:22.376 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [97]
16:33:22.376 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:22.377 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [63]
16:33:22.377 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [98]
16:33:22.377 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:23.378 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:23.378 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [98]
16:33:23.378 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - returning empty
16:33:23.378 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:24.340 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 47, pendingItemCount: 13
16:33:24.340 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 5
16:33:24.379 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [86]
16:33:24.379 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - delaying
16:33:25.380 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [90]
16:33:25.380 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [86]
16:33:25.381 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [71]
16:33:25.381 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [90]
16:33:25.381 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - size: 1, effective size: 1, items: [30]
16:33:25.381 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - got items: [71]
16:33:26.341 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 5, pendingItemCount: 12
16:33:26.341 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 51
16:33:28.341 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 51, pendingItemCount: 11
16:33:28.342 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 58
16:33:30.342 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 58, pendingItemCount: 10
16:33:30.343 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 38
16:33:32.343 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 38, pendingItemCount: 9
16:33:32.344 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 51
16:33:34.344 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 51, pendingItemCount: 16
16:33:34.345 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 80
16:33:36.345 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 80, pendingItemCount: 15
16:33:36.345 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 62
16:33:38.345 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 62, pendingItemCount: 14
16:33:38.346 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 89
16:33:40.346 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 89, pendingItemCount: 13
16:33:40.346 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 1
16:33:42.347 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 1, pendingItemCount: 12
16:33:42.347 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 23
16:33:44.348 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 23, pendingItemCount: 11
16:33:44.348 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 68
16:33:46.348 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 68, pendingItemCount: 10
16:33:46.349 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 16
16:33:48.349 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 16, pendingItemCount: 9
16:33:48.350 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 57
16:33:50.350 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 57, pendingItemCount: 8
16:33:50.351 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 65
16:33:52.351 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 65, pendingItemCount: 7
16:33:52.351 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 67
16:33:54.352 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 67, pendingItemCount: 6
16:33:54.352 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 97
16:33:56.353 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 97, pendingItemCount: 5
16:33:56.353 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 63
16:33:58.353 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 63, pendingItemCount: 4
16:33:58.353 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 98
16:34:00.354 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 98, pendingItemCount: 3
16:34:00.354 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 86
16:34:02.355 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 86, pendingItemCount: 2
16:34:02.355 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 90
16:34:04.355 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 90, pendingItemCount: 1
16:34:04.356 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consuming item: 71
16:34:06.356 [main-1] WARN com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - consumed item: 71, pendingItemCount: 0
16:34:06.357 [main-1] INFO com.experiment.pubsub.pubsubdriver.PullPushAwareStreaming - completed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment