Last active
June 17, 2021 08:12
-
-
Save elazarl/f69736b815c86a1f828948d995411da1 to your computer and use it in GitHub Desktop.
Suuport SSE/Server-Sent-Events/EventSource for java spark sparkjava.com
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.spark; | |
import org.eclipse.jetty.server.Server; | |
import org.eclipse.jetty.server.handler.HandlerList; | |
import org.eclipse.jetty.servlet.*; | |
import org.eclipse.jetty.servlets.EventSource; | |
import org.eclipse.jetty.servlets.EventSourceServlet; | |
import spark.Service; | |
import spark.Spark; | |
import spark.embeddedserver.jetty.EmbeddedJettyServer; | |
import javax.servlet.http.HttpServletRequest; | |
import java.lang.reflect.Field; | |
import java.lang.reflect.InvocationTargetException; | |
import java.lang.reflect.Method; | |
import java.util.Arrays; | |
import java.util.concurrent.TimeUnit; | |
import static java.util.concurrent.TimeUnit.NANOSECONDS; | |
/** | |
* <p>SparkEventSource is a complementary package that enables Jetty EventSource | |
* with <a href="http//javaspark.com">spark</a>.</p> | |
* | |
* <p><em>IMPORTANT:</em> Tested on spark-java 2.5. You need the | |
* org.eclipse.jetty:jetty-servlets:9.3.6.v20151106 added to your pom.xml | |
* in order to use this class!</p> | |
* | |
* <p>Example Usage:</p> | |
* | |
* <pre> | |
* Set<EventSource.Emitter> emitters = Sets.newConcurrentHashSet(); | |
* SparkEventSource.eventSource( | |
* "/event", (HttpServletRequest request) -> new EventSource() { | |
* Emitter emitter; | |
* @Override | |
* public void onOpen(Emitter emitter) throws IOException { | |
* this.emitter = emitter; | |
* emitters.add(emitter); | |
* } | |
* @Override | |
* public void onClose() { | |
* emitters.remove(emitter); | |
* emitter.close(); | |
* } | |
* }); | |
* ExecutorService service = Executors.newSingleThreadExecutor(); | |
* service.submit(() -> { | |
* while (true) { // in case server lost connection - keep reconnecting | |
* try { | |
* Iterable<Event> it = eventIter; | |
* Gson gson = new Gson(); | |
* for (Event event : it) { | |
* for (EventSource.Emitter emitter : emitters) { | |
* try { | |
* emitter.data(gson.toJson(event)); | |
* } catch (IOException e) { | |
* e.printStackTrace(); | |
* } | |
* } | |
* } | |
* } catch (Exception e) { | |
* System.err.println("Connection lost with server, retrying"); | |
* e.printStackTrace(); | |
* } | |
* } | |
* }); | |
* </pre> | |
* | |
* <p><em>Note:</em> This is not a stable class, in future sparkjava version use | |
* the native SSE capability.</p> | |
* | |
* <p>I access private, implemntation defined fields which might change!</p> | |
* | |
* <p>This is just a hack for the time being, while new spark java is | |
* released.</p> | |
*/ | |
public class SparkEventSource { | |
public interface Handler { | |
EventSource newEventSource(HttpServletRequest request); | |
} | |
private SparkEventSource() { | |
Service service = getSparkService(); | |
server = getSparkJettyServer(service); | |
} | |
public static void eventSource(String url, Handler eventSourceServlet) { | |
ServletHandler context = new ServletHandler(); | |
context.setEnsureDefaultServlet( | |
false); // if req don't match - keep req unhandled | |
context.addServletWithMapping( | |
new ServletHolder(new EventSourceServlet() { | |
@Override | |
protected EventSource newEventSource( | |
HttpServletRequest request) { | |
return eventSourceServlet.newEventSource(request); | |
} | |
}), | |
url); | |
addFirstHandler(context); | |
} | |
private static void | |
addFirstHandler(org.eclipse.jetty.server.Handler context) { | |
try { | |
Server server = getInstance().server; | |
while (server.getHandler() == null) | |
sleepUninterruptibly(10, TimeUnit.MILLISECONDS); | |
HandlerList handlerList = new HandlerList(); | |
handlerList.addHandler(server.getHandler()); | |
if (server.getHandler() instanceof HandlerList) | |
handlerList = (HandlerList)server.getHandler(); | |
else | |
server.setHandler(handlerList); | |
org.eclipse.jetty.server.Handler[] handlers = | |
handlerList.getHandlers(); | |
handlers = Arrays.copyOf(handlers, handlers.length + 1); | |
System.arraycopy(handlers, 0, handlers, 1, handlers.length - 1); | |
handlers[0] = context; | |
handlerList.setHandlers(handlers); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
private static Service getSparkService() { | |
Class<Spark> sparkClass = Spark.class; | |
Method getDefaultInstance = null; | |
try { | |
getDefaultInstance = sparkClass.getDeclaredMethod("getInstance"); | |
getDefaultInstance.setAccessible(true); | |
return (Service)getDefaultInstance.invoke(sparkClass); | |
} catch (NoSuchMethodException | InvocationTargetException | | |
IllegalAccessException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
private static Server getSparkJettyServer(Service service) { | |
try { | |
EmbeddedJettyServer embeddedJettyServer = null; | |
service.init(); | |
while (embeddedJettyServer == null) { | |
Field field = service.getClass().getDeclaredField("server"); | |
field.setAccessible(true); | |
embeddedJettyServer = (EmbeddedJettyServer)field.get(service); | |
sleepUninterruptibly(10, TimeUnit.MILLISECONDS); | |
} | |
Field serverField = | |
embeddedJettyServer.getClass().getDeclaredField("server"); | |
serverField.setAccessible(true); | |
while (serverField.get(embeddedJettyServer) == null) | |
sleepUninterruptibly(10, TimeUnit.MILLISECONDS); | |
return (Server)serverField.get(embeddedJettyServer); | |
} catch (NoSuchFieldException | IllegalAccessException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
private Server server; | |
private static class SingletonHolder { | |
private static final SparkEventSource INSTANCE = new SparkEventSource(); | |
} | |
private static SparkEventSource getInstance() { | |
return SingletonHolder.INSTANCE; | |
} | |
// from guava | |
/** | |
* Invokes {@code unit.}{@link TimeUnit#sleep(long) sleep(sleepFor)} | |
* uninterruptibly. | |
*/ | |
private static void sleepUninterruptibly(long sleepFor, TimeUnit unit) { | |
boolean interrupted = false; | |
try { | |
long remainingNanos = unit.toNanos(sleepFor); | |
long end = System.nanoTime() + remainingNanos; | |
while (true) { | |
try { | |
// TimeUnit.sleep() treats negative timeouts just like zero. | |
NANOSECONDS.sleep(remainingNanos); | |
return; | |
} catch (InterruptedException e) { | |
interrupted = true; | |
remainingNanos = end - System.nanoTime(); | |
} | |
} | |
} finally { | |
if (interrupted) { | |
Thread.currentThread().interrupt(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment