Skip to content

Instantly share code, notes, and snippets.

@smaldini
Last active August 29, 2015 14:14
Show Gist options
  • Save smaldini/f0ccba917954d8862526 to your computer and use it in GitHub Desktop.
Save smaldini/f0ccba917954d8862526 to your computer and use it in GitHub Desktop.
Reactor, Reactive Streams and Netty
/**
 * TCP echo round-trip with manual JSON decoding/encoding.
 *
 * The client sends Pojo instances built from Streams.range(1, 10); the server
 * decodes, logs, re-encodes and echoes them; the client decodes every reply and
 * counts it down on a latch initialised to 10.
 *
 * Both blocking calls in the then: block return a result that Spock evaluates
 * as a condition, so the feature fails when start/open does not succeed within
 * 5 seconds or when the latch has not reached zero within 10 seconds.
 */
def "step-read and flush every 5 elems with manual decoding"() {
    given: "a TcpServer and a TcpClient"
    // one count per echoed element the client is expected to receive
    def latch = new CountDownLatch(10)
    def server = NetStreams.tcpServer(port)
    def client = NetStreams.tcpClient("localhost", port)
    // the same codec instance is used for decoding (decode(codec)) and encoding (map(codec))
    def codec = new JsonCodec<Pojo, Pojo>(Pojo)

    when: "the client/server are prepared"
    server.pipeline { input ->
        // bounded to 5 in-flight elements and flushed every 5 elements via capacity(5L);
        // the following pipeline decodes json, logs, re-encodes and echoes any passed data
        input
            .decode(codec)
            .log('serve')
            .map(codec)
            .capacity(5L)
    }

    client.pipeline { input ->
        // unbounded client receiver: every decoded reply releases one latch count
        input
            .decode(codec)
            .log('receive')
            .consume { latch.countDown() }

        // the closure's last expression is the outbound stream;
        // flush every 10 elements with capacity(10L)
        Streams.range(1, 10)
            .map { new Pojo(name: 'test' + it) }
            .log('send')
            .map(codec)
            .capacity(10L)
    }

    then: "the client/server were started"
    // start the server first, then open the client; nothing flows before this point
    server?.start()?.flatMap { client.open() }?.awaitSuccess(5, TimeUnit.SECONDS)
    // false (a failed condition) when fewer than 10 replies arrive in time
    latch.await(10, TimeUnit.SECONDS)

    cleanup: "the client/server were stopped"
    // close the client before shutting the server down; null-safe in case setup failed early
    client?.close()?.flatMap { server.shutdown() }?.awaitSuccess(5, TimeUnit.SECONDS)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment