Created
October 23, 2016 00:12
-
-
Save sskrla/f70bdb6ad0248dde2d47fca061a991a5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Publishers { | |
/** | |
* Orders the incoming flux by a sequence. An incoming element will only be emitted its sequence values is exactly | |
* greater than the previous. | |
* | |
* @param getSequenceNumber | |
* @param <T> | |
* @return | |
*/ | |
public static <T> Function<Flux<T>, Flux<T>> sequential(Function<T, Long> getSequenceNumber) { | |
return flux -> { | |
AtomicLong seq = new AtomicLong(0); | |
return flux | |
.scan( | |
new TreeSet<Pair<Long, T>>((l, r) -> l.getKey().compareTo(r.getKey())), | |
(queue, next) -> { | |
long seqVal = getSequenceNumber.apply(next); | |
queue.add(Pair.of(seqVal, next)); | |
return queue; | |
}) | |
.concatMap(queue -> { | |
Iterator<Pair<Long, T>> it = queue.iterator(); | |
List<T> elements = new ArrayList<>(); | |
while(it.hasNext()) { | |
Pair<Long, T> next = it.next(); | |
if(next.getKey() == seq.get()) { | |
it.remove(); | |
seq.incrementAndGet(); | |
elements.add(next.getValue()); | |
} | |
} | |
return Flux.fromIterable(elements); | |
}); | |
}; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SequenceTest extends Specification {
    /**
     * Pushes the integers 0..99 through ten parallel rails, delaying each element by a
     * pseudo-random amount so that they come back out of order, then checks that
     * Publishers.sequential restores the original 0..99 ordering.
     */
    def "sequential orders correctly"() {
        given:
        // Fixed seed keeps the delay table reproducible; each delay is a whole number of ms below 100.
        def rng = new Random(1024)
        def delays = (0..100).collect { (long) (rng.nextFloat() * 100) }
        // Shared cursor into the delay table, advanced once per element across all rails.
        def cursor = new AtomicInteger(0)

        when:
        def scrambled = Flux.range(0, 100)
                .parallel(10)
                .runOn(Schedulers.elastic())
                .log("submitted")
                .doOnNext({ Thread.sleep(delays[cursor.getAndIncrement()]) })
                .sequential()
                .log("unordered")
        def values = scrambled
                .compose(Publishers.sequential { it.longValue() })
                .log("ordered")
                .collectList()
                .subscribeOn(Schedulers.elastic())
                .block()

        then:
        values.size() == 100
        // Every value must sit at its own index, i.e. the list is exactly 0, 1, ..., 99.
        values == (0..<100).toList()
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment