Skip to content

Instantly share code, notes, and snippets.

@goeh
Created May 25, 2015 12:10
Show Gist options
  • Save goeh/b33363550de083ef541f to your computer and use it in GitHub Desktop.
Save goeh/b33363550de083ef541f to your computer and use it in GitHub Desktop.
Allows you to send an event to multiple subscribers, then collect multiple responses
import groovy.transform.CompileStatic
import org.springframework.beans.factory.annotation.Autowired
import reactor.Environment
import reactor.bus.Event
import reactor.bus.EventBus
import reactor.bus.registry.Registration
import reactor.bus.registry.Registry
import reactor.bus.selector.Selector
import reactor.bus.selector.Selectors
import reactor.rx.Promise
import reactor.rx.Promises
import java.util.concurrent.Callable
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.TimeUnit
/**
 * A trait that adds sendAndCollect support to Grails 3.0.1 controllers.
 *
 * <p>{@code sendAndCollect} sends one event to every consumer currently
 * registered for a key and gathers their replies into a single
 * {@link Promise}. The promise is completed when as many replies as there were
 * matching consumers have arrived, or when the timeout elapses, whichever
 * happens first; on timeout it carries the replies received so far.</p>
 *
 * @param <T> type of the reply payload produced by the consumers
 */
@CompileStatic
trait ExtendedEvents<T> {

    @Autowired
    EventBus eventBus

    @Autowired
    Environment reactorEnv

    // NOTE(review): every in-flight sendAndCollect call parks one worker of this
    // pool in latch.await(), so with a parallelism of 2 further calls presumably
    // queue until a worker is free. Left unchanged to keep behaviour stable.
    private ForkJoinPool forkJoinPool = new ForkJoinPool(2)

    /**
     * Sends {@code data} to all consumers of {@code key} and collects their
     * replies, waiting at most 10 seconds.
     *
     * @param key  the key the event is sent to
     * @param data the event payload
     * @return a promise of the collected replies
     */
    public Promise<Collection<T>> sendAndCollect(Object key, data) {
        sendAndCollect(key, data, 10L, TimeUnit.SECONDS)
    }

    /**
     * Sends {@code data} to all consumers of {@code key} and collects their
     * replies, waiting at most the given timeout.
     *
     * @param key      the key the event is sent to
     * @param data     the event payload
     * @param timeout  maximum time to wait for all replies
     * @param timeUnit unit of {@code timeout}
     * @return a promise completed with a snapshot of the replies received, or
     *         failed with the {@link InterruptedException} if the waiting
     *         thread was interrupted
     */
    public Promise<Collection<T>> sendAndCollect(Object key, Object data, long timeout, TimeUnit timeUnit) {
        // Replies arrive on event-bus threads while the waiter runs on the pool.
        final List<T> list = Collections.synchronizedList([])
        final Registry consumerRegistry = eventBus.getConsumerRegistry()
        // One reply is expected per consumer registered for the key right now.
        final int registrations = consumerRegistry.select(key).size()
        final CountDownLatch latch = new CountDownLatch(registrations)
        // Temporary selector whose object is passed along with the event
        // (second argument of Event.wrap) so replies reach the consumer below.
        final Selector sel = Selectors.anonymous()
        final Registration registration = eventBus.on(sel, { Event<T> ev ->
            list.add(ev.getData())
            latch.countDown()
        });
        final Promise<Collection<T>> p = Promises.prepare()
        forkJoinPool.submit({
            InterruptedException interrupted = null
            try {
                latch.await(timeout, timeUnit)
            } catch (InterruptedException e) {
                // Preserve the interrupt status for the pool's worker thread.
                Thread.currentThread().interrupt()
                interrupted = e
            } finally {
                // Always drop the temporary consumer, even when interrupted,
                // and do so before publishing so no late reply is recorded.
                registration.cancel()
            }
            if (interrupted != null) {
                // Fail the promise rather than leaving the caller waiting forever.
                p.onError(interrupted)
            } else {
                // Hand out a private copy so the caller's collection cannot
                // change after the promise has been fulfilled.
                final Collection<T> snapshot = new ArrayList<T>(list)
                p.accept(snapshot)
            }
        } as Callable)
        eventBus.send key, Event.wrap(data, sel.getObject())
        p
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment