Skip to content

Instantly share code, notes, and snippets.

@NiteshKant
Created April 29, 2014 04:19
Show Gist options
  • Save NiteshKant/11390566 to your computer and use it in GitHub Desktop.
Save NiteshKant/11390566 to your computer and use it in GitHub Desktop.
A URI based router for RxNetty, How Awesome :)
package io.reactivex.netty.examples.java;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.HttpHeaders;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import io.reactivex.netty.serialization.ContentTransformer;
import rx.Observable;
import rx.functions.Func1;
import java.util.concurrent.TimeUnit;
/**
* @author Nitesh Kant
*/
public final class HttpAwesomeServer {
public static void main(String[] args) {
RxNetty.createHttpServer(8080, new RequestHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
final HttpServerResponse<ByteBuf> response) {
String uri = request.getUri();
if ("/sse".equals(uri)) {
response.getHeaders().add(HttpHeaders.Names.CONTENT_TYPE, "text/eventstream");
return Observable.interval(1, TimeUnit.SECONDS).flatMap(new Func1<Long, Observable<Void>>() {
@Override
public Observable<Void> call(Long aLong) {
return response.writeAndFlush(new ServerSentEvent("blah", "data", "Hey Buddy " + aLong),
new SSETransformer());
}
});
} else {
response.getHeaders().add(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
return response.writeStringAndFlush("Hello! You don't need SSE, do you?");
}
}
}).startAndWait();
}
private static class SSETransformer implements ContentTransformer<ServerSentEvent> {
@Override
public ByteBuf transform(ServerSentEvent toTransform, ByteBufAllocator byteBufAllocator) {
// NOTE TO MYSELF: I SHOULD MOVE THIS TO ServerSentEvent AS toByteBuf()
StringBuilder eventBuilder = new StringBuilder();
eventBuilder.append(toTransform.getEventName());
eventBuilder.append(": ");
eventBuilder.append(toTransform.getEventData());
eventBuilder.append("\n\n");
String data = eventBuilder.toString();
return byteBufAllocator.buffer(data.length())
.writeBytes(data.getBytes());
}
}
}
@NiteshKant
Copy link
Author

Test:

➜  rxnetty-nkant git:(contexts) ✗ curl "http://localhost:8080/sse"
data: Hey Buddy 0

data: Hey Buddy 1

data: Hey Buddy 2

data: Hey Buddy 3

^C
➜  rxnetty-nkant git:(contexts) ✗ curl "http://localhost:8080"    
Hello! You don't need SSE, do you?%      

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment