Skip to content

Instantly share code, notes, and snippets.

@smaldini
Created March 27, 2015 13:18
Show Gist options
  • Save smaldini/4c85515c210503b2092e to your computer and use it in GitHub Desktop.
Save smaldini/4c85515c210503b2092e to your computer and use it in GitHub Desktop.
@Test
public void test111() throws Exception {
Processor<Buffer, Buffer> processor = RingBufferProcessor.create(false);
Stream<Buffer> bufferStream = Streams
.wrap(processor)
.window(100, 1, TimeUnit.SECONDS)
.flatMap(s -> s.reduce(new Buffer(), (prev, next) -> {
return prev.append(next);
}))
.process(RingBufferWorkProcessor.create(false));
final GPFDistCodec codec = new GPFDistCodec();
reactor.io.net.http.HttpServer<Buffer, Buffer> httpServer = NetStreams.httpServer(server -> server
.codec(codec).listen(8080).dispatcher(Environment.sharedDispatcher()));
httpServer.get("/data", (request) -> {
log.info("XXX incoming request " + request);
request.responseHeaders().removeTransferEncodingChunked();
request.addResponseHeader("Content-type", "text/plain");
request.addResponseHeader("Expires", "0");
request.addResponseHeader("X-GPFDIST-VERSION", "Spring XD");
request.addResponseHeader("X-GP-PROTO", "1");
request.addResponseHeader("Cache-Control", "no-cache");
request.addResponseHeader("Connection", "close");
return bufferStream
.take(5, TimeUnit.SECONDS)
.concatWith(Streams.just(new Buffer().append(new byte[0])));
});
httpServer.start().awaitSuccess();
System.out.println("XXXX broadcast start");
int x = 0;
for (int j = 0; j < 100; j++) {
System.out.println("XXXX broadcast batch " + j);
Thread.sleep(100);
for (int i = 0; i < 10000; i++) {
int xx = x++;
processor.onNext(new Buffer().append(xx + ",person" + xx + "\n").flip());
}
}
System.out.println("XXXX broadcast end");
Thread.sleep(60000);
}
public class GPFDistCodec extends Codec<Buffer, Buffer, Buffer> {
final byte[] h1 = Character.toString('D').getBytes();
@SuppressWarnings("resource")
@Override
public Buffer apply(Buffer t) {
byte[] h2 = ByteBuffer.allocate(4).putInt(t.flip().remaining()).array();
return new Buffer().append(h1).append(h2).append(t).flip();
}
@Override
public Function<Buffer, Buffer> decoder(Consumer<Buffer> next) {
return null;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment