Skip to content

Instantly share code, notes, and snippets.

@danveloper
Created June 11, 2015 15:52
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danveloper/c096d1cb502ccc8b9d2d to your computer and use it in GitHub Desktop.
Save danveloper/c096d1cb502ccc8b9d2d to your computer and use it in GitHub Desktop.
Ratpack 0.9.17 WebSocket Streaming
import com.google.inject.Inject
import com.google.inject.Scopes
import org.reactivestreams.Publisher
import ratpack.exec.ExecController
import ratpack.form.Form
import ratpack.func.Function
import ratpack.groovy.Groovy
import ratpack.guice.Guice
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
import ratpack.server.Service
import ratpack.server.StartEvent
import ratpack.stream.Streams
import ratpack.websocket.WebSockets
import java.time.Duration
import static com.google.common.collect.Queues.newArrayDeque
import static groovy.json.JsonOutput.toJson
class MainGroovy {
/**
 * Singleton service that buffers posted messages and exposes them as a
 * multicast stream of JSON strings suitable for WebSocket broadcast.
 *
 * Messages are added with {@link #publish(String)} and drained one at a time
 * by a poller that runs every 100 ms on the injected controller's executor.
 * Because producers (HTTP request handlers) and the poller may run on
 * different threads, the buffer is a concurrent deque.
 */
static class StreamContainer implements Service {
    /** Stream of JSON-encoded messages; assigned in {@link #onStart} and null before that. */
    Publisher<String> stream

    // Thread-safe deque: publish() and the periodic poller can touch it concurrently.
    // Fully qualified so the file's import block does not need to change.
    private final Deque<String> queue = new java.util.concurrent.ConcurrentLinkedDeque<String>()
    private final ExecController execController

    /**
     * @param execController supplies the executor on which the periodic poller is scheduled
     */
    @Inject
    StreamContainer(ExecController execController) {
        this.execController = execController
    }

    /**
     * Creates the polling stream when the service is started.
     *
     * @param event the start event (unused)
     */
    @Override
    void onStart(StartEvent event) {
        this.stream = poll {
            // push() adds at the head, pollLast() removes from the tail => FIFO order.
            // When the queue is empty (or the message is an empty string) fall back to
            // an empty list, which is falsy for the map step below.
            // NOTE(review): presumably a non-null placeholder is required because a null
            // from the producer would end the periodic stream; confirm against the
            // Streams.periodically documentation.
            queue.pollLast() ?: []
        }
    }

    /**
     * Queues a message for delivery on the next poll tick.
     *
     * @param msg the message text
     * @throws NullPointerException if {@code msg} is null (the deque rejects null elements)
     */
    void publish(String msg) {
        queue.push(msg)
    }

    /**
     * Builds a publisher that invokes {@code callback} every 100 ms and maps each
     * result to a frame: truthy values become {@code {"message": ...}} JSON,
     * falsy values become an empty string.
     *
     * @param callback producer invoked on every tick; coerced to a Ratpack {@code Function}
     * @return the multicast publisher of frames
     */
    private Publisher<String> poll(Closure callback) {
        Streams.periodically(execController.executor, Duration.ofMillis(100), callback as Function)
            .map { it ? toJson([message: it]) : "" }
            .multicast()
    }
}
/**
 * Starts a Ratpack server with no base directory that:
 * <ul>
 *   <li>binds {@code StreamContainer} as a Guice singleton;</li>
 *   <li>serves, on {@code GET /}, an HTML page that opens a WebSocket to
 *       {@code /stream}, appends each received message to the document and
 *       retries the connection one second after it closes;</li>
 *   <li>on {@code GET /stream}, broadcasts the container's stream over a WebSocket;</li>
 *   <li>on {@code POST /stream}, reads the {@code msg} form field, queues it and
 *       answers 202, or answers 400 when the field is missing.</li>
 * </ul>
 *
 * The page renders messages with {@code textContent} rather than
 * {@code innerHTML}: the text comes from arbitrary POST requests and must not
 * be interpreted as markup by other clients' browsers.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    RatpackServer.start { spec -> spec
        .serverConfig(ServerConfig.noBaseDir())
        .registry(Guice.registry { b ->
            b.binder { binder ->
                binder.bind(StreamContainer).in(Scopes.SINGLETON)
            }
        })
        .handlers(Groovy.chain {
            get {
                // The string body is kept flush-left so the bytes sent to the client
                // are unchanged apart from the innerHTML -> textContent fix.
                response.send("text/html", """
<!DOCTYPE HTML>
<html>
<body>
<script>
if (!window.WebSocket) {
alert("This won't work in your browser. Try Chrome or a gooder version of Safari.");
} else {
function connectWs() {
if (!window.ws || window.ws.readyState != WebSocket.OPEN) {
window.ws = new WebSocket("ws://"+location.host+"/stream");
window.ws.onopen = function(event) {
console.log("WebSocket opened!");
};
window.ws.onmessage = function(event) {
if (event.data) {
var div = document.createElement('div');
div.textContent = JSON.parse(event.data).message;
document.body.appendChild(div);
}
};
window.ws.onclose = function(event) {
var timer = setTimeout(function() {
console.log("Retrying connection...");
connectWs();
if (window.ws.readyState == WebSocket.OPEN) {
clearTimeout(timer);
}
}, 1000);
};
}
}
connectWs();
}
</script>
</body>
</html>
""")
            }
            handler("stream") {
                byMethod {
                    get { ctx ->
                        def streamContainer = ctx.get(StreamContainer)
                        def stream = streamContainer.stream
                        WebSockets.websocketBroadcast(ctx, stream)
                    }
                    post { ctx ->
                        def form = ctx.parse(Form)
                        def msg = form.msg as String
                        if (msg == null) {
                            // No "msg" field: queueing null would throw inside the
                            // container, so report the client error instead.
                            ctx.response.status(400)
                            ctx.response.send()
                            return
                        }
                        def streamContainer = ctx.get(StreamContainer)
                        streamContainer.publish(msg)
                        ctx.response.status(202)
                        ctx.response.send()
                    }
                }
            }
        })
    }
}
}
@danveloper
Copy link
Author

Example curl call to post a message ==> curl -d "msg=bar" localhost:5050/stream

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment