Created
August 14, 2015 06:34
-
-
Save pfmiles/e273ce621b300e61efc4 to your computer and use it in GitHub Desktop.
SSE NIO异步推送,使用jetty continuation + kilimFiber, 服务端java代码框架
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test; | |
import java.util.Collections; | |
import java.util.List; | |
import java.util.Set; | |
import java.util.concurrent.ConcurrentHashMap; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* 异步数据源,负责产生异步数据到达事件、调度注册在其上的AsyncSseRespEmitter; | |
* | |
* @author pf-miles Aug 4, 2015 2:28:22 PM | |
*/ | |
public abstract class AsyncEventSource { | |
private static final Logger log = LoggerFactory.getLogger(AsyncEventSource.class); | |
// 所有注册上来的emitter | |
private Set<AsyncSseRespEmitter> emitters = Collections.newSetFromMap(new ConcurrentHashMap<AsyncSseRespEmitter, Boolean>( | |
16, | |
0.75f, | |
Constants.PROC_NUM * 10)); | |
/** | |
* 注册emitter到本事件源 | |
* | |
* @param asyncSseRespEmitter | |
*/ | |
void register(AsyncSseRespEmitter asyncSseRespEmitter) { | |
this.emitters.add(asyncSseRespEmitter); | |
} | |
/** | |
* 从本事件源注销emitter | |
* | |
* @param asyncSseRespEmitter | |
*/ | |
void unregister(AsyncSseRespEmitter asyncSseRespEmitter) { | |
this.emitters.remove(asyncSseRespEmitter); | |
} | |
/** | |
* 唤醒所有注册的emitter | |
*/ | |
protected final void notifyAllEmitters() { | |
for (AsyncSseRespEmitter e : this.emitters) | |
try { | |
e.resume(); | |
} catch (Exception ex) { | |
try { | |
e.close(); | |
} catch (Exception exx) { | |
} | |
log.info("Emitter resume failed, ignored and unregistered... Cause: " + ex.getMessage()); | |
} | |
} | |
/** | |
* check if new events arrived after the specified eventId | |
*/ | |
protected abstract boolean hasNewEventSince(String eventId); | |
/** | |
* get new events arrived after the specified eventId | |
*/ | |
protected abstract List<EventMsg> getEventMsgsSince(String eventId); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test; | |
import java.io.IOException; | |
import java.io.OutputStream; | |
import java.util.List; | |
import java.util.concurrent.locks.Lock; | |
import java.util.concurrent.locks.ReentrantLock; | |
import javax.servlet.http.HttpServletRequest; | |
import javax.servlet.http.HttpServletResponse; | |
import kilim.Pausable; | |
import kilim.Task; | |
import org.eclipse.jetty.continuation.Continuation; | |
import org.eclipse.jetty.continuation.ContinuationSupport; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* 使HttpServletResponse产生sse(server sent event)消息流的web层工具类,与AsyncEventSource配合使用,注册到event source上 | |
* | |
* @author pf-miles Aug 4, 2015 2:17:11 PM | |
*/ | |
public class AsyncSseRespEmitter extends Task implements AutoCloseable { | |
private static final Logger log = LoggerFactory.getLogger(AsyncSseRespEmitter.class); | |
private static final String ENC = "UTF-8"; | |
private AsyncEventSource eventSource; | |
private HttpServletRequest request; | |
private HttpServletResponse response; | |
private OutputStream out; | |
// 记录迄今为止返回过的event的最后一个event的id | |
private String lastId; | |
// 客户端断点重连时间间隔 | |
private long retryInterval = 3000; | |
// 是否首次输出 | |
private boolean firstOutput = true; | |
// 防止在continuation suspend之前就被resume触发bug | |
private Lock suspendLock = new ReentrantLock(); | |
/** | |
* 针对指定的数据源创建一个SSE事件产生器,持续向response输出sse格式的数据 | |
* | |
* @param eventSource | |
* @param request | |
* @param response | |
* @param lastId 最后一个eventId,表明客户端只希望接受这个id之后的数据 | |
*/ | |
public AsyncSseRespEmitter(AsyncEventSource eventSource, HttpServletRequest request, HttpServletResponse response, | |
String lastId){ | |
this.eventSource = eventSource; | |
this.request = request; | |
this.response = response; | |
this.lastId = lastId; | |
try { | |
this.out = response.getOutputStream(); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
// auto-register itself after constructed | |
eventSource.register(this); | |
} | |
/** | |
* 不断向response推送数据,直到客户端断开链接为止; 期间没数据时会yield以释放线程资源 | |
*/ | |
public void execute() throws Pausable, Exception { | |
try { | |
while (true) { | |
if (this.eventSource.hasNewEventSince(this.lastId)) { | |
response.setHeader("Access-Control-Allow-Origin", "*"); | |
response.setHeader("Content-Type", "text/event-stream"); | |
response.setStatus(HttpServletResponse.SC_OK); | |
// 有新事件到达,输出response | |
List<EventMsg> msgs = this.eventSource.getEventMsgsSince(this.lastId); | |
for (EventMsg msg : msgs) { | |
if (this.firstOutput) { | |
// 首次输出可控制retryInterval | |
msg.setRetryInterval(retryInterval); | |
this.firstOutput = false; | |
} | |
this.out.write(msg.toString().getBytes(ENC)); | |
} | |
this.out.flush(); | |
} | |
// yield释放线程 | |
this.suspendLock.lock(); | |
try { | |
Continuation cont = ContinuationSupport.getContinuation(request); | |
if (!cont.isSuspended()) { | |
cont.setTimeout(-1); | |
cont.suspend(); | |
} | |
} finally { | |
this.suspendLock.unlock(); | |
Task.yield(); | |
} | |
} | |
} catch (IOException e) { | |
// end the sse session when io exception occured(usually the client closed the stream). | |
this.close(); | |
log.info("IO exception occured when pusing msgs, emitter closed & unregistered... Cause: " + e.getMessage()); | |
} | |
} | |
@Override | |
public void close() throws Exception { | |
try { | |
out.close(); | |
this.response.flushBuffer(); | |
} catch (Exception e) { | |
// ignores... | |
} finally { | |
this.eventSource.unregister(this); | |
} | |
} | |
/** | |
* 设置client断线重连的重试间隔 | |
* | |
* @param interval 单位为毫秒 | |
*/ | |
public void setClientRetryInterval(long interval) { | |
this.retryInterval = interval; | |
} | |
/** | |
* 通知暂停的emitter继续执行 | |
*/ | |
void resume() { | |
this.suspendLock.lock(); | |
try { | |
Continuation cont = ContinuationSupport.getContinuation(request); | |
if (!cont.isResumed()) cont.resume(); | |
} finally { | |
this.suspendLock.unlock(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test; | |
import java.io.Serializable; | |
import org.apache.commons.lang.StringUtils; | |
/**
 * An event waiting to be written out in SSE (server-sent events) format.
 *
 * @author pf-miles Aug 5, 2015 10:19:20 AM
 */
public class EventMsg implements Serializable {

    private static final long serialVersionUID = 8595232243839083435L;
    private static final char NL = '\n';
    // CRLF, a lone CR and a lone LF all terminate a line in an SSE stream, so the payload is split on
    // all of them; CRLF comes first so that it is consumed as a single break
    private static final String LINE_BREAK_REGEX = "\\r\\n|\\r|\\n";

    private String id;
    // reconnect interval advertised to the client; not written when null
    private Long retryInterval;
    private StringBuilder msg;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getRetryInterval() {
        return retryInterval;
    }

    public void setRetryInterval(Long retryInterval) {
        this.retryInterval = retryInterval;
    }

    /**
     * Returns the payload appended so far.
     *
     * @return the payload, or an empty string if nothing has been appended yet
     */
    public String getMsg() {
        return this.msg == null ? "" : this.msg.toString();
    }

    /**
     * Appends the string form of {@code msg} to the payload.
     *
     * @param msg the value to append
     * @return this message, to allow chaining
     */
    public EventMsg appendMsg(Object msg) {
        if (this.msg == null) {
            this.msg = new StringBuilder();
        }
        this.msg.append(msg);
        return this;
    }

    /**
     * Renders this event as an SSE message: an optional {@code id:} line, an optional {@code retry:}
     * line, one {@code data:} line per non-empty payload line, and a terminating blank line.
     * The {@code id:} and {@code retry:} lines are written only when the respective value is set.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        if (this.id != null) {
            sb.append("id:").append(this.id).append(NL);
        }
        if (this.retryInterval != null) {
            sb.append("retry:").append(this.retryInterval).append(NL);
        }
        if (this.msg != null && this.msg.length() > 0) {
            for (String line : this.msg.toString().split(LINE_BREAK_REGEX)) {
                if (!line.isEmpty()) {
                    sb.append("data:").append(line).append(NL);
                }
            }
        }
        sb.append(NL);
        return sb.toString();
    }

    /** Hash code based on the id only, consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((id == null) ? 0 : id.hashCode());
        return result;
    }

    /** Two events are equal when they are of the same class and have the same id. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        EventMsg other = (EventMsg) obj;
        return id == null ? other.id == null : id.equals(other.id);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test; | |
import java.io.File; | |
import java.text.ParseException; | |
import java.text.SimpleDateFormat; | |
import java.util.ArrayList; | |
import java.util.Date; | |
import java.util.Iterator; | |
import java.util.LinkedList; | |
import java.util.List; | |
import java.util.Map.Entry; | |
import org.apache.commons.io.FileUtils; | |
import org.apache.commons.io.LineIterator; | |
import org.mapdb.BTreeMap; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.beans.factory.DisposableBean; | |
import org.springframework.beans.factory.InitializingBean; | |
import com.github.pfmiles.org.apache.commons.lang.StringUtils; | |
/** | |
* 数据源实现示 | |
* | |
* @author pf-miles Aug 5, 2015 11:20:55 AM | |
*/ | |
public class FuncLogEventSource extends AsyncEventSource implements InitializingBean, DisposableBean { | |
private static final Logger log = LoggerFactory.getLogger(FuncLogEventSource.class); | |
private static final char NL = '\n'; | |
// 函数编译日志文件位置 | |
private String funcLogFilePath; | |
// 执行开关 | |
private volatile boolean enabled = true; | |
// 内存缓存,存放当前log数据 | |
private BTreeMap<Date, EventMsg> data = OffHeapMemCacheHelper.createOrderedOffHeapMemCache(10 / 1024); // 10MB | |
// 日志时间的格式,2015-07-24 00:47:58 | |
private static final String entryDatePattern = "yyyy-MM-dd HH:mm:ss"; | |
// 当eventId不存在时,默认返回的事件数目 | |
private static final int default_last_count = 5; | |
@Override | |
protected boolean hasNewEventSince(String lastId) { | |
SimpleDateFormat fmt = new SimpleDateFormat(entryDatePattern); | |
if (isNotExistKey(lastId, fmt)) { | |
return !data.isEmpty(); | |
} else { | |
try { | |
return data.higherKey(fmt.parse(lastId)) != null; | |
} catch (ParseException e) { | |
throw new RuntimeException("impossible!", e); | |
} | |
} | |
} | |
private boolean isNotExistKey(String keyStr, SimpleDateFormat fmt) { | |
if (keyStr == null) return true; | |
Date k = null; | |
try { | |
k = fmt.parse(keyStr); | |
} catch (ParseException e) { | |
return true; | |
} | |
return !data.containsKey(k); | |
} | |
@Override | |
protected List<EventMsg> getEventMsgsSince(String lastId) { | |
if (!hasNewEventSince(lastId)) return null; | |
SimpleDateFormat fmt = new SimpleDateFormat(entryDatePattern); | |
if (isNotExistKey(lastId, fmt)) { | |
// key not exist, default to return last three events | |
LinkedList<EventMsg> ret = new LinkedList<>(); | |
Iterator<Entry<Date, EventMsg>> iter = this.data.descendingMap().entrySet().iterator(); | |
for (int i = 0; i < default_last_count && iter.hasNext(); i++) | |
ret.push(iter.next().getValue()); | |
return ret; | |
} else { | |
// return events after the last key | |
List<EventMsg> ret = new ArrayList<EventMsg>(); | |
Date k; | |
try { | |
k = fmt.parse(lastId); | |
} catch (ParseException e) { | |
throw new RuntimeException("impossible!", e); | |
} | |
for (Entry<Date, EventMsg> e = this.data.higherEntry(k); e != null; e = this.data.higherEntry(k)) { | |
ret.add(e.getValue()); | |
k = e.getKey(); | |
} | |
return ret; | |
} | |
} | |
public void setFuncLogFilePath(String funcLogFilePath) { | |
this.funcLogFilePath = funcLogFilePath; | |
} | |
public void destroy() throws Exception { | |
this.enabled = false; | |
} | |
@Override | |
public void afterPropertiesSet() throws Exception { | |
// 启动日志检查线程 | |
new Thread(new Runnable() { | |
@Override | |
public void run() { | |
SimpleDateFormat fmt = new SimpleDateFormat(entryDatePattern); | |
while (enabled) { | |
try { | |
// load current funcLog into (off-heap) memory, indexed by log entry time | |
File logFile = new File(funcLogFilePath); | |
if (!logFile.exists()) throw new RuntimeException("Function log file: " + funcLogFilePath | |
+ " does not exist."); | |
LineIterator iter = FileUtils.lineIterator(logFile, "UTF-8"); | |
// the current constructing msg, null means no constructing at the moment | |
EventMsg curMsg = null; | |
// construcing possibly new msg from log file | |
while (iter.hasNext()) { | |
String l = iter.next(); | |
if (isLogEntryBegin(l, fmt)) { | |
// entry begin encountered, check if new entry to save the old constructing msg & start | |
// a new one... | |
String tsString = StringUtils.substringBetween(l, "[", "]"); | |
Date entryDate = fmt.parse(tsString); | |
if (!data.containsKey(entryDate)) { | |
// save the old constructing msg and start a new one | |
if (curMsg != null) { | |
data.put(fmt.parse(curMsg.getId()), curMsg); | |
} | |
curMsg = new EventMsg(); | |
curMsg.setId(tsString); | |
curMsg.appendMsg(l).appendMsg(NL);// first line should also be appended | |
} | |
} else { | |
// not entry beginning, append msg if constructing | |
if (curMsg != null) { | |
curMsg.appendMsg(l).appendMsg(NL); | |
} | |
} | |
} | |
if (curMsg != null) { | |
// save the constructing msg at each end of loop | |
data.put(fmt.parse(curMsg.getId()), curMsg); | |
// then notify all emitters | |
FuncLogEventSource.this.notifyAllEmitters(); | |
} | |
Thread.sleep(5000); | |
} catch (Throwable t) { | |
// log and continue executing | |
log.error("Error in 'func-log-check-thread'.", t); | |
} | |
} | |
} | |
}, "func-log-check-thread").start(); | |
} | |
// to determine if the specified line is a beginning of a log entry | |
private boolean isLogEntryBegin(String line, SimpleDateFormat entryTsFormat) { | |
if (StringUtils.isBlank(line)) return false; | |
if (!line.startsWith("[")) return false; | |
String tsStr = StringUtils.substringBetween(line, "[", "]"); | |
if (StringUtils.isBlank(tsStr)) return false; | |
try { | |
entryTsFormat.parse(tsStr); | |
return true; | |
} catch (ParseException e) { | |
return false; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
注:在webx3中,servletRequest/servletResponse是经过webx包装的对象,它会把本应写给客户端的数据全部缓存在内存里,这与SSE的long polling机制冲突;
因此在webx3下,应将context注入,并通过context取得原本的req/resp对象,而不是直接注入被代理的req/resp对象。