Created
August 25, 2020 18:27
-
-
Save mwmitchell/f3969b9f4fc19ba721303b3562b80142 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.test; | |
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class ReactiveGroupsDemo { | |
public static void main(String[] args) throws InterruptedException { | |
// Create buckets of threads, where each thread is consistently mapped to a range of items and their results... | |
// Number of "buckets" (threads) | |
int parallelism = 4; | |
// Function for hashing our requestIds | |
HashFunction hash = Hashing.goodFastHash(32); | |
// Incoming messages... each fetch call produces n response messages, grouped by the original requestId. | |
Flux.just("item-a", "item-b", "item-c", "item-d", "item-e", "item-d", "item-c", "item-b", "item-a") | |
// The requestId is hashed and assigned to a "group" | |
.groupBy(i -> Hashing.consistentHash(hash.hashString(i + "", Charset.defaultCharset()), parallelism)) | |
.flatMap(grp -> grp | |
// Asynchronously process items in this group, but use only 1 thread order is maintained: | |
.parallel(1) | |
.runOn(Schedulers.newSingle("processor-" + grp.key())) | |
// map the data for demo purposes... group (0-4) + current value | |
.map(i -> grp.key() + " | " + i)) | |
.subscribe(val -> { | |
System.out.println("-> " + val); | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment