Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Support SSE/Server-Sent-Events/EventSource for java spark sparkjava.com
package com.spark;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.*;
import org.eclipse.jetty.servlets.EventSource;
import org.eclipse.jetty.servlets.EventSourceServlet;
import spark.Service;
import spark.Spark;
import spark.embeddedserver.jetty.EmbeddedJettyServer;
import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
* <p>SparkEventSource is a complementary package that enables Jetty EventSource
* with <a href="http://sparkjava.com">spark</a>.</p>
*
* <p><em>IMPORTANT:</em> Tested on spark-java 2.5. You need the
* org.eclipse.jetty:jetty-servlets:9.3.6.v20151106 added to your pom.xml
* in order to use this class!</p>
*
* <p>Example Usage:</p>
*
* <pre>
* Set&lt;EventSource.Emitter&gt; emitters = Sets.newConcurrentHashSet();
* SparkEventSource.eventSource(
* "/event", (HttpServletRequest request) -&gt; new EventSource() {
* Emitter emitter;
* &#64;Override
* public void onOpen(Emitter emitter) throws IOException {
* this.emitter = emitter;
* emitters.add(emitter);
* }
* &#64;Override
* public void onClose() {
* emitters.remove(emitter);
* emitter.close();
* }
* });
* ExecutorService service = Executors.newSingleThreadExecutor();
* service.submit(() -&gt; {
* while (true) { // in case server lost connection - keep reconnecting
* try {
* Iterable&lt;Event&gt; it = eventIter;
* Gson gson = new Gson();
* for (Event event : it) {
* for (EventSource.Emitter emitter : emitters) {
* try {
* emitter.data(gson.toJson(event));
* } catch (IOException e) {
* e.printStackTrace();
* }
* }
* }
* } catch (Exception e) {
* System.err.println("Connection lost with server, retrying");
* e.printStackTrace();
* }
* }
* });
* </pre>
*
* <p><em>Note:</em> This class is not stable; in future sparkjava versions, use
* the native SSE capability instead.</p>
*
* <p>This class accesses private, implementation-defined fields, which might change!</p>
*
* <p>This is just a hack for the time being, until a new version of spark java
* is released.</p>
*/
public class SparkEventSource {

    /** Delay, in milliseconds, between two polls while waiting for Spark/Jetty state to appear. */
    private static final long POLL_INTERVAL_MILLIS = 10;

    /**
     * Factory invoked by the registered servlet to obtain the {@link EventSource}
     * that will serve a given request.
     */
    @FunctionalInterface
    public interface Handler {
        /**
         * Creates the event source for the given request.
         *
         * @param request the HTTP request handed to the servlet
         * @return the event source to use for that request
         */
        EventSource newEventSource(HttpServletRequest request);
    }

    /** Jetty server extracted (via reflection) from the default Spark service. */
    private final Server server;

    /**
     * Resolves the default Spark service and blocks until its embedded Jetty
     * server object is available.
     */
    private SparkEventSource() {
        server = getSparkJettyServer(getSparkService());
    }

    /**
     * Registers an EventSource endpoint at {@code url}, placed in front of the
     * handlers already installed on the Jetty server.
     *
     * <p>Failures while installing the handler are reported on
     * {@code System.err} and not propagated (best effort).</p>
     *
     * @param url                the servlet path mapping for the endpoint
     * @param eventSourceServlet factory producing an {@link EventSource} per request
     * @throws NullPointerException if {@code url} or {@code eventSourceServlet} is null
     */
    public static void eventSource(String url, Handler eventSourceServlet) {
        // Fail fast: a null factory would otherwise only blow up on the first request.
        java.util.Objects.requireNonNull(url, "url");
        java.util.Objects.requireNonNull(eventSourceServlet, "eventSourceServlet");

        ServletHandler context = new ServletHandler();
        // if req don't match - keep req unhandled
        context.setEnsureDefaultServlet(false);
        context.addServletWithMapping(
                new ServletHolder(new EventSourceServlet() {
                    @Override
                    protected EventSource newEventSource(HttpServletRequest request) {
                        return eventSourceServlet.newEventSource(request);
                    }
                }),
                url);
        addFirstHandler(context);
    }

    /**
     * Inserts {@code context} as the first handler of the server's handler list,
     * wrapping the server's current handler in a {@link HandlerList} if it is
     * not one already. Blocks (polling) until the server has a handler.
     *
     * @param context the handler to put in front of the existing ones
     */
    private static void addFirstHandler(org.eclipse.jetty.server.Handler context) {
        try {
            Server server = getInstance().server;

            // Wait until a root handler has been installed on the server.
            org.eclipse.jetty.server.Handler current = server.getHandler();
            while (current == null) {
                sleepUninterruptibly(POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                current = server.getHandler();
            }

            HandlerList handlerList;
            if (current instanceof HandlerList) {
                // Reuse the live list as-is. The live handler is deliberately NOT
                // added to a temporary list first.
                // NOTE(review): adding a handler to another Jetty collection
                // presumably re-parents it (server/bean ownership) - confirm.
                handlerList = (HandlerList) current;
            } else {
                handlerList = new HandlerList();
                handlerList.addHandler(current);
                server.setHandler(handlerList);
            }

            // Build [context, existing...]; guard against a list that reports no handlers.
            org.eclipse.jetty.server.Handler[] existing = handlerList.getHandlers();
            int existingCount = (existing == null) ? 0 : existing.length;
            org.eclipse.jetty.server.Handler[] updated =
                    new org.eclipse.jetty.server.Handler[existingCount + 1];
            updated[0] = context;
            if (existingCount > 0) {
                System.arraycopy(existing, 0, updated, 1, existingCount);
            }
            handlerList.setHandlers(updated);
        } catch (RuntimeException e) {
            // Best effort, as before: report the failure without propagating it.
            // Nothing in the try block declares a checked exception.
            System.err.println("SparkEventSource: failed to install the EventSource handler");
            e.printStackTrace();
        }
    }

    /**
     * Obtains the default Spark {@link Service} by reflectively calling the
     * non-public static {@code Spark.getInstance()}.
     *
     * @return the service returned by {@code Spark.getInstance()}
     * @throws IllegalStateException if the reflective call fails
     */
    private static Service getSparkService() {
        try {
            Method getDefaultInstance = Spark.class.getDeclaredMethod("getInstance");
            getDefaultInstance.setAccessible(true);
            // Static method: the receiver argument is ignored, so pass null.
            return (Service) getDefaultInstance.invoke(null);
        } catch (NoSuchMethodException | InvocationTargetException
                | IllegalAccessException e) {
            throw new IllegalStateException(
                    "Unable to obtain the default Spark Service via reflection", e);
        }
    }

    /**
     * Initializes the service and extracts its Jetty {@link Server} by reading
     * the private {@code server} fields of the service and of its
     * {@link EmbeddedJettyServer}. Polls until both fields are non-null.
     *
     * @param service the Spark service to initialize and inspect
     * @return the Jetty server held by the service's embedded server
     * @throws IllegalStateException if the expected fields cannot be read
     */
    private static Server getSparkJettyServer(Service service) {
        try {
            service.init();

            // Look each field up once, on its declaring class, instead of on every poll.
            Field embeddedField = Service.class.getDeclaredField("server");
            embeddedField.setAccessible(true);
            EmbeddedJettyServer embeddedJettyServer =
                    (EmbeddedJettyServer) embeddedField.get(service);
            while (embeddedJettyServer == null) {
                sleepUninterruptibly(POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                embeddedJettyServer = (EmbeddedJettyServer) embeddedField.get(service);
            }

            Field jettyField = EmbeddedJettyServer.class.getDeclaredField("server");
            jettyField.setAccessible(true);
            Server jetty = (Server) jettyField.get(embeddedJettyServer);
            while (jetty == null) {
                sleepUninterruptibly(POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                jetty = (Server) jettyField.get(embeddedJettyServer);
            }
            return jetty;
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new IllegalStateException(
                    "Unable to read the Jetty Server from Spark via reflection", e);
        }
    }

    /** Holder class so the singleton is created on first call to {@link #getInstance()}. */
    private static class SingletonHolder {
        private static final SparkEventSource INSTANCE = new SparkEventSource();
    }

    /** Returns the single {@code SparkEventSource} instance. */
    private static SparkEventSource getInstance() {
        return SingletonHolder.INSTANCE;
    }

    // from guava
    /**
     * Invokes {@code unit.}{@link TimeUnit#sleep(long) sleep(sleepFor)}
     * uninterruptibly.
     *
     * <p>If the thread is interrupted while sleeping, the sleep resumes for the
     * remaining time and the interrupt flag is restored before returning.</p>
     *
     * @param sleepFor how long to sleep, in {@code unit}
     * @param unit     the unit of {@code sleepFor}
     */
    private static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
        boolean interrupted = false;
        try {
            long remainingNanos = unit.toNanos(sleepFor);
            long end = System.nanoTime() + remainingNanos;
            while (true) {
                try {
                    // TimeUnit.sleep() treats negative timeouts just like zero.
                    NANOSECONDS.sleep(remainingNanos);
                    return;
                } catch (InterruptedException e) {
                    interrupted = true;
                    remainingNanos = end - System.nanoTime();
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment