Skip to content

Instantly share code, notes, and snippets.

@jaszhix
Last active February 17, 2018 20:55
Show Gist options
  • Save jaszhix/215306816e15b3cf78af6369700ab97b to your computer and use it in GitHub Desktop.
Save jaszhix/215306816e15b3cf78af6369700ab97b to your computer and use it in GitHub Desktop.
Experimental CJS subprocess IPC handling with libsoup
// Calling get_default() here populates the `imports` object with the gi modules when this
// file runs as the standalone child process.
imports.gi.GIRepository.Repository.get_default();
const Gio = imports.gi.Gio;
const Soup = imports.gi.Soup;
const Signals = imports.signals;
// Cinnamon's JS context does not have an ARGV global defined, so the presence of ARGV is used
// to tell the spawned `cjs worker.js ...` child apart from the parent running inside Cinnamon.
const isChildProcess = typeof ARGV !== 'undefined';
// When false, Worker.prototype.print() returns early and logs nothing.
const DEBUG = true;
/**
* Worker
*
* Signals:
*
* "worker-ready": When both IPC end points are connected, this signal will indicate when code and methods can be sent
* to the child process for evaluation.
*
* "worker-killed": Emitted before the child process is terminated.
*
* Example usage:
*
* Starting an instance:
*
* let worker = new Worker(null, this.path);
*
* worker.connect('worker-ready', Lang.bind(this, ()=>{
* worker.send('code', 'print(Math.round(456709 / 92))').then((res)=>{
* print(res); // 4964
* }).catch((e, message)=>{
* print('ERR!');
* print(e, message);
* });
* }));
*
* Importing a file:
*
* let _import = {
* path: metadata.path,
* name: 'MyClass',
* args: [],
* isConstructor: true
* };
*
* worker.send('import', JSON.stringify(_import)).then((res)=>{
* print(res)
* }).catch((e, message)=>{
* print('ERR!')
* print(e, message)
* });
*
* After importing, the imported value is available on the child worker as `this.<name>` (e.g. `this.MyClass`).
* If isConstructor is true, it is instantiated with `new`, receiving the child's Worker instance as the first
* constructor argument followed by `args`. A helper class can keep that first argument to reach the worker.
*
* worker.send('code', 'this.MyClass.specialFunction('+JsonSerializableArgument+')').then((res)=>{
* print(res); // Returned result
* }).catch((e, message)=>{
* print('ERR!');
* print(e, message);
* });
*
*/
/**
 * Worker constructor. All arguments are forwarded to `_init` (see below).
 */
function Worker() {
    this._init.apply(this, arguments);
}
Worker.prototype = {
    /**
     * _init
     *
     * In the parent this picks free ports, starts the parent IPC server and then spawns the
     * child. In the child it starts the child IPC server on the ports handed over via ARGV.
     *
     * @param {array} ports - private, only used by the child process ([parentPort, childPort]);
     * null should be passed in the parent.
     * @param {string} path - the path containing worker.js
     */
    _init: function(ports, path) {
        // Handle of the spawned cjs child (parent only); set by eval(), cleared on exit/kill.
        this.subprocess = null;
        this.server = null;
        this.path = path;
        this.subprocessFile = this.path + '/worker.js';
        this.parentServerPort = isChildProcess ? ports[0] : NaN;
        this.childServerPort = isChildProcess ? ports[1] : NaN;
        this.freePorts = [];
        if (!isChildProcess) {
            this.findUnusedPorts(()=>{
                this.ipcServer(()=>{
                    this.__init();
                });
            });
        } else {
            this.print('Worker init', ARGV);
            this.__init();
        }
    },
    /**
     * Second initialisation stage.
     *
     * Child: start the IPC server, then greet the parent.
     * Parent: spawn the child process (the parent's server is already running at this point,
     * so it must not be started a second time).
     */
    __init: function() {
        if (isChildProcess) {
            this.ipcServer(()=>{
                this.send('code', 'this.print(\'Worker connected\')').catch((err)=>{
                    this.print(err);
                });
            });
            return;
        }
        if (this.path) {
            this.eval(this.subprocessFile, null);
        } else {
            this.print('Worker: no path was given, cannot start the child process');
        }
    },
    /**
     * Extracts every numeric port (":<digits>" followed by whitespace or end of line) from one
     * line of netstat output. Works for IPv4 ("0.0.0.0:22"), IPv6 (":::22", "::1:22",
     * "[::]:22") and host-name ("ip6-localhost:22") notations alike.
     *
     * @param {string} line
     * @returns {array} list of port numbers, empty when the line contains none
     */
    _portsInLine: function(line) {
        let matches = line.match(/:(\d{1,5})(?=\s|$)/g);
        if (!matches) {
            return [];
        }
        return matches.map((text)=>parseInt(text.slice(1), 10));
    },
    /**
     * This will attempt to find unused ports on the machine, and prefers the highest ports Soup will work with.
     * If netstat cannot be run, every port is assumed to be free; ipcServer() retries on lower
     * ports when binding fails.
     *
     * @param {function} cb - called once parentServerPort/childServerPort have been chosen
     */
    findUnusedPorts: function(cb) {
        // Only run in the parent (main thread)
        this.eval(null, ['netstat', '-lntu'], (output)=>{
            let usedPorts = new Set();
            // eval() reports failure by passing `true` instead of the command output.
            if (typeof output === 'string') {
                output.split('\n').forEach((line)=>{
                    this._portsInLine(line).forEach((usedPort)=>{
                        usedPorts.add(usedPort);
                    });
                });
            } else {
                this.print('netstat failed, assuming no ports are in use');
            }
            // 49151 is the highest port Soup will use.
            let port = 49151;
            while (port > 1024 && this.freePorts.length < 6) {
                if (!usedPorts.has(port)) {
                    this.freePorts.push(port);
                }
                --port;
            }
            this.parentServerPort = this.freePorts[0];
            this.childServerPort = this.freePorts[this.freePorts.length - 1];
            this.print('parentServerPort: ', this.parentServerPort);
            this.print('childServerPort: ', this.childServerPort);
            cb();
        });
    },
    /**
     * Starts a new cjs child process, or executes a command.
     *
     * When no `proc` is given the worker child is spawned and its Gio.Subprocess handle is kept
     * in `this.subprocess` so killWorker() can terminate it directly.
     *
     * @param {string} file - script to run with cjs (ignored when proc is given)
     * @param {array} proc - argv of an arbitrary command to run instead of the worker
     * @param {function} cb - optional; receives the captured stdout on success, or `true` when
     * the process could not be started or its output could not be collected
     */
    eval: function(file, proc, cb) {
        let isWorkerSpawn = !proc;
        let exec;
        if (proc) {
            exec = proc;
        } else {
            exec = ['cjs', file, '--parentServerPort='+this.parentServerPort, '--childServerPort='+this.childServerPort];
        }
        let report = (value)=>{
            if (typeof cb === 'function') {
                cb(value);
            }
        };
        let subprocess;
        try {
            subprocess = new Gio.Subprocess({
                argv: exec,
                // We need STDIN so our child process has an ARGV context.
                flags: Gio.SubprocessFlags.STDOUT_PIPE | Gio.SubprocessFlags.STDIN_PIPE,
            });
            // Throws e.g. when the executable is not installed.
            subprocess.init(null);
        } catch (e) {
            this.print('Failed to start process: ', exec[0], e);
            report(true);
            return;
        }
        if (isWorkerSpawn) {
            this.subprocess = subprocess;
        }
        subprocess.communicate_utf8_async(null, null, (obj, res)=>{
            let success = false;
            let out = null;
            try {
                [success, out] = obj.communicate_utf8_finish(res);
            } catch (e) {
                this.print('Failed to collect process output: ', e);
            }
            // The process has finished; drop the stale handle.
            if (isWorkerSpawn && this.subprocess === subprocess) {
                this.subprocess = null;
            }
            this.print('out: ', out !== null);
            report(!success || out);
        });
    },
    /**
     * Executes one parsed IPC request and returns the value to serialise as the response.
     * Throws on unsupported request types and on any error raised by the requested action.
     *
     * @param {object} request - {code: string} | {import: string} | {method: string}
     * @returns {*} JSON-serialisable response payload
     */
    _execute: function(request) {
        let has = (key)=>Object.prototype.hasOwnProperty.call(request, key);
        if (has('code')) {
            // NOTE(security): this evaluates whatever arrives on the local IPC port. Any local
            // process able to reach the port can run code here; flagged, not changed, because it
            // is the purpose of this worker. Must stay a direct eval so `this` is the Worker.
            return {code: eval(request.code)};
        }
        if (has('import')) {
            let spec = JSON.parse(request.import);
            if (imports.searchPath.indexOf(spec.path) === -1) {
                imports.searchPath.unshift(spec.path);
            }
            let imported = imports[spec.name];
            if (spec.isConstructor) {
                // The Worker instance is always the first constructor argument.
                let args = [this].concat(spec.args || []);
                this[spec.name] = new imported(...args);
            } else {
                this[spec.name] = imported;
            }
            return {success: true};
        }
        if (has('method')) {
            let call = JSON.parse(request.method);
            call.name = call.name.split('.');
            this.print(call);
            return this[call.name[0]][call.name[1]](...(call.args || []));
        }
        throw new Error('Unsupported request type');
    },
    /**
     * Soup request handler body: parses the JSON request, executes it and always answers with
     * valid JSON. Status 200 carries the result, 400 a malformed request, 500 an execution
     * error (`{error: "..."}`), so send() on the other side can reject its promise.
     *
     * @param {Soup.Message} msg
     */
    _handleRequest: function(msg) {
        msg.response_headers.set_content_type('application/json', {});
        let respond = (status, payload)=>{
            // Serialise first so a non-serialisable payload cannot leave a half-written response.
            let body = JSON.stringify(payload);
            msg.status_code = status;
            msg.set_response('application/json', Soup.MemoryUse.COPY, body === undefined ? 'null' : body);
        };
        let request;
        try {
            request = JSON.parse(msg.request_body.data);
        } catch (e) {
            this.print('ipcServer: invalid request body', e);
            respond(400, {error: 'Invalid JSON request body'});
            return;
        }
        this.print('ipcServer request: ', request);
        if (!request) {
            respond(400, {error: 'Empty request'});
            return;
        }
        try {
            respond(200, this._execute(request));
        } catch (e) {
            this.print('Error handling request...', e);
            respond(500, {error: String(e)});
        }
    },
    /**
     * Starts the Soup server for IPC communication.
     *
     * If binding fails the next lower port is tried (down to 1025). In the child process this
     * call does not return: it ends by running the server's main loop, which keeps the child
     * alive. In the parent the host application's main loop services the server.
     *
     * @param {function} cb - called once the server is listening
     */
    ipcServer: function(cb) {
        let port = isChildProcess ? this.childServerPort : this.parentServerPort;
        this.server = new Soup.Server();
        try {
            this.server.listen_local(port, Soup.ServerListenOptions.IPV6_ONLY);
        } catch (e) {
            this.print('server.listen_local failed: ', e);
        }
        let needsRetry = false;
        try {
            let uris = this.server.get_uris();
            if (typeof uris[0] === 'undefined') {
                needsRetry = true;
            } else {
                // Assign the port to our context just incase Soup decides to listen on a random port for no reason...
                let boundPort = uris[0].get_port();
                if (isChildProcess) {
                    this.childServerPort = boundPort;
                } else {
                    this.parentServerPort = boundPort;
                }
            }
        } catch (e) {
            this.print(e);
        }
        if (needsRetry) {
            let nextPort = port - 1;
            // Also stops when port is NaN, which would otherwise recurse forever.
            if (!(nextPort > 1024)) {
                this.print('ipcServer: no usable port found, giving up');
                return;
            }
            if (isChildProcess) {
                this.childServerPort = nextPort;
            } else {
                this.parentServerPort = nextPort;
            }
            this.ipcServer(cb);
            return;
        }
        this.server.add_handler('/', (server, msg, path, query, client)=>{
            this._handleRequest(msg);
        });
        // Send code to be evaluated back the main thread triggering a ready signal for the callee.
        if (isChildProcess) {
            this.send('code', 'this.emit(\'worker-ready\');').catch((err)=>{
                this.print(err);
            });
        }
        cb();
        if (isChildProcess) {
            // Blocks: runs a main loop until the server is quit. Only the standalone child needs
            // this; a server set up with listen_local() is serviced by any running main loop.
            this.server.run();
        }
    },
    /**
     * Send code to the other Worker instance to evaluate, an import, or a method call.
     *
     * @param {string} type - options are "code", "import" and "method".
     * @param {string} content - for code, this should be JS code in a string; for import/method
     * a JSON string (objects are stringified automatically).
     * @returns {Promise} resolves with the evaluated result (the `code` value is unwrapped),
     * rejects with an Error when the request could not be delivered or the other side
     * answered with a non-2xx status.
     */
    send: function(type, content) {
        let port = isChildProcess ? this.parentServerPort : this.childServerPort;
        // NOTE(review): relies on the host name ip6-localhost being defined (e.g. in /etc/hosts).
        let url = 'http://ip6-localhost:'+port.toString()+'/';
        this.print('send request port: ', port);
        this.print('send request URL: ', url);
        this.print('send request type: ', type);
        this.print('send request content: ', content);
        if (typeof content === 'object') {
            content = JSON.stringify(content);
        }
        return new Promise((resolve, reject)=>{
            let httpSession = new Soup.SessionAsync();
            httpSession.user_agent = 'IPC/API';
            let request = Soup.Message.new('POST', url);
            request.request_headers.set_content_type('application/json', {});
            let reqObject = {};
            reqObject[type] = content;
            request.set_request('application/json', Soup.MemoryUse.COPY, JSON.stringify(reqObject));
            Soup.Session.prototype.add_feature.call(httpSession, new Soup.ProxyResolverDefault());
            Soup.Session.prototype.add_feature.call(httpSession, new Soup.ContentDecoder());
            httpSession.queue_message(request, (session, message)=>{
                let status = message.status_code;
                this.print('status: ', status);
                let raw = message.response_body.data;
                let output;
                try {
                    output = JSON.parse(raw);
                } catch (e) {
                    // Not JSON: hand the raw body through unchanged.
                    output = raw;
                }
                let isObject = output !== null && typeof output === 'object';
                if (!(status >= 200 && status < 300)) {
                    let detail = isObject && output.error ? output.error : raw;
                    reject(new Error('IPC request failed with status ' + status + (detail ? ': ' + detail : '')));
                    return;
                }
                // Unwrap by key presence so falsy results (0, '', false, null) survive.
                if (isObject && Object.prototype.hasOwnProperty.call(output, 'code')) {
                    output = output.code;
                }
                resolve(output);
            });
        });
    },
    /**
     * Wrapper around print that allows identifying which worker the logging is coming from.
     * Errors are printed via their string form (JSON.stringify would yield "{}"), and values
     * that cannot be serialised (e.g. circular structures) fall back to String(value).
     *
     * @returns {boolean|undefined} false when DEBUG is off, otherwise nothing
     */
    print: function() {
        if (!DEBUG) {
            return false;
        }
        let args = [isChildProcess ? 'Child: ' : 'Parent: '];
        for (let i = 0, len = arguments.length; i < len; i++) {
            let value = arguments[i];
            let text;
            if (value instanceof Error) {
                text = String(value);
            } else {
                try {
                    text = JSON.stringify(value);
                } catch (e) {
                    text = undefined;
                }
                if (text === undefined) {
                    text = String(value);
                }
            }
            args.push(text);
        }
        print(...args);
    },
    /**
     * Terminates this worker's child CJS process. The Gio.Subprocess handle stored by eval() is
     * used when available; otherwise the PID is looked up by matching our ports against netstat
     * output and killed with `kill -KILL`.
     *
     * @param {function} cb - optional, called after the kill has been issued
     */
    killWorker: function(cb) {
        if (isChildProcess) {
            return false;
        }
        this.emit('worker-killed');
        let done = ()=>{
            if (typeof cb === 'function') {
                cb();
            }
        };
        if (this.subprocess) {
            try {
                this.subprocess.force_exit();
                this.print('killed worker subprocess');
            } catch (e) {
                this.print('force_exit failed: ', e);
            }
            this.subprocess = null;
            done();
            return;
        }
        this.eval(null, ['netstat', '-ap'], (output)=>{
            // eval() passes `true` when netstat could not be run.
            let lines = typeof output === 'string' ? output.split('\n') : [];
            for (let i = 0, len = lines.length; i < len; i++) {
                let line = lines[i];
                if (!line || line.indexOf('/cjs') === -1) {
                    continue;
                }
                let ports = this._portsInLine(line);
                if (ports.indexOf(this.parentServerPort) === -1 && ports.indexOf(this.childServerPort) === -1) {
                    continue;
                }
                // Only listening sockets owned by a cjs process are targeted.
                let pidMatch = line.match(/LISTEN\s+(\d+)\/cjs/);
                if (pidMatch) {
                    this.eval(null, ['kill', '-KILL', pidMatch[1]]);
                    this.print('killed', pidMatch[1]);
                }
            }
            done();
        });
    },
    /**
     * Kills the child process and closes this worker's own IPC server sockets.
     */
    destroy: function() {
        this.killWorker(()=>{
            if (this.server) {
                this.server.disconnect();
                this.server = null;
            }
        });
    }
};
// Mix connect/disconnect/emit into Worker so it can emit "worker-ready" and "worker-killed".
Signals.addSignalMethods(Worker.prototype);
// This handles the invocation of our child process's Worker instance.
if (isChildProcess) {
    print('Starting worker...');
    print('ARGV: ', JSON.stringify(ARGV));
    // Look the ports up by flag name instead of by position, so a missing or reordered
    // argument yields NaN rather than a TypeError on `undefined.split`.
    let readPortArg = (name)=>{
        let prefix = '--' + name + '=';
        for (let i = 0, len = ARGV.length; i < len; i++) {
            if (typeof ARGV[i] === 'string' && ARGV[i].indexOf(prefix) === 0) {
                return parseInt(ARGV[i].slice(prefix.length), 10);
            }
        }
        return NaN;
    };
    let parentServerPort = readPortArg('parentServerPort');
    let childServerPort = readPortArg('childServerPort');
    print([parentServerPort, childServerPort]);
    if (isNaN(parentServerPort) || isNaN(childServerPort)) {
        print('Worker not started: --parentServerPort and --childServerPort are required');
    } else {
        // `var` on purpose: keeps the instance referenced at module scope.
        var worker = new Worker([parentServerPort, childServerPort]);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment