Skip to content

Instantly share code, notes, and snippets.

@noonat
Created April 16, 2010 05:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save noonat/368053 to your computer and use it in GitHub Desktop.
Save noonat/368053 to your computer and use it in GitHub Desktop.
Rhino event loop
// EventEmitter keeps, per event name, an array of listener functions.
//
// The listener table is created without a prototype so that event names such
// as 'toString' or 'constructor' are not mistaken for listener arrays
// inherited from Object.prototype.
function EventEmitter() {
  this._listeners = Object.create(null);
}
exports.EventEmitter = EventEmitter;
// Returns the live array of listeners for `event`. An empty array is created
// and stored on first access, so callers may push onto the result directly.
EventEmitter.prototype.listeners = function(event) {
  var table = this._listeners;
  var registered = table[event];
  if (!registered) {
    registered = table[event] = [];
  }
  return registered;
};
// Appends `listener` to the list for `event` (creating the list if needed)
// and returns the listener that was passed in.
EventEmitter.prototype.addListener = function(event, listener) {
  var registered = this._listeners[event] || (this._listeners[event] = []);
  registered.push(listener);
  return listener;
};
// Removes the first occurrence of `listener` from the list for `event`.
// Does nothing when the event has no list or the listener is not in it.
EventEmitter.prototype.removeListener = function(event, listener) {
  var registered = this._listeners[event];
  if (!registered) {
    return;
  }
  var position = registered.indexOf(listener);
  if (position !== -1) {
    registered.splice(position, 1);
  }
};
// Drops the whole listener list for `event`, if one exists.
EventEmitter.prototype.removeAllListeners = function(event) {
  var table = this._listeners;
  if (!table[event]) {
    return;
  }
  delete table[event];
};
// Calls every listener registered for `event`, in registration order, with
// the remaining arguments. Listeners are invoked with a null `this`.
//
// The listener list is snapshotted before dispatch so that a listener which
// adds or removes listeners (including itself) during the emit cannot cause
// another listener to be skipped or an out-of-range slot to be called.
EventEmitter.prototype.emit = function(event) {
  var registered = this._listeners[event];
  if (!registered || registered.length === 0) {
    return;
  }
  var args = Array.prototype.slice.call(arguments, 1);
  var snapshot = registered.slice();
  for (var i = 0, len = snapshot.length; i < len; ++i) {
    snapshot[i].apply(null, args);
  }
};
var EventEmitter = require('events').EventEmitter;
// Stream wraps a java.nio channel and exposes it as an EventEmitter.
//
// The channel is switched to non-blocking mode and registered with the
// reactor through reactor.channelCallbacks():
//   - readable: marks the stream readable, calls read(), and emits 'end'
//     when read() returns -1.
//   - writable: fires once. It removes itself, marks the stream writable and
//     calls flush().
function Stream(reactor, channel) {
  EventEmitter.call(this);
  this.channel = channel;
  this.channel.configureBlocking(false);
  this.reactor = reactor;
  this.readable = false;
  this.readBuffer = java.nio.ByteBuffer.allocate(1024);
  this.writable = false;
  this.writeBuffer = java.nio.ByteBuffer.allocate(1024);
  this.flushing = false;
  this.flushQueue = [];
  var self = this;
  this.channelListener = this.reactor.channelCallbacks(this.channel, {
    readable: function(listener) {
      self.readable = true;
      if (self.read() === -1) {
        self.emit('end');
      }
    },
    // Named so it can unregister itself without arguments.callee, which is
    // deprecated and throws in strict mode.
    writable: function onWritable(listener) {
      listener.deleteCallback('writable', onWritable);
      self.writable = true;
      self.flush();
    }
  });
}
exports.Stream = Stream;
inherit(Stream, EventEmitter);
// Closes the underlying channel, then stops the channel listener so the
// reactor no longer tracks this stream.
Stream.prototype.end = function() {
  var channel = this.channel;
  channel.close();
  var watcher = this.channelListener;
  watcher.stop();
};
// Writes queued buffers to the channel in order.
//
// Returns false without doing anything when a flush is already in progress,
// the queue is empty, or the stream is not yet writable. Otherwise it writes
// until the queue is empty (emits 'drain', returns true), a buffer is only
// partially written (re-queues it, retries on the reactor's next tick,
// returns false), or the channel throws (emits 'error', returns false).
Stream.prototype.flush = function() {
  if (this.flushing || !this.flushQueue.length || !this.writable) {
    return false;
  }
  // Must be a local: as an implicit global it was shared by every stream, so
  // a retry scheduled with nextTick could run against another stream.
  var self = this;
  this.flushing = true;
  return (function _flush() {
    var buffer;
    while ((buffer = self.flushQueue.shift())) {
      if (buffer.remaining() === 0) {
        continue; // empty buffer
      }
      try {
        self.channel.write(buffer);
        if (buffer.remaining()) {
          // couldn't write it all, put back on the queue and wait
          self.flushQueue.unshift(buffer);
          self.reactor.nextTick(_flush);
          return false;
        }
      } catch (err) {
        self.flushing = false;
        self.emit('error', err);
        return false;
      }
    }
    self.flushing = false;
    self.emit('drain');
    return true;
  })();
};
// Reads once from the channel into the shared read buffer.
//
// When bytes were read, emits 'data' with a buffer holding exactly those
// bytes. When an exception is caught (from the channel or from a 'data'
// listener), emits 'error' and reports -1. Returns the channel's read count,
// or -1 after an error.
Stream.prototype.read = function() {
  var bytesRead;
  this.readBuffer.clear();
  try {
    bytesRead = this.channel.read(this.readBuffer);
    if (bytesRead > 0) {
      this.readBuffer.flip();
      // Emit an independent copy. A slice() shares storage with readBuffer,
      // which is cleared and refilled on the next read, so a listener that
      // keeps the buffer (e.g. by queueing it in write()) would see its
      // contents change.
      var chunk = java.nio.ByteBuffer.allocate(this.readBuffer.remaining());
      chunk.put(this.readBuffer);
      chunk.flip();
      this.emit('data', chunk);
    }
  } catch (err) {
    this.emit('error', err);
    bytesRead = -1;
  }
  return bytesRead;
};
// stream.write(buffer);
// stream.write("ohai");
// stream.write("ohai", "ascii");
//
// Writes a java.nio.Buffer, or a string encoded with `encoding` (default
// 'utf8'), to the channel.
//
// Returns true when the data was written in full immediately. Returns false
// when all or part of it was queued; queued data is sent by flush().
// Throws Error when the channel is null and TypeError when `buffer` is
// neither a string nor a java.nio.Buffer.
Stream.prototype.write = function(buffer, encoding) {
  if (this.channel === null) {
    throw new Error('channel is null');
  }
  // Encode the buffer, if needed
  if (typeof buffer === 'string' || buffer instanceof String) {
    var charset = java.nio.charset.Charset.forName(encoding || 'utf8');
    buffer = charset.encode(buffer);
  } else if (!(buffer instanceof java.nio.Buffer)) {
    throw new TypeError('buffer must be java.nio.Buffer or String');
  }
  // Only write directly when nothing is queued ahead of this buffer,
  // otherwise the new data would overtake data that is still waiting.
  if (!this.flushing && this.flushQueue.length === 0) {
    this.channel.write(buffer);
    if (buffer.remaining() === 0) {
      // Everything written, nothing to queue
      return true;
    }
  }
  // Queue the remaining data and make sure a flush is scheduled. flush() is
  // a no-op while one is already running or the stream is not yet writable.
  this.flushQueue.push(buffer);
  this.flush();
  return false;
};
// Listener is the simplest reactor listener: it holds a callback and counts
// as pending whenever it is active and has one. The reactor uses it for its
// per-tick and idle callbacks. A new listener is inactive until start().
function Listener(reactor, callback) {
  this.active = false;
  this.reactor = reactor;
  this.callback = callback;
}
exports.Listener = Listener;
// Runs the callback as a method of this listener, passing the listener
// itself as the only argument.
Listener.prototype.invoke = function() {
  this.callback.call(this, this);
};
// Truthy when the listener is active and has a callback. Note that the
// value returned is `active` itself when falsy, otherwise the callback.
Listener.prototype.pending = function(reactor) {
  if (!this.active) {
    return this.active;
  }
  return this.callback;
};
// Activates the listener and registers it with the reactor. Calling start()
// on an already active listener does nothing. Returns the listener.
Listener.prototype.start = function() {
  if (this.active) {
    return this;
  }
  this.active = true;
  this.reactor.addListener(this);
  return this;
};
// Deactivates the listener and unregisters it from the reactor. Calling
// stop() on an inactive listener does nothing. Returns the listener.
Listener.prototype.stop = function() {
  if (!this.active) {
    return this;
  }
  this.active = false;
  this.reactor.removeListener(this);
  return this;
};
// DelayedListener is a relative timer: its trigger time is the reactor's
// time at construction plus `delay`. It stops itself after firing once.
function DelayedListener(reactor, delay, callback) {
  Listener.apply(this, [reactor, callback]);
  this.time = reactor.time + delay;
  this.delay = delay;
}
exports.DelayedListener = DelayedListener;
inherit(DelayedListener, Listener);
// Fires the callback, then stops the listener so it only ever fires once.
// The stop happens in a finally block: if the callback throws, the timer is
// still disarmed instead of firing again on every later tick.
DelayedListener.prototype.invoke = function() {
  try {
    Listener.prototype.invoke.call(this);
  } finally {
    this.stop();
  }
};
// Pending only once the reactor's time has reached the trigger time; from
// then on it defers to Listener's own pending check.
DelayedListener.prototype.pending = function() {
  var due = this.reactor.time >= this.time;
  return due ? Listener.prototype.pending.call(this) : false;
};
// ScheduledListener is an absolute timer (e.g. it triggers itself when
// reactor.time is >= a given time). It stops itself after triggering once.
function ScheduledListener(reactor, time, callback) {
Listener.call(this, reactor, callback);
this.time = time;
};
exports.ScheduledListener = ScheduledListener;
inherit(ScheduledListener, Listener);
// ChannelListener watches a java.nio channel through the reactor's selector
// and runs the callback registered for each kind of readiness. It is not a
// Listener subclass but offers the same invoke/pending/start/stop methods.
function ChannelListener(reactor, channel, callbacks) {
  this.active = false;
  this.channel = channel;
  this.callbacks = callbacks;
  // Derived from `channel` and `callbacks`, so both must already be set.
  this.flags = ChannelListener.getFlagsForListener(this);
  this.pendingFlags = 0;
  this.reactor = reactor;
}
exports.ChannelListener = ChannelListener;
// Maps each supported callback name to the SelectionKey operation bit that
// the selector reports for it.
ChannelListener.flagsByKey = (function() {
  var SelectionKey = java.nio.channels.SelectionKey;
  return {
    'acceptable': SelectionKey.OP_ACCEPT,
    'connected': SelectionKey.OP_CONNECT,
    'readable': SelectionKey.OP_READ,
    'writable': SelectionKey.OP_WRITE
  };
})();
// Computes the interest-ops bitmask for a listener: the union of the flags
// of every callback name it has, limited to the operations its channel
// reports as valid. Unknown callback names contribute nothing.
ChannelListener.getFlagsForListener = function(listener) {
  var supported = listener.channel.validOps();
  var wanted = 0;
  Object.keys(listener.callbacks).forEach(function(name) {
    var bit = ChannelListener.flagsByKey[name] || 0;
    if (supported & bit) {
      wanted |= bit;
    }
  });
  return wanted;
};
// Installs `newCallback` under `key`, replacing whatever was there, then
// recomputes the flags and, if the selection key is still valid, pushes the
// new interest set to it.
ChannelListener.prototype.callback = function(key, newCallback) {
  this.callbacks[key] = newCallback;
  this.flags = ChannelListener.getFlagsForListener(this);
  var selectionKey = this.key;
  if (selectionKey && selectionKey.isValid()) {
    selectionKey.interestOps(this.flags);
  }
};
// listener.deleteCallback('readable');
// listener.deleteCallback('readable', fn);
//
// Removes the callback stored under `key`. When a function is given, the
// callback is removed only if it is that exact function. Afterwards the
// flags are recomputed and applied to the selection key if it is valid.
// Removing every callback does NOT stop the listener; call stop() for that.
ChannelListener.prototype.deleteCallback = function(key, callback) {
  var current = this.callbacks[key];
  if (!current) {
    return;
  }
  if (callback && current !== callback) {
    return;
  }
  delete this.callbacks[key];
  this.flags = ChannelListener.getFlagsForListener(this);
  var selectionKey = this.key;
  if (selectionKey && selectionKey.isValid()) {
    selectionKey.interestOps(this.flags);
  }
};
// Runs the callback for every pending flag, passing this listener. Each
// flag is cleared once its callback has been attempted, even if it throws
// or was removed by an earlier callback in the same pass.
ChannelListener.prototype.invoke = function() {
  if (!this.pendingFlags) {
    return;
  }
  var names = Object.keys(this.callbacks);
  for (var n = 0; n < names.length; ++n) {
    var name = names[n];
    var bit = ChannelListener.flagsByKey[name];
    if (!(this.pendingFlags & bit)) {
      continue;
    }
    try {
      // The callback may have been removed since `names` was captured.
      if (this.callbacks[name]) {
        this.callbacks[name](this);
      }
    } finally {
      this.pendingFlags &= ~bit;
    }
  }
};
// Folds the selection key's ready operations (limited to the flags this
// listener is interested in) into pendingFlags and returns true when any
// flag is pending.
//
// Returns false when there is no usable selection key. That is the case
// before start() and after stop(), where `this.key` is unset, and once the
// key is no longer valid, where readyOps() must not be called.
ChannelListener.prototype.pending = function() {
  var selectionKey = this.key;
  if (!selectionKey || !selectionKey.isValid()) {
    return false;
  }
  this.pendingFlags |= this.flags & selectionKey.readyOps();
  return this.pendingFlags !== 0;
};
// listener.start();
// Registers the channel with the reactor's selector using the current
// flags, attaches this listener to the resulting key, and adds the listener
// to the reactor. Does nothing if already active. Returns the listener.
ChannelListener.prototype.start = function() {
  if (this.active) {
    return this;
  }
  this.key = this.channel.register(this.reactor.selector, this.flags);
  this.key.attach(this);
  this.active = true;
  this.reactor.addListener(this);
  return this;
};
// listener.stop();
// Cancels and forgets the selection key if it is still valid, marks the
// listener inactive and removes it from the reactor. Does nothing if the
// listener is not active. Returns the listener.
ChannelListener.prototype.stop = function() {
  if (!this.active) {
    return this;
  }
  var selectionKey = this.key;
  if (selectionKey && selectionKey.isValid()) {
    selectionKey.cancel();
    this.key = null;
  }
  this.active = false;
  this.reactor.removeListener(this);
  return this;
};
var events = require('events');
var io = require('io');
// Server is an EventEmitter bound to a reactor. It has no channel until
// listen() is called. An optional `callback` is registered as a listener
// for the 'connection' event.
function Server(reactor, callback) {
  events.EventEmitter.call(this);
  this.channel = null;
  this.reactor = reactor;
  if (!callback) {
    return;
  }
  this.addListener('connection', callback);
}
exports.Server = Server;
inherit(Server, events.EventEmitter);
// Opens a non-blocking server socket channel bound to `port` and registers
// an 'acceptable' callback with the reactor. Each accepted client channel
// is wrapped in a Stream and announced with a 'connection' event. Emits
// 'listening' once set up. `backlog` falls back to 5 when it is not
// numeric.
Server.prototype.listen = function(port, backlog) {
  if (isNaN(Number(backlog))) {
    backlog = 5;
  }
  this.channel = java.nio.channels.ServerSocketChannel.open();
  this.channel.configureBlocking(false);
  var address = new java.net.InetSocketAddress(port);
  this.channel.socket().bind(address, backlog);
  var server = this;
  this.reactor.channelCallbacks(this.channel, {
    acceptable: function(listener) {
      var clientChannel = server.channel.accept();
      if (!clientChannel) {
        return;
      }
      var stream = new Stream(server.reactor, clientChannel);
      server.emit('connection', stream);
    }
  });
  this.emit('listening');
};
exports.createServer = function(reactor, port, backlog) {
return new Server(reactor, port, backlog);
};
// Network stream: an io.Stream that also emits 'connect' the first time
// its channel listener reports the 'connected' state. The callback removes
// itself before emitting, so 'connect' fires at most once.
function Stream(reactor, channel) {
  io.Stream.call(this, reactor, channel);
  var self = this;
  // The callback is named so it can unregister itself without
  // arguments.callee. It must use `channelListener`, the property that
  // io.Stream sets; `self.listener` does not exist.
  this.channelListener.callback('connected', function onConnected() {
    self.channelListener.deleteCallback('connected', onConnected);
    self.emit('connect');
  });
}
exports.Stream = Stream;
inherit(Stream, io.Stream);
// FIXME: add pause/resume for the event loop
// FIXME: linked lists, binary tree for sorting?
// FIXME: use a free list
// FIXME: time jumps
var listeners = require('listeners');
// Reactor is the event loop. It keeps one list of listeners per group,
// a queue of listeners due to run, a java.nio Selector for channel
// readiness, and its own clock (see updateTime()).
var Reactor = exports.Reactor = function() {
  var groups = {};
  [
    'beforeTick',
    'nextTick',
    'channel',
    'scheduled',
    'delayed',
    'idle',
    'afterTick'
  ].forEach(function(name) {
    groups[name] = [];
  });
  this.listeners = groups;
  this.pending = [];
  this.running = false;
  this.selector = java.nio.channels.Selector.open();
  this.sleep = true;
  this.tick = 0;
  this.tickInterval = 0;
  this.updateTime();
};
// Creates a plain Listener in the 'afterTick' group, starts it, and
// returns it.
Reactor.prototype.afterTick = function(callback) {
  var tickListener = new listeners.Listener(this, callback);
  tickListener.group = 'afterTick';
  tickListener.start();
  return tickListener;
};
// Creates a plain Listener in the 'beforeTick' group, starts it, and
// returns it.
Reactor.prototype.beforeTick = function(callback) {
  var tickListener = new listeners.Listener(this, callback);
  tickListener.group = 'beforeTick';
  tickListener.start();
  return tickListener;
};
// Creates a plain Listener in the 'nextTick' group, starts it, and returns
// it. run() empties that group after queueing it each tick.
Reactor.prototype.nextTick = function(callback) {
  var tickListener = new listeners.Listener(this, callback);
  tickListener.group = 'nextTick';
  tickListener.start();
  return tickListener;
};
// var listener = reactor.channelCallbacks(channel, {
//   readable: function(listener) {
//     print('channel is now readable');
//   },
//
//   writable: function(listener) {
//     print('channel is now writable');
//   }
// });
//
// Creates a ChannelListener for `channel` with the given callbacks and
// returns the result of calling start() on it.
Reactor.prototype.channelCallbacks = function(channel, callbacks) {
  var channelListener = new listeners.ChannelListener(this, channel, callbacks);
  return channelListener.start();
};
// var listener = reactor.idleCallback(function() {
//   print('reactor is idle');
// });
//
// Creates a plain Listener in the 'idle' group, starts it, and returns it.
// run() only queues idle listeners when nothing else is pending.
Reactor.prototype.idleCallback = function(callback) {
  var idleListener = new listeners.Listener(this, callback);
  idleListener.group = 'idle';
  idleListener.start();
  return idleListener;
};
// var listener = reactor.delayCallback(5, function() {
//   print('five seconds have passed');
// });
//
// Creates a DelayedListener for `delay` and returns the result of calling
// start() on it.
Reactor.prototype.delayCallback = function(delay, callback) {
  var timer = new listeners.DelayedListener(this, delay, callback);
  return timer.start();
};
// var time = new Date(2012, 0, 1).getTime() / 1000; // seconds
// var listener = reactor.scheduleCallback(time, function() {
//   print('ohhhh nnoooooooooo!');
// });
//
// Creates a ScheduledListener for the absolute `time` and returns the
// result of calling start() on it.
Reactor.prototype.scheduleCallback = function(time, callback) {
  var timer = new listeners.ScheduledListener(this, time, callback);
  return timer.start();
};
// Appends `listener` to the list for its group, as decided by
// groupForListener(), and returns the listener.
Reactor.prototype.addListener = function(listener) {
  var groupName = this.groupForListener(listener);
  var members = this.listeners[groupName];
  members.push(listener);
  return listener;
};
// With a group name, reports whether that group has any listeners.
// Without one, reports whether any group does.
Reactor.prototype.hasListeners = function(key) {
  var groups = this.listeners;
  if (key) {
    return groups[key].length > 0;
  }
  return Object.keys(groups).some(function(name) {
    return groups[name].length > 0;
  });
};
// Returns the name of the group a listener belongs to. Channel, scheduled
// and delayed listeners map to fixed groups; any other Listener carries its
// group in `listener.group`. The specialised types are tested before
// Listener itself. Throws TypeError for anything that is not a listener.
Reactor.prototype.groupForListener = function(listener) {
  var fixedGroups = [
    [listeners.ChannelListener, 'channel'],
    [listeners.ScheduledListener, 'scheduled'],
    [listeners.DelayedListener, 'delayed']
  ];
  for (var i = 0; i < fixedGroups.length; ++i) {
    if (listener instanceof fixedGroups[i][0]) {
      return fixedGroups[i][1];
    }
  }
  if (listener instanceof listeners.Listener) {
    return listener.group;
  }
  throw new TypeError('invalid listener ' + listener);
};
// Removes the first occurrence of `listener` from the list for its group.
// Does nothing if it is not in that list.
Reactor.prototype.removeListener = function(listener) {
  var members = this.listeners[this.groupForListener(listener)];
  var position = members.indexOf(listener);
  if (position === -1) {
    return;
  }
  members.splice(position, 1);
};
// Invokes each listener that was in the pending queue when the call began,
// in order, then empties the queue. The `key` parameter is unused.
Reactor.prototype.invokePending = function(key) {
  var count = this.pending.length;
  var index = 0;
  while (index < count) {
    this.pending[index].invoke();
    index += 1;
  }
  this.pending.length = 0;
};
// Appends every listener in `listeners` whose pending() is truthy to the
// pending queue. With `lockOut` set, nothing is queued when the pending
// queue already has entries.
Reactor.prototype.queuePending = function(listeners, lockOut) {
  var blocked = lockOut && this.pending.length > 0;
  if (blocked) {
    return;
  }
  var total = listeners.length;
  for (var index = 0; index < total; index += 1) {
    var candidate = listeners[index];
    if (candidate.pending()) {
      this.pending.push(candidate);
    }
  }
};
// Walks the selector's selected keys, queues the listener attached to each
// key when its pending() is truthy, then clears the selected-key set.
Reactor.prototype.queueSelected = function() {
  var selectedKeys = this.selector.selectedKeys();
  for (var it = selectedKeys.iterator(); it.hasNext();) {
    var attached = it.next().attachment();
    if (attached.pending()) {
      this.pending.push(attached);
    }
  }
  selectedKeys.clear();
};
// Runs the event loop until stop() is called or no listeners remain.
//
// Each tick: refresh the clock; run beforeTick and nextTick listeners (the
// nextTick group is emptied after it is queued); poll the selector and
// queue ready channel listeners; optionally sleep for tickInterval seconds
// when nothing is pending; then run scheduled, delayed, idle (only when
// nothing else is queued) and afterTick listeners.
//
// Throws Error when called while already running. `running` is reset in a
// finally block. Previously it stayed true after the loop ended on its own
// or a callback threw, so every later run() failed the re-entrancy check.
Reactor.prototype.run = function() {
  if (this.running) {
    throw new Error('run() called but looping already true (recursion?)');
  }
  this.running = true;
  try {
    do {
      this.updateTime();
      this.queuePending(this.listeners.beforeTick);
      this.queuePending(this.listeners.nextTick);
      this.listeners.nextTick.length = 0; // FIXME: kill listeners too?
      this.invokePending();
      if (!this.running) {
        break;
      }
      if (this.selector.selectNow() > 0) {
        this.queueSelected();
      }
      if (!this.pending.length && this.sleep && this.tickInterval > 0) {
        // FIXME: if timer is scheduled in less time, sleep for less
        java.lang.Thread.sleep(this.tickInterval * 1000);
      }
      this.queuePending(this.listeners.scheduled);
      this.queuePending(this.listeners.delayed);
      this.queuePending(this.listeners.idle, true);
      this.queuePending(this.listeners.afterTick);
      this.invokePending();
      this.tick++;
    } while (this.running && this.hasListeners());
  } finally {
    this.running = false;
  }
};
// Asks run() to exit by clearing the running flag. run() checks the flag
// after the first batch of callbacks in a tick and again at the end of
// each tick.
Reactor.prototype.stop = function() {
this.running = false;
};
// Sets this.time to the current wall-clock time in seconds (fractional),
// derived from System.currentTimeMillis().
Reactor.prototype.updateTime = function() {
  var nowMillis = java.lang.System.currentTimeMillis();
  this.time = nowMillis / 1000;
};
// require(id) -- a minimal CommonJS-style module loader for Rhino.
//
// The module file "<id>.js" is located, wrapped in a function taking
// (require, exports, module), evaluated once, and its exports object is
// cached under the file's canonical path. Ids starting with "." are
// resolved against the directory of the module currently being loaded
// (initially the user.dir system property); all other ids are searched for
// in each directory listed in require.paths.
//
// Throws Error when no matching file exists. The assignment is deliberately
// to the global `require`.
require = (function() {
  var cached = {};
  var currentPath = java.lang.System.getProperty('user.dir');
  var paths = [currentPath];

  // Resolves a module id to an existing java.io.File, or undefined.
  function normalize(id) {
    var file; // declared locally; it used to leak as an implicit global
    id = id + '.js';
    if (/^\.\.?/.test(id)) {
      // relative path
      file = new java.io.File(currentPath, id);
      if (file.isFile()) {
        return file;
      }
    } else {
      for (var i = 0, len = paths.length; i < len; ++i) {
        file = new java.io.File(paths[i], id);
        if (file.isFile()) {
          return file;
        }
      }
    }
    return undefined;
  }

  // Reads a whole file and returns its contents as a JavaScript string.
  // The bytes are decoded as UTF-8 rather than the platform default, and
  // the stream is always closed.
  function readFile(file) {
    var stream = new java.io.FileInputStream(file);
    try {
      var channel = stream.getChannel();
      var length = channel.size();
      // One spare byte so the buffer is never full before end-of-file.
      var buffer = java.nio.ByteBuffer.allocate(length + 1);
      var bytes = java.lang.reflect.Array.newInstance(
        java.lang.Byte.TYPE, length);
      var bytesRead;
      do {
        bytesRead = channel.read(buffer);
        // Stop on end-of-file (-1) and also on 0, which read() returns once
        // the buffer is full; looping on 0 would never terminate.
      } while (bytesRead > 0);
      buffer.rewind();
      buffer.get(bytes, 0, length);
      return String(new java.lang.String(bytes, 'UTF-8'));
    } finally {
      stream.close();
    }
  }

  function require(id) {
    var file = normalize(id);
    if (!file) {
      throw new Error("couldn't find module \"" + id + "\"");
    } else {
      // Convert to a JavaScript string so cache keys and module.id are
      // plain strings rather than wrapped Java objects.
      id = String(file.getCanonicalPath());
    }
    if (!cached.hasOwnProperty(id)) {
      var source = (
        "(function (require, exports, module) { " +
        readFile(id) + "\n});");
      // Cached before evaluation so that circular requires see the
      // partially populated exports instead of recursing forever.
      cached[id] = {
        exports: {},
        module: {
          id: id,
          uri: String(new java.io.File(id).toURI())
        }
      };
      var previousPath = currentPath;
      var loaded = false;
      try {
        currentPath = file.getParent();
        var ctx = org.mozilla.javascript.Context.getCurrentContext();
        var func = ctx.evaluateString({}, source, id, 1, null);
        func(require, cached[id].exports, cached[id].module);
        loaded = true;
      } finally {
        currentPath = previousPath;
        if (!loaded) {
          // Do not leave a half-initialised module in the cache; a later
          // require() should retry instead of returning broken exports.
          delete cached[id];
        }
      }
    }
    return cached[id].exports;
  }

  require.paths = paths;
  return require;
})();
// FIXME: where should this really go?
// Makes `ctor` inherit from `superCtor`: ctor.prototype becomes a fresh
// object whose prototype is superCtor.prototype, without running
// superCtor, and its `constructor` property points back at ctor.
function inherit(ctor, superCtor) {
  var proto = Object.create(superCtor.prototype);
  proto.constructor = ctor;
  ctor.prototype = proto;
}
// Demo: an echo server on port 8080 driven by the reactor.
// require.js must be loaded first; it defines the global require().
load('require.js');
var Reactor = require('reactor').Reactor;
var net = require('net');
var reactor = new Reactor();
// Seconds run() may sleep per tick when nothing is pending.
reactor.tickInterval = 0.1;
net.createServer(reactor, function(stream) {
// NOTE(review): 'connect' is only emitted from the stream's 'connected'
// channel callback; confirm that it fires for accepted sockets.
stream.addListener('connect', function() {
stream.write('ohai\r\n');
});
// Echo everything that is received straight back.
stream.addListener('data', function(data) {
stream.write(data);
});
// 'end' is emitted when a read returns -1: say goodbye and close.
stream.addListener('end', function() {
stream.write('baibai\r\n');
stream.end();
});
}).listen(8080);
// Prints the number of registered channel listeners now and then every
// five seconds. delayCallback() already returns a started listener, so the
// extra start() call has no effect.
(function count() {
print(reactor.listeners.channel.length);
reactor.delayCallback(5, count).start();
})();
reactor.run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment