Skip to content

Instantly share code, notes, and snippets.

@sskrla
Created October 23, 2016 00:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sskrla/f70bdb6ad0248dde2d47fca061a991a5 to your computer and use it in GitHub Desktop.
Save sskrla/f70bdb6ad0248dde2d47fca061a991a5 to your computer and use it in GitHub Desktop.
public class Publishers {
    /**
     * Builds a transformer that re-orders a flux by sequence number, expecting the first element to
     * carry sequence {@code 0}.
     *
     * <p>Equivalent to {@code sequential(getSequenceNumber, 0L)}.
     *
     * @param getSequenceNumber extracts the sequence number of an element; must not return {@code null}
     * @param <T> element type of the flux
     * @return a function suitable for {@code Flux.compose(...)} that emits the source elements in
     *         strictly consecutive sequence order
     */
    public static <T> Function<Flux<T>, Flux<T>> sequential(Function<T, Long> getSequenceNumber) {
        return sequential(getSequenceNumber, 0L);
    }

    /**
     * Builds a transformer that re-orders a flux by sequence number.
     *
     * <p>An element is emitted only once every element with a smaller sequence number (starting at
     * {@code firstSequence}) has been emitted; until then it is buffered. Consequences:
     * <ul>
     *   <li>an element whose sequence number is below the next expected one is dropped;</li>
     *   <li>if two buffered elements share a sequence number, the first one wins;</li>
     *   <li>the buffer is unbounded, so a sequence number that never arrives holds back (and
     *       retains) every later element until the source completes.</li>
     * </ul>
     *
     * @param getSequenceNumber extracts the sequence number of an element; must not return {@code null}
     * @param firstSequence the sequence number of the first element to emit
     * @param <T> element type of the flux
     * @return a function suitable for {@code Flux.compose(...)} that emits the source elements in
     *         strictly consecutive sequence order
     */
    public static <T> Function<Flux<T>, Flux<T>> sequential(Function<T, Long> getSequenceNumber, long firstSequence) {
        // Defer so that the cursor and the buffer are created per subscription instead of being
        // shared (and corrupted) between every subscriber of the resulting flux.
        return flux -> Flux.defer(() -> reorder(flux, getSequenceNumber, firstSequence));
    }

    /** Creates the per-subscription re-ordering state and wires it onto {@code flux}. */
    private static <T> Flux<T> reorder(Flux<T> flux, Function<T, Long> getSequenceNumber, long firstSequence) {
        // Next sequence number that may be emitted.
        AtomicLong expected = new AtomicLong(firstSequence);
        // Elements that arrived early, ordered by sequence number.
        TreeSet<Pair<Long, T>> pending = new TreeSet<Pair<Long, T>>((l, r) -> l.getKey().compareTo(r.getKey()));

        // All state is read and written inside this single mapper, which concatMap applies to one
        // source element at a time, so the buffer is never mutated from two operators at once.
        return flux.concatMap(next -> {
            List<T> ready = new ArrayList<>();
            long seqVal = getSequenceNumber.apply(next);

            // Anything behind the cursor can never become emittable; do not buffer it.
            if (seqVal >= expected.get()) {
                pending.add(Pair.of(seqVal, next));

                // The set is sorted, so only its head can match the cursor; release the whole
                // consecutive run that has become available.
                while (!pending.isEmpty() && pending.first().getKey().longValue() == expected.get()) {
                    ready.add(pending.pollFirst().getValue());
                    expected.incrementAndGet();
                }
            }
            return Flux.fromIterable(ready);
        });
    }
}
/**
 * Spock specification for {@code Publishers.sequential}: shuffles the arrival order of a range of
 * integers by processing them on parallel rails with random delays, then checks that the
 * transformer restores the original order.
 */
class SequenceTest extends Specification {
    def "sequential orders correctly"() {
        given:
        // Fixed seed keeps the delays reproducible between runs.
        def rando = new Random(1024);
        // One delay (0-99 ms) per emitted element.
        def sleeps = (0..<100).collect { (long) (rando.nextFloat() * 100) }
        def i = new AtomicInteger(0)

        when:
        def values = Flux.range(0, 100)
            .parallel(10)
            .runOn(Schedulers.elastic())
            .log("submitted")
            .doOnNext({
                // Delay each element differently so the rails finish out of order.
                Thread.sleep(sleeps[i.getAndIncrement()])
            })
            .sequential()
            .log("unordered")
            .compose(Publishers.sequential { it.longValue() })
            .log("ordered")
            .collectList()
            .subscribeOn(Schedulers.elastic())
            .block()

        then:
        values.size() == 100
        // Exact comparison: every element present, in strictly ascending order from 0.
        values == (0..<100).toList()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment