Skip to content

Instantly share code, notes, and snippets.

@smaldini
Last active December 29, 2015 01:09
Show Gist options
  • Save smaldini/7590867 to your computer and use it in GitHub Desktop.
Save smaldini/7590867 to your computer and use it in GitHub Desktop.
Deferred head = Streams.defer().
env(env).
batchSize(333).
dispatcher(Environment.RING_BUFFER).
get()
Stream tail = head.compose().collect()
tail.consume(consumer { List<Integer> ints ->
println ints.size()
println Thread.currentThread()
sum.addAndGet(ints.size())
latch.countDown()
})
println stream.debug()
"""reactor(1535b5f1-52fa-11e3-db66-bd6f11826f68)
| |____accept:CollectOperation accepted:0|errors:0|batchSize:333
| | |____accept:CallbackOperation"""
//Reuse dispatcher/reactor from initial stream and fixes concurrency issues
"""333
Thread[ringBuffer-ringbuffer-2,5,main]
333
Thread[ringBuffer-ringbuffer-2,5,main]
333
Thread[ringBuffer-ringbuffer-2,5,main]
"""
//another example, accept and flush are the selector keys used to jump to a downstream component (errors hops are hidden)
"""
reactor(347d7401-52fc-11e3-9af9-ddfe37167d5f)
| |____accept:MapOperation
| | |____accept:MapOperation
| | | |____accept:CallbackOperation
| |____flush:ForEachFlushOperation
| | |____accept:MapOperation
| | | |____accept:MapOperation
| | | | |____accept:CallbackOperation"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment