Created
April 16, 2010 05:27
-
-
Save noonat/368053 to your computer and use it in GitHub Desktop.
Rhino event loop
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function EventEmitter() { | |
this._listeners = {}; | |
} | |
exports.EventEmitter = EventEmitter; | |
EventEmitter.prototype.listeners = function(event) { | |
if (!this._listeners[event]) { | |
this._listeners[event] = []; | |
} | |
return this._listeners[event]; | |
}; | |
EventEmitter.prototype.addListener = function(event, listener) { | |
if (!this._listeners[event]) { | |
this._listeners[event] = []; | |
} | |
this._listeners[event].push(listener); | |
return listener; | |
}; | |
EventEmitter.prototype.removeListener = function(event, listener) { | |
var listeners = this._listeners[event]; | |
if (listeners) { | |
var index = listeners.indexOf(listener); | |
if (index !== -1) { | |
listeners.splice(index, 1); | |
} | |
} | |
}; | |
EventEmitter.prototype.removeAllListeners = function(event) { | |
if (this._listeners[event]) { | |
delete this._listeners[event]; | |
} | |
}; | |
EventEmitter.prototype.emit = function(event) { | |
var listeners = this._listeners[event]; | |
if (listeners && listeners.length > 0) { | |
var args = Array.prototype.slice.call(arguments, 1); | |
for (var i = 0, len = listeners.length; i < len; ++i) { | |
listeners[i].apply(null, args); | |
} | |
} | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var EventEmitter = require('events').EventEmitter; | |
function Stream(reactor, channel) { | |
EventEmitter.call(this); | |
this.channel = channel; | |
this.channel.configureBlocking(false); | |
this.reactor = reactor; | |
this.readable = false; | |
this.readBuffer = java.nio.ByteBuffer.allocate(1024); | |
this.writable = false; | |
this.writeBuffer = java.nio.ByteBuffer.allocate(1024); | |
this.flushing = false; | |
this.flushQueue = []; | |
var self = this; | |
this.channelListener = this.reactor.channelCallbacks(this.channel, { | |
readable: function(listener) { | |
self.readable = true; | |
if (self.read() === -1) { | |
self.emit('end'); | |
} | |
}, | |
writable: function(listener) { | |
listener.deleteCallback('writable', arguments.callee); | |
self.writable = true; | |
self.flush(); | |
} | |
}); | |
}; | |
exports.Stream = Stream; | |
inherit(Stream, EventEmitter); | |
// stream.end();
//
// Closes the channel and stops the stream's channel listener. The listener
// is stopped even when closing the channel throws, so it does not stay
// registered with the reactor.
Stream.prototype.end = function() {
  try {
    this.channel.close();
  } finally {
    this.channelListener.stop();
  }
};
// stream.flush();
//
// Writes the queued buffers to the channel, in order. Returns true when the
// queue was drained (after emitting 'drain'). Returns false when there was
// nothing to do, when a flush is already in progress, when the stream is not
// writable yet, when a write failed ('error' is emitted) or when the channel
// accepted only part of a buffer. In the last case the flush retries itself
// on the reactor's next tick and `flushing` stays true until it finishes.
Stream.prototype.flush = function() {
  if (this.flushing || !this.flushQueue.length || !this.writable) {
    return false;
  }
  // Must be a local: the retry below runs later, and a shared global would
  // point at whichever stream called flush() most recently.
  var self = this;
  this.flushing = true;
  return (function _flush() {
    var buffer;
    while (buffer = self.flushQueue.shift()) {
      if (buffer.remaining() === 0) {
        continue; // empty buffer
      }
      try {
        self.channel.write(buffer);
        if (buffer.remaining()) {
          // couldn't write it all, put back on the queue and wait
          self.flushQueue.unshift(buffer);
          self.reactor.nextTick(_flush);
          return false;
        }
      } catch (err) {
        self.flushing = false;
        self.emit('error', err);
        return false;
      }
    }
    self.flushing = false;
    self.emit('drain');
    return true;
  })();
};
// stream.read();
//
// Reads once from the channel into the stream's reusable read buffer and
// emits 'data' with the bytes that were read. Returns the channel's read
// result (-1 means end of stream). Anything thrown while reading or while
// dispatching 'data' is emitted as 'error' and reported as -1.
Stream.prototype.read = function() {
  var bytesRead;
  this.readBuffer.clear();
  try {
    bytesRead = this.channel.read(this.readBuffer);
    if (bytesRead > 0) {
      this.readBuffer.flip();
      // Hand listeners their own copy. readBuffer is cleared and refilled by
      // the next read, which would corrupt a buffer a listener kept around
      // (for example one queued for a later write).
      var chunk = java.nio.ByteBuffer.allocate(bytesRead);
      chunk.put(this.readBuffer);
      chunk.flip();
      this.emit('data', chunk);
    }
  } catch (err) {
    this.emit('error', err);
    bytesRead = -1;
  }
  return bytesRead;
};
// stream.write(buffer);
// stream.write("ohai");
// stream.write("ohai", "ascii");
//
// Writes a java.nio.Buffer or a string to the channel. Strings are encoded
// with the charset named by the optional second argument (default 'utf8').
// Returns true when everything has been handed to the channel, false when
// data had to be queued. Throws Error when the channel is null and TypeError
// for unsupported argument types.
Stream.prototype.write = function(buffer) {
  if (this.channel === null) {
    throw new Error('channel is null');
  }
  // Encode the buffer, if needed
  if (typeof buffer === 'string' || buffer instanceof String) {
    var encoding = arguments[1] || 'utf8';
    var charset = java.nio.charset.Charset.forName(encoding);
    buffer = charset.encode(String(buffer));
  } else if (!(buffer instanceof java.nio.Buffer)) {
    throw new TypeError('buffer must be java.nio.Buffer or String');
  }
  // Write straight through only when nothing is waiting ahead of this
  // buffer, otherwise data would reach the channel out of order.
  if (!this.flushing && this.flushQueue.length === 0) {
    this.channel.write(buffer);
    if (buffer.remaining() === 0) {
      // Everything written, nothing to queue
      return true;
    }
  }
  // Queue the remaining data and make sure a flush is running or scheduled.
  // flush() is a no-op while another flush is in progress or while the
  // stream is not writable yet.
  this.flushQueue.push(buffer);
  return this.flush();
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Listener is a dumb listener. It always returns true as long as it's active | |
// and has a callback. It is used for idle callbacks. | |
function Listener(reactor, callback) { | |
this.active = false; | |
this.reactor = reactor; | |
this.callback = callback; | |
}; | |
exports.Listener = Listener; | |
Listener.prototype.invoke = function() { | |
this.callback(this); | |
}; | |
Listener.prototype.pending = function(reactor) { | |
return this.active && this.callback; | |
}; | |
Listener.prototype.start = function() { | |
if (!this.active) { | |
this.active = true; | |
this.reactor.addListener(this); | |
} | |
return this; | |
}; | |
Listener.prototype.stop = function() { | |
if (this.active) { | |
this.active = false; | |
this.reactor.removeListener(this); | |
} | |
return this; | |
}; | |
// DelayedListener is a relative timer (e.g. it triggers after a number of | |
// seconds from now). It stops itself after triggering once. | |
function DelayedListener(reactor, delay, callback) { | |
Listener.call(this, reactor, callback); | |
this.time = reactor.time + delay; | |
this.delay = delay; | |
}; | |
exports.DelayedListener = DelayedListener; | |
inherit(DelayedListener, Listener); | |
DelayedListener.prototype.invoke = function() { | |
Listener.prototype.invoke.call(this); | |
this.stop(); | |
}; | |
DelayedListener.prototype.pending = function() { | |
if (this.reactor.time >= this.time) { | |
return Listener.prototype.pending.call(this); | |
} else { | |
return false; | |
} | |
}; | |
// ScheduledListener is an absolute timer (e.g. it triggers itself when | |
// reactor.time is >= a given time). It stops itself after triggering once. | |
function ScheduledListener(reactor, time, callback) { | |
Listener.call(this, reactor, callback); | |
this.time = time; | |
}; | |
exports.ScheduledListener = ScheduledListener; | |
inherit(ScheduledListener, Listener); | |
// ChannelListener watches for changes in the state on a java.nio Channel. | |
// It triggers when the reactor's selector detects changes in the channel's | |
// state. ChannelListener does not inherit from Listener, but provides a | |
// similar interface. | |
function ChannelListener(reactor, channel, callbacks) { | |
this.active = false; | |
this.channel = channel; | |
this.callbacks = callbacks; | |
this.flags = ChannelListener.getFlagsForListener(this); | |
this.pendingFlags = 0; | |
this.reactor = reactor; | |
}; | |
exports.ChannelListener = ChannelListener; | |
// Maps the callback keys accepted by ChannelListener to the SelectionKey
// operation bits they stand for.
ChannelListener.flagsByKey = {
  'acceptable': java.nio.channels.SelectionKey.OP_ACCEPT,
  'connected': java.nio.channels.SelectionKey.OP_CONNECT,
  'readable': java.nio.channels.SelectionKey.OP_READ,
  'writable': java.nio.channels.SelectionKey.OP_WRITE
};
// Returns the combined operation bits for every callback key of the listener
// that is both known (present in ChannelListener.flagsByKey) and supported
// by the listener's channel (channel.validOps()). Unknown keys contribute
// nothing.
ChannelListener.getFlagsForListener = function(listener) {
  var flags = 0;
  var validFlags = listener.channel.validOps();
  var flagsByKey = ChannelListener.flagsByKey;
  var keys = Object.keys(listener.callbacks);
  for (var i = 0, len = keys.length; i < len; ++i) {
    // Own-property check: a key such as 'toString' must not pick up a value
    // through the prototype chain.
    if (!Object.prototype.hasOwnProperty.call(flagsByKey, keys[i])) {
      continue;
    }
    var flag = flagsByKey[keys[i]] || 0;
    if (validFlags & flag) {
      flags |= flag;
    }
  }
  return flags;
};
// listener.callback('readable', fn);
//
// Sets the callback for the given key. This will replace any existing
// callbacks for that key. The listener's flags are recomputed and, when the
// listener holds a valid selection key, its interest set is updated to match.
ChannelListener.prototype.callback = function(key, newCallback) {
  this.callbacks[key] = newCallback;
  this.flags = ChannelListener.getFlagsForListener(this);
  var selectionKey = this.key;
  if (selectionKey && selectionKey.isValid()) {
    selectionKey.interestOps(this.flags);
  }
};
// listener.deleteCallback('readable');
// listener.deleteCallback('readable', fn);
//
// Remove the callback for the given key, if it exists. Removing all the
// callbacks will NOT stop a listener -- you must explicitly call stop().
// If a function is passed, the callback will only be deleted if the current
// callback matches the passed function.
//
// When a callback is removed the listener's flags are recomputed and, if the
// listener holds a valid selection key, its interest set is updated.
ChannelListener.prototype.deleteCallback = function(key, callback) {
  var currentCallback = this.callbacks[key];
  if (!currentCallback) {
    return;
  }
  if (callback && currentCallback !== callback) {
    return;
  }
  delete this.callbacks[key];
  this.flags = ChannelListener.getFlagsForListener(this);
  if (this.key && this.key.isValid()) {
    this.key.interestOps(this.flags);
  }
};
// Calls all callbacks for pending events. Each callback receives this
// listener as its only argument. A pending bit is cleared once its callback
// has been attempted, even when the callback throws.
ChannelListener.prototype.invoke = function() {
  if (!this.pendingFlags) {
    return;
  }
  var keys = Object.keys(this.callbacks);
  for (var i = 0, len = keys.length; i < len; ++i) {
    var key = keys[i];
    var flag = ChannelListener.flagsByKey[key];
    if (this.pendingFlags & flag) {
      try {
        if (this.callbacks[key]) { // Might have been removed
          this.callbacks[key](this);
        }
      } finally {
        this.pendingFlags &= ~flag;
      }
    }
  }
  // Bits whose callback was deleted while they were pending are not visited
  // by the loop above. Drop them, otherwise the listener reports itself as
  // pending forever.
  this.pendingFlags &= this.flags;
};
// Returns true if this listener has any callbacks for the given flags.
//
// Merges the selection key's ready operations (limited to the operations
// this listener has callbacks for) into pendingFlags. When the listener has
// no valid key, e.g. after stop(), only events recorded earlier count.
ChannelListener.prototype.pending = function() {
  if (this.key && this.key.isValid()) {
    this.pendingFlags |= this.flags & this.key.readyOps();
  }
  return this.pendingFlags ? true : false;
};
// listener.start();
//
// Adds the channel to the reactor's listeners and selector. The channel is
// registered with the listener's current flags and the listener is attached
// to the resulting selection key. Does nothing when the listener is already
// active. Returns the listener.
ChannelListener.prototype.start = function() {
  if (this.active) {
    return this;
  }
  this.key = this.channel.register(this.reactor.selector, this.flags);
  this.key.attach(this);
  this.active = true;
  this.reactor.addListener(this);
  return this;
};
// listener.stop();
//
// Removes the channel from the reactor's listeners and selector. The
// selection key is cancelled if it is still valid and the reference to it is
// always dropped. Does nothing when the listener is not active. Returns the
// listener.
ChannelListener.prototype.stop = function() {
  if (this.active) {
    if (this.key) {
      if (this.key.isValid()) {
        this.key.cancel();
      }
      // Release the key even when it was already invalid (for example
      // because the channel was closed first).
      this.key = null;
    }
    this.active = false;
    this.reactor.removeListener(this);
  }
  return this;
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var events = require('events'); | |
var io = require('io'); | |
function Server(reactor, callback) { | |
events.EventEmitter.call(this); | |
this.channel = null; | |
this.reactor = reactor; | |
if (callback) { | |
this.addListener('connection', callback); | |
} | |
} | |
exports.Server = Server; | |
inherit(Server, events.EventEmitter); | |
// server.listen(port);
// server.listen(port, backlog);
//
// Opens a non-blocking server socket channel bound to the port and registers
// it with the reactor. Each accepted connection is wrapped in a Stream and
// emitted as 'connection'. Emits 'listening' before returning. The backlog
// defaults to 5 when it is missing or not a number.
//
// The reactor listener is kept in this.channelListener so the server can be
// stopped later.
Server.prototype.listen = function(port, backlog) {
  backlog = (backlog == null || isNaN(Number(backlog))) ? 5 : Number(backlog);
  this.channel = java.nio.channels.ServerSocketChannel.open();
  this.channel.configureBlocking(false);
  this.channel.socket().bind(new java.net.InetSocketAddress(port), backlog);
  var self = this;
  this.channelListener = this.reactor.channelCallbacks(this.channel, {
    acceptable: function(listener) {
      // accept() yields nothing when no connection is waiting.
      var clientChannel = self.channel.accept();
      if (clientChannel) {
        var stream = new Stream(self.reactor, clientChannel);
        self.emit('connection', stream);
      }
    }
  });
  this.emit('listening');
};
exports.createServer = function(reactor, port, backlog) { | |
return new Server(reactor, port, backlog); | |
}; | |
function Stream(reactor, channel) { | |
io.Stream.call(this, reactor, channel); | |
var self = this; | |
this.channelListener.callback('connected', function() { | |
self.listener.deleteCallback('connected', arguments.callee); | |
self.emit('connect'); | |
}); | |
} | |
exports.Stream = Stream; | |
inherit(Stream, io.Stream); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// FIXME: add pause/resume for the event loop | |
// FIXME: linked lists, binary tree for sorting? | |
// FIXME: use a free list | |
// FIXME: time jumps | |
var listeners = require('listeners'); | |
var Reactor = exports.Reactor = function() { | |
this.listeners = { | |
beforeTick: [], | |
nextTick: [], | |
channel: [], | |
scheduled: [], | |
delayed: [], | |
idle: [], | |
afterTick: [] | |
}; | |
this.pending = []; | |
this.running = false; | |
this.selector = java.nio.channels.Selector.open(); | |
this.sleep = true; | |
this.tick = 0; | |
this.tickInterval = 0; | |
this.updateTime(); | |
}; | |
// Creates a plain Listener for the callback, tags it with the reactor group
// it belongs to and starts it. The group has to be set before start(): a
// starting listener is filed by the reactor under its group.
function startGroupListener(reactor, group, callback) {
  var listener = new listeners.Listener(reactor, callback);
  listener.group = group;
  listener.start();
  return listener;
}

// reactor.afterTick(callback);
//
// Registers a callback in the 'afterTick' group, which run() queues last in
// every loop iteration. Returns the started listener.
Reactor.prototype.afterTick = function(callback) {
  return startGroupListener(this, 'afterTick', callback);
};

// reactor.beforeTick(callback);
//
// Registers a callback in the 'beforeTick' group, which run() queues first
// in every loop iteration. Returns the started listener.
Reactor.prototype.beforeTick = function(callback) {
  return startGroupListener(this, 'beforeTick', callback);
};

// reactor.nextTick(callback);
//
// Registers a callback in the 'nextTick' group. run() queues these once and
// then empties the group. Returns the started listener.
Reactor.prototype.nextTick = function(callback) {
  return startGroupListener(this, 'nextTick', callback);
};
// var listener = reactor.channelCallbacks(channel, {
//   readable: function(listener) {
//     print('channel is now readable');
//   },
//
//   writable: function(listener) {
//     print('channel is now writable');
//   }
// });
//
// Creates a ChannelListener for the channel with the given callbacks, starts
// it and returns it.
Reactor.prototype.channelCallbacks = function(channel, callbacks) {
  var listener = new listeners.ChannelListener(this, channel, callbacks);
  return listener.start();
};
// var listener = reactor.idleCallback(function() {
//   print('reactor is idle');
// });
//
// Registers a callback in the 'idle' group and returns the started
// listener. run() queues idle listeners only when nothing else is pending
// at that point of the loop iteration.
Reactor.prototype.idleCallback = function(callback) {
  var idleListener = new listeners.Listener(this, callback);
  // The group must be set before start(), which files the listener with the
  // reactor by group.
  idleListener.group = 'idle';
  idleListener.start();
  return idleListener;
};
// var listener = reactor.delayCallback(5, function() {
//   print('five seconds have passed');
// });
//
// Creates a DelayedListener that triggers `delay` seconds after the
// reactor's current time, starts it and returns it.
Reactor.prototype.delayCallback = function(delay, callback) {
  var listener = new listeners.DelayedListener(this, delay, callback);
  return listener.start();
};
// var time = new Date(2012, 1, 1).getTime() / 1000; // seconds (month index 1)
// var listener = reactor.scheduleCallback(time, function() {
//   print('ohhhh nnoooooooooo!');
// });
//
// Creates a ScheduledListener for the absolute time (in seconds, the unit of
// reactor.time), starts it and returns it.
Reactor.prototype.scheduleCallback = function(time, callback) {
  var listener = new listeners.ScheduledListener(this, time, callback);
  return listener.start();
};
// reactor.addListener(listener);
//
// Files the listener under its group (see groupForListener) and returns it.
// Throws TypeError when the listener's group is not one the reactor knows,
// e.g. a plain Listener whose `group` was never set.
Reactor.prototype.addListener = function(listener) {
  var group = this.groupForListener(listener);
  var bucket = this.listeners[group];
  if (!bucket) {
    throw new TypeError('unknown listener group ' + group);
  }
  bucket.push(listener);
  return listener;
};
// reactor.hasListeners();
// reactor.hasListeners('idle');
//
// With a group name, reports whether that group has any listeners (false
// for a group the reactor does not have). Without one, reports whether any
// group has listeners.
Reactor.prototype.hasListeners = function(key) {
  if (key) {
    var group = this.listeners[key];
    return Boolean(group) && group.length > 0;
  }
  var keys = Object.keys(this.listeners);
  for (var i = keys.length - 1; i >= 0; --i) {
    if (this.listeners[keys[i]].length > 0) {
      return true;
    }
  }
  return false;
};
// Returns the name of the reactor group a listener belongs to: 'channel',
// 'scheduled' or 'delayed' for the specialised listener types, and the
// listener's own `group` property for a plain Listener. Throws TypeError for
// anything else.
Reactor.prototype.groupForListener = function(listener) {
  // The specialised types are tested before Listener because the timer
  // listeners inherit from it.
  if (listener instanceof listeners.ChannelListener) {
    return 'channel';
  }
  if (listener instanceof listeners.ScheduledListener) {
    return 'scheduled';
  }
  if (listener instanceof listeners.DelayedListener) {
    return 'delayed';
  }
  if (listener instanceof listeners.Listener) {
    return listener.group;
  }
  throw new TypeError('invalid listener ' + listener);
};
// reactor.removeListener(listener);
//
// Removes the listener from its group, if it is registered there. A
// listener whose group the reactor does not know has nothing to remove.
Reactor.prototype.removeListener = function(listener) {
  // Named `bucket` so the module-level `listeners` is not shadowed.
  var bucket = this.listeners[this.groupForListener(listener)];
  if (!bucket) {
    return;
  }
  var index = bucket.indexOf(listener);
  if (index !== -1) {
    bucket.splice(index, 1);
  }
};
// Invokes every queued listener in order and empties the queue. The queue is
// emptied even when a listener throws, so listeners from a failed batch are
// not invoked again by a later call. The argument is not used.
Reactor.prototype.invokePending = function(key) {
  try {
    for (var i = 0, len = this.pending.length; i < len; ++i) {
      this.pending[i].invoke();
    }
  } finally {
    this.pending.length = 0;
  }
};
// reactor.queuePending(listeners);
// reactor.queuePending(listeners, true);
//
// Appends to the pending queue every listener in the list that reports
// itself pending. With `lockOut` set, nothing is queued when the queue
// already has entries (run() uses this so idle listeners only run when
// nothing else is waiting).
Reactor.prototype.queuePending = function(listeners, lockOut) {
  if (lockOut && this.pending.length > 0) {
    return;
  }
  for (var i = 0, len = listeners.length; i < len; ++i) {
    var candidate = listeners[i];
    if (candidate.pending()) {
      this.pending.push(candidate);
    }
  }
};
// Queues the listeners attached to the selector's selected keys, as long as
// they report themselves pending, then clears the selected-key set so the
// same keys are not processed again. Keys without an attachment are skipped.
Reactor.prototype.queueSelected = function() {
  var selected = this.selector.selectedKeys();
  var iterator = selected.iterator();
  while (iterator.hasNext()) {
    var listener = iterator.next().attachment();
    if (listener && listener.pending()) {
      this.pending.push(listener);
    }
  }
  selected.clear();
};
// reactor.run();
//
// Runs the event loop until stop() is called or no listeners are left.
// Throws when the loop is already running. One iteration:
//
//   1. refresh reactor.time
//   2. queue and invoke beforeTick and nextTick listeners (the nextTick
//      group is emptied once queued)
//   3. poll the selector without blocking and queue ready channel listeners
//   4. if nothing is pending, optionally sleep for tickInterval seconds
//   5. queue and invoke scheduled, delayed, idle (only when nothing else is
//      pending) and afterTick listeners
//
// `running` is reset on every exit, including exceptions thrown by
// callbacks, so the reactor can be run again afterwards.
Reactor.prototype.run = function() {
  if (this.running) {
    throw new Error('run() called but looping already true (recursion?)');
  }
  this.running = true;
  try {
    do {
      this.updateTime();
      this.queuePending(this.listeners.beforeTick);
      this.queuePending(this.listeners.nextTick);
      this.listeners.nextTick.length = 0; // FIXME: kill listeners too?
      this.invokePending();
      if (!this.running) {
        break;
      }
      if (this.selector.selectNow() > 0) {
        this.queueSelected();
      }
      if (!this.pending.length && this.sleep && this.tickInterval > 0) {
        // FIXME: if timer is scheduled in less time, sleep for less
        java.lang.Thread.sleep(this.tickInterval * 1000);
      }
      this.queuePending(this.listeners.scheduled);
      this.queuePending(this.listeners.delayed);
      this.queuePending(this.listeners.idle, true);
      this.queuePending(this.listeners.afterTick);
      this.invokePending();
      this.tick++;
    } while (this.running && this.hasListeners());
  } finally {
    this.running = false;
  }
};
// reactor.stop();
//
// Asks run() to leave its loop. run() checks the flag after invoking the
// first batch of listeners and again at the end of each iteration.
Reactor.prototype.stop = function() {
  this.running = false;
};
// Sets reactor.time to the current wall-clock time in seconds (the system's
// millisecond clock divided by 1000, so fractions are kept).
Reactor.prototype.updateTime = function() {
  var millis = java.lang.System.currentTimeMillis();
  this.time = millis / 1000;
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// require(id) -- a small module loader for Rhino.
//
// Module ids map to "<id>.js" files. Ids starting with "." are resolved
// against the directory of the module that is currently being loaded
// (initially the user.dir system property). All other ids are looked up in
// require.paths. A module's source is wrapped in a function receiving
// (require, exports, module) and evaluated once; later calls return the
// cached exports object.
require = (function() {
  var cached = {};
  var currentPath = java.lang.System.getProperty('user.dir');
  var paths = [currentPath];

  // Returns the java.io.File for a module id, or undefined when no matching
  // file exists.
  function normalize(id) {
    var file; // was an implicit global
    id = id + '.js';
    if (/^\.\.?/.test(id)) {
      // relative path
      file = new java.io.File(currentPath, id);
      if (file.isFile()) {
        return file;
      }
    } else {
      for (var i = 0, len = paths.length; i < len; ++i) {
        file = new java.io.File(paths[i], id);
        if (file.isFile()) {
          return file;
        }
      }
    }
    return undefined;
  }

  // Reads a whole file and returns its contents as a JavaScript string,
  // decoded as UTF-8. The underlying stream is always closed.
  function readFile(file) {
    var stream = new java.io.FileInputStream(file);
    try {
      var channel = stream.getChannel();
      var length = channel.size();
      // One spare byte, so the read that detects end-of-file always has room
      // left in the buffer.
      var buffer = java.nio.ByteBuffer.allocate(length + 1);
      var bytes = java.lang.reflect.Array.newInstance(
        java.lang.Byte.TYPE, length);
      var bytesRead;
      do {
        bytesRead = channel.read(buffer);
        // Stop at end-of-file (-1) and also when the buffer is full (0),
        // which would otherwise loop forever.
      } while (bytesRead > 0);
      buffer.rewind();
      buffer.get(bytes, 0, length);
      return String(new java.lang.String(bytes, 'UTF-8'));
    } finally {
      stream.close();
    }
  }

  // Loads the module (at most once) and returns its exports object. Throws
  // Error when no file can be found for the id.
  function require(id) {
    var file = normalize(id);
    if (!file) {
      throw new Error("couldn't find module \"" + id + "\"");
    } else {
      id = String(file.getCanonicalPath());
    }
    if (!cached.hasOwnProperty(id)) {
      var source = (
        "(function (require, exports, module) { " +
        readFile(id) + "\n});");
      // Cached before evaluation so that modules requiring each other in a
      // cycle get the partially filled exports instead of recursing forever.
      cached[id] = {
        exports: {},
        module: {
          id: id,
          uri: String(new java.io.File(id).toURI())
        }
      };
      var previousPath = currentPath;
      var loaded = false;
      try {
        currentPath = file.getParent();
        var ctx = org.mozilla.javascript.Context.getCurrentContext();
        var func = ctx.evaluateString({}, source, id, 1, null);
        func(require, cached[id].exports, cached[id].module);
        loaded = true;
      } finally {
        currentPath = previousPath;
        if (!loaded) {
          // Do not keep a half-initialised module around: the next
          // require() should retry and report the error again.
          delete cached[id];
        }
      }
    }
    return cached[id].exports;
  }

  require.paths = paths;
  return require;
})();
// FIXME: where should this really go?
//
// inherit(ctor, superCtor);
//
// Makes instances of `ctor` inherit from `superCtor.prototype` without
// running the super constructor. Replaces ctor.prototype, so call it before
// adding methods to the prototype. `constructor` is restored as a
// non-enumerable property so it does not show up when enumerating instances.
function inherit(ctor, superCtor) {
  ctor.prototype = Object.create(superCtor.prototype, {
    constructor: {
      value: ctor,
      enumerable: false,
      writable: true,
      configurable: true
    }
  });
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example: an echo server on port 8080, driven by the reactor.
load('require.js');

var Reactor = require('reactor').Reactor;
var net = require('net');

var reactor = new Reactor();
reactor.tickInterval = 0.1; // seconds to sleep when a tick has nothing to do

net.createServer(reactor, function(stream) {
  stream.addListener('connect', function() {
    stream.write('ohai\r\n');
  });
  stream.addListener('data', function(data) {
    stream.write(data); // echo
  });
  stream.addListener('end', function() {
    // Best-effort goodbye: the write can fail because the peer is already
    // gone. The stream must be closed either way.
    try {
      stream.write('baibai\r\n');
    } catch (err) {
      print('could not say goodbye: ' + err);
    } finally {
      stream.end();
    }
  });
}).listen(8080);

// Prints the number of registered channel listeners every five seconds.
(function count() {
  print(reactor.listeners.channel.length);
  // delayCallback() returns a listener that is already started; the extra
  // start() is a no-op.
  reactor.delayCallback(5, count).start();
})();

reactor.run();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment