Skip to content

Instantly share code, notes, and snippets.

@mwmitchell
Created August 25, 2020 18:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mwmitchell/f3969b9f4fc19ba721303b3562b80142 to your computer and use it in GitHub Desktop.
Save mwmitchell/f3969b9f4fc19ba721303b3562b80142 to your computer and use it in GitHub Desktop.
package com.test;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.nio.charset.Charset;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class ReactiveGroupsDemo {
public static void main(String[] args) throws InterruptedException {
// Create buckets of threads, where each thread is consistently mapped to a range of items and their results...
// Number of "buckets" (threads)
int parallelism = 4;
// Function for hashing our requestIds
HashFunction hash = Hashing.goodFastHash(32);
// Incoming messages... each fetch call produces n response messages, grouped by the original requestId.
Flux.just("item-a", "item-b", "item-c", "item-d", "item-e", "item-d", "item-c", "item-b", "item-a")
// The requestId is hashed and assigned to a "group"
.groupBy(i -> Hashing.consistentHash(hash.hashString(i + "", Charset.defaultCharset()), parallelism))
.flatMap(grp -> grp
// Asynchronously process items in this group, but use only 1 thread order is maintained:
.parallel(1)
.runOn(Schedulers.newSingle("processor-" + grp.key()))
// map the data for demo purposes... group (0-4) + current value
.map(i -> grp.key() + " | " + i))
.subscribe(val -> {
System.out.println("-> " + val);
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment