Skip to content

Instantly share code, notes, and snippets.

@pfmiles
Created August 14, 2015 06:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pfmiles/e273ce621b300e61efc4 to your computer and use it in GitHub Desktop.
Save pfmiles/e273ce621b300e61efc4 to your computer and use it in GitHub Desktop.
SSE NIO异步推送,使用jetty continuation + kilimFiber, 服务端java代码框架
package test;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 异步数据源,负责产生异步数据到达事件、调度注册在其上的AsyncSseRespEmitter;
*
* @author pf-miles Aug 4, 2015 2:28:22 PM
*/
public abstract class AsyncEventSource {
private static final Logger log = LoggerFactory.getLogger(AsyncEventSource.class);
// 所有注册上来的emitter
private Set<AsyncSseRespEmitter> emitters = Collections.newSetFromMap(new ConcurrentHashMap<AsyncSseRespEmitter, Boolean>(
16,
0.75f,
Constants.PROC_NUM * 10));
/**
* 注册emitter到本事件源
*
* @param asyncSseRespEmitter
*/
void register(AsyncSseRespEmitter asyncSseRespEmitter) {
this.emitters.add(asyncSseRespEmitter);
}
/**
* 从本事件源注销emitter
*
* @param asyncSseRespEmitter
*/
void unregister(AsyncSseRespEmitter asyncSseRespEmitter) {
this.emitters.remove(asyncSseRespEmitter);
}
/**
* 唤醒所有注册的emitter
*/
protected final void notifyAllEmitters() {
for (AsyncSseRespEmitter e : this.emitters)
try {
e.resume();
} catch (Exception ex) {
try {
e.close();
} catch (Exception exx) {
}
log.info("Emitter resume failed, ignored and unregistered... Cause: " + ex.getMessage());
}
}
/**
* check if new events arrived after the specified eventId
*/
protected abstract boolean hasNewEventSince(String eventId);
/**
* get new events arrived after the specified eventId
*/
protected abstract List<EventMsg> getEventMsgsSince(String eventId);
}
package test;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import kilim.Pausable;
import kilim.Task;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 使HttpServletResponse产生sse(server sent event)消息流的web层工具类,与AsyncEventSource配合使用,注册到event source上
*
* @author pf-miles Aug 4, 2015 2:17:11 PM
*/
public class AsyncSseRespEmitter extends Task implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(AsyncSseRespEmitter.class);
private static final String ENC = "UTF-8";
private AsyncEventSource eventSource;
private HttpServletRequest request;
private HttpServletResponse response;
private OutputStream out;
// 记录迄今为止返回过的event的最后一个event的id
private String lastId;
// 客户端断点重连时间间隔
private long retryInterval = 3000;
// 是否首次输出
private boolean firstOutput = true;
// 防止在continuation suspend之前就被resume触发bug
private Lock suspendLock = new ReentrantLock();
/**
* 针对指定的数据源创建一个SSE事件产生器,持续向response输出sse格式的数据
*
* @param eventSource
* @param request
* @param response
* @param lastId 最后一个eventId,表明客户端只希望接受这个id之后的数据
*/
public AsyncSseRespEmitter(AsyncEventSource eventSource, HttpServletRequest request, HttpServletResponse response,
String lastId){
this.eventSource = eventSource;
this.request = request;
this.response = response;
this.lastId = lastId;
try {
this.out = response.getOutputStream();
} catch (Exception e) {
throw new RuntimeException(e);
}
// auto-register itself after constructed
eventSource.register(this);
}
/**
* 不断向response推送数据,直到客户端断开链接为止; 期间没数据时会yield以释放线程资源
*/
public void execute() throws Pausable, Exception {
try {
while (true) {
if (this.eventSource.hasNewEventSince(this.lastId)) {
response.setHeader("Access-Control-Allow-Origin", "*");
response.setHeader("Content-Type", "text/event-stream");
response.setStatus(HttpServletResponse.SC_OK);
// 有新事件到达,输出response
List<EventMsg> msgs = this.eventSource.getEventMsgsSince(this.lastId);
for (EventMsg msg : msgs) {
if (this.firstOutput) {
// 首次输出可控制retryInterval
msg.setRetryInterval(retryInterval);
this.firstOutput = false;
}
this.out.write(msg.toString().getBytes(ENC));
}
this.out.flush();
}
// yield释放线程
this.suspendLock.lock();
try {
Continuation cont = ContinuationSupport.getContinuation(request);
if (!cont.isSuspended()) {
cont.setTimeout(-1);
cont.suspend();
}
} finally {
this.suspendLock.unlock();
Task.yield();
}
}
} catch (IOException e) {
// end the sse session when io exception occured(usually the client closed the stream).
this.close();
log.info("IO exception occured when pusing msgs, emitter closed & unregistered... Cause: " + e.getMessage());
}
}
@Override
public void close() throws Exception {
try {
out.close();
this.response.flushBuffer();
} catch (Exception e) {
// ignores...
} finally {
this.eventSource.unregister(this);
}
}
/**
* 设置client断线重连的重试间隔
*
* @param interval 单位为毫秒
*/
public void setClientRetryInterval(long interval) {
this.retryInterval = interval;
}
/**
* 通知暂停的emitter继续执行
*/
void resume() {
this.suspendLock.lock();
try {
Continuation cont = ContinuationSupport.getContinuation(request);
if (!cont.isResumed()) cont.resume();
} finally {
this.suspendLock.unlock();
}
}
}
package test;
import java.io.Serializable;
import org.apache.commons.lang.StringUtils;
/**
* 待SSE输出的事件
*
* @author pf-miles Aug 5, 2015 10:19:20 AM
*/
public class EventMsg implements Serializable {
private static final long serialVersionUID = 8595232243839083435L;
private static final char NL = '\n';
private String id;
// 控制客户端断点重连的时间间隔,为null时不输出
private Long retryInterval;
private StringBuilder msg;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getRetryInterval() {
return retryInterval;
}
public void setRetryInterval(Long retryInterval) {
this.retryInterval = retryInterval;
}
public String getMsg() {
return msg.toString();
}
public EventMsg appendMsg(Object msg) {
if (this.msg == null) this.msg = new StringBuilder();
this.msg.append(msg);
return this;
}
/**
* 转换为SSE格式的String消息
*/
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("id:").append(this.id).append(NL);
if (this.retryInterval != null) sb.append("retry:").append(this.retryInterval).append(NL);
if (this.msg != null && this.msg.length() > 0) {
String s = this.msg.toString();
for (String line : s.split("\n"))
if (StringUtils.isNotEmpty(line)) sb.append("data:").append(line).append(NL);
}
sb.append(NL);
return sb.toString();
}
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((id == null) ? 0 : id.hashCode());
return result;
}
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
EventMsg other = (EventMsg) obj;
if (id == null) {
if (other.id != null) return false;
} else if (!id.equals(other.id)) return false;
return true;
}
}
package test;
import java.io.File;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.mapdb.BTreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import com.github.pfmiles.org.apache.commons.lang.StringUtils;
/**
* 数据源实现示
*
* @author pf-miles Aug 5, 2015 11:20:55 AM
*/
public class FuncLogEventSource extends AsyncEventSource implements InitializingBean, DisposableBean {
private static final Logger log = LoggerFactory.getLogger(FuncLogEventSource.class);
private static final char NL = '\n';
// 函数编译日志文件位置
private String funcLogFilePath;
// 执行开关
private volatile boolean enabled = true;
// 内存缓存,存放当前log数据
private BTreeMap<Date, EventMsg> data = OffHeapMemCacheHelper.createOrderedOffHeapMemCache(10 / 1024); // 10MB
// 日志时间的格式,2015-07-24 00:47:58
private static final String entryDatePattern = "yyyy-MM-dd HH:mm:ss";
// 当eventId不存在时,默认返回的事件数目
private static final int default_last_count = 5;
@Override
protected boolean hasNewEventSince(String lastId) {
SimpleDateFormat fmt = new SimpleDateFormat(entryDatePattern);
if (isNotExistKey(lastId, fmt)) {
return !data.isEmpty();
} else {
try {
return data.higherKey(fmt.parse(lastId)) != null;
} catch (ParseException e) {
throw new RuntimeException("impossible!", e);
}
}
}
private boolean isNotExistKey(String keyStr, SimpleDateFormat fmt) {
if (keyStr == null) return true;
Date k = null;
try {
k = fmt.parse(keyStr);
} catch (ParseException e) {
return true;
}
return !data.containsKey(k);
}
@Override
protected List<EventMsg> getEventMsgsSince(String lastId) {
if (!hasNewEventSince(lastId)) return null;
SimpleDateFormat fmt = new SimpleDateFormat(entryDatePattern);
if (isNotExistKey(lastId, fmt)) {
// key not exist, default to return last three events
LinkedList<EventMsg> ret = new LinkedList<>();
Iterator<Entry<Date, EventMsg>> iter = this.data.descendingMap().entrySet().iterator();
for (int i = 0; i < default_last_count && iter.hasNext(); i++)
ret.push(iter.next().getValue());
return ret;
} else {
// return events after the last key
List<EventMsg> ret = new ArrayList<EventMsg>();
Date k;
try {
k = fmt.parse(lastId);
} catch (ParseException e) {
throw new RuntimeException("impossible!", e);
}
for (Entry<Date, EventMsg> e = this.data.higherEntry(k); e != null; e = this.data.higherEntry(k)) {
ret.add(e.getValue());
k = e.getKey();
}
return ret;
}
}
public void setFuncLogFilePath(String funcLogFilePath) {
this.funcLogFilePath = funcLogFilePath;
}
public void destroy() throws Exception {
this.enabled = false;
}
@Override
public void afterPropertiesSet() throws Exception {
// 启动日志检查线程
new Thread(new Runnable() {
@Override
public void run() {
SimpleDateFormat fmt = new SimpleDateFormat(entryDatePattern);
while (enabled) {
try {
// load current funcLog into (off-heap) memory, indexed by log entry time
File logFile = new File(funcLogFilePath);
if (!logFile.exists()) throw new RuntimeException("Function log file: " + funcLogFilePath
+ " does not exist.");
LineIterator iter = FileUtils.lineIterator(logFile, "UTF-8");
// the current constructing msg, null means no constructing at the moment
EventMsg curMsg = null;
// construcing possibly new msg from log file
while (iter.hasNext()) {
String l = iter.next();
if (isLogEntryBegin(l, fmt)) {
// entry begin encountered, check if new entry to save the old constructing msg & start
// a new one...
String tsString = StringUtils.substringBetween(l, "[", "]");
Date entryDate = fmt.parse(tsString);
if (!data.containsKey(entryDate)) {
// save the old constructing msg and start a new one
if (curMsg != null) {
data.put(fmt.parse(curMsg.getId()), curMsg);
}
curMsg = new EventMsg();
curMsg.setId(tsString);
curMsg.appendMsg(l).appendMsg(NL);// first line should also be appended
}
} else {
// not entry beginning, append msg if constructing
if (curMsg != null) {
curMsg.appendMsg(l).appendMsg(NL);
}
}
}
if (curMsg != null) {
// save the constructing msg at each end of loop
data.put(fmt.parse(curMsg.getId()), curMsg);
// then notify all emitters
FuncLogEventSource.this.notifyAllEmitters();
}
Thread.sleep(5000);
} catch (Throwable t) {
// log and continue executing
log.error("Error in 'func-log-check-thread'.", t);
}
}
}
}, "func-log-check-thread").start();
}
// to determine if the specified line is a beginning of a log entry
private boolean isLogEntryBegin(String line, SimpleDateFormat entryTsFormat) {
if (StringUtils.isBlank(line)) return false;
if (!line.startsWith("[")) return false;
String tsStr = StringUtils.substringBetween(line, "[", "]");
if (StringUtils.isBlank(tsStr)) return false;
try {
entryTsFormat.parse(tsStr);
return true;
} catch (ParseException e) {
return false;
}
}
}
@pfmiles
Copy link
Author

pfmiles commented Aug 14, 2015

注:webx3中,由于servletRequest/servletResponse是经过webx包装过的对象,其将本应写给客户端的数据通通缓存在了内存里;这与SSE的long polling机制冲突;因此在webx3下,应使用

@Autowired
private BufferedRequestContext reqContext;
...
this.reqContext.setBuffering(false)

将context注入,并通过context来取得原本的req/resp对象,而非直接通过注入的方式拿到被代理的req/resp对象

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment