Skip to content

Instantly share code, notes, and snippets.

@kmaglione
Created April 22, 2016 17:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kmaglione/faf9d7d96d035eff3c25fe6fa6ab7b91 to your computer and use it in GitHub Desktop.
Save kmaglione/faf9d7d96d035eff3c25fe6fa6ab7b91 to your computer and use it in GitHub Desktop.
# HG changeset patch
# User Kris Maglione <maglione.k@gmail.com>
# Date 1461303360 25200
# Thu Apr 21 22:36:00 2016 -0700
# Node ID c3829f5820af4842fbb4f3fc0ed8fb55f3716bd7
# Parent fed6d478bf3d7cc80fbcf975361a2e2bc8e4ed30
[WIP] Add new subprocess IPC module.
diff --git a/dom/system/OSFileConstants.cpp b/dom/system/OSFileConstants.cpp
--- a/dom/system/OSFileConstants.cpp
+++ b/dom/system/OSFileConstants.cpp
@@ -9,22 +9,24 @@
#include "fcntl.h"
#include "errno.h"
#include "prsystem.h"
#if defined(XP_UNIX)
#include "unistd.h"
#include "dirent.h"
+#include "poll.h"
#include "sys/stat.h"
#if defined(ANDROID)
#include <sys/vfs.h>
#define statvfs statfs
#else
#include "sys/statvfs.h"
+#include "sys/wait.h"
#include <spawn.h>
#endif // defined(ANDROID)
#endif // defined(XP_UNIX)
#if defined(XP_LINUX)
#include <linux/fadvise.h>
#endif // defined(XP_LINUX)
@@ -399,16 +401,19 @@ void CleanupOSFileConstants()
* separate categories ("errors", "open", etc.),
* keep properties organized by alphabetical order
* and #ifdef-away properties that are not portable.
*/
static const dom::ConstantSpec gLibcProperties[] =
{
// Arguments for open
INT_CONSTANT(O_APPEND),
+#if defined(O_CLOEXEC)
+ INT_CONSTANT(O_CLOEXEC),
+#endif // defined(O_CLOEXEC)
INT_CONSTANT(O_CREAT),
#if defined(O_DIRECTORY)
INT_CONSTANT(O_DIRECTORY),
#endif // defined(O_DIRECTORY)
#if defined(O_EVTONLY)
INT_CONSTANT(O_EVTONLY),
#endif // defined(O_EVTONLY)
INT_CONSTANT(O_EXCL),
@@ -477,23 +482,34 @@ static const dom::ConstantSpec gLibcProp
INT_CONSTANT(S_IXGRP),
INT_CONSTANT(S_IXUSR),
// seek
INT_CONSTANT(SEEK_CUR),
INT_CONSTANT(SEEK_END),
INT_CONSTANT(SEEK_SET),
- // fcntl command values
#if defined(XP_UNIX)
+ // poll
+ INT_CONSTANT(POLLERR),
+ INT_CONSTANT(POLLHUP),
+ INT_CONSTANT(POLLIN),
+ INT_CONSTANT(POLLNVAL),
+ INT_CONSTANT(POLLOUT),
+
+ // wait
+ INT_CONSTANT(WNOHANG),
+
+ // fcntl command values
INT_CONSTANT(F_GETLK),
+ INT_CONSTANT(F_SETFL),
INT_CONSTANT(F_SETLK),
INT_CONSTANT(F_SETLKW),
- // flock type values
+ // flock type values
INT_CONSTANT(F_RDLCK),
INT_CONSTANT(F_WRLCK),
INT_CONSTANT(F_UNLCK),
#endif // defined(XP_UNIX)
// copyfile
#if defined(COPYFILE_DATA)
INT_CONSTANT(COPYFILE_DATA),
INT_CONSTANT(COPYFILE_EXCL),
diff --git a/toolkit/modules/moz.build b/toolkit/modules/moz.build
--- a/toolkit/modules/moz.build
+++ b/toolkit/modules/moz.build
@@ -77,26 +77,39 @@ EXTRA_JS_MODULES += [
'SessionRecorder.jsm',
'sessionstore/FormData.jsm',
'sessionstore/ScrollPosition.jsm',
'sessionstore/XPathGenerator.jsm',
'ShortcutUtils.jsm',
'Sntp.jsm',
'SpatialNavigation.jsm',
'Sqlite.jsm',
+ 'subprocess/subprocess.jsm',
'Task.jsm',
'Timer.jsm',
'Troubleshoot.jsm',
'UpdateUtils.jsm',
'WebChannel.jsm',
'WindowDraggingUtils.jsm',
'ZipUtils.jsm',
]
EXTRA_JS_MODULES.third_party.jsesc += ['third_party/jsesc/jsesc.js']
+EXTRA_JS_MODULES.subprocess += [
+ 'subprocess/subprocess_common.jsm',
+ 'subprocess/subprocess_shared.js',
+]
+
+if CONFIG['OS_TARGET'] != 'Windows':
+ EXTRA_JS_MODULES.subprocess += [
+ 'subprocess/subprocess_shared_unix.js',
+ 'subprocess/subprocess_unix.jsm',
+ 'subprocess/subprocess_worker_unix.js',
+ ]
+
if CONFIG['MOZ_WIDGET_TOOLKIT'] in ('windows', 'cocoa'):
DEFINES['CAN_DRAW_IN_TITLEBAR'] = 1
if CONFIG['MOZ_WIDGET_TOOLKIT'] in ('windows', 'gtk2', 'gtk3'):
DEFINES['MENUBAR_CAN_AUTOHIDE'] = 1
EXTRA_PP_JS_MODULES += [
'AppConstants.jsm',
diff --git a/toolkit/modules/subprocess/.eslintrc b/toolkit/modules/subprocess/.eslintrc
new file mode 100644
--- /dev/null
+++ b/toolkit/modules/subprocess/.eslintrc
@@ -0,0 +1,24 @@
+{
+ "extends": "../../components/extensions/.eslintrc",
+
+ "env": {
+ "worker": true,
+ },
+
+ "globals": {
+ "ChromeWorker": false,
+ "Components": false,
+ "LIBC": true,
+ "Library": true,
+ "OS": false,
+ "Services": false,
+ "ctypes": false,
+ "dump": false,
+ "libc": true,
+ "unix": true,
+ },
+
+ "rules": {
+ "no-console": 0,
+ },
+}
diff --git a/toolkit/modules/subprocess/subprocess.jsm b/toolkit/modules/subprocess/subprocess.jsm
new file mode 100644
--- /dev/null
+++ b/toolkit/modules/subprocess/subprocess.jsm
@@ -0,0 +1,52 @@
+/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */
+/* vim: set sts=2 sw=2 et tw=80: */
+
+/*
+ * These modules are loosely based on the subprocess.jsm module created
+ * by Jan Gerber and Patrick Brunschwig, though the implementation
+ * differs drastically.
+ */
+
+"use strict";
+
+let EXPORTED_SYMBOLS = ["subprocess"];
+
+/* exported subprocess */
+
+var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
+
+Cu.import("resource://gre/modules/AppConstants.jsm");
+Cu.import("resource://gre/modules/XPCOMUtils.jsm");
+
+if (AppConstants.platform == "win") {
+ XPCOMUtils.defineLazyModuleGetter(this, "subprocess_impl",
+ "resource://gre/modules/subprocess/subprocess_win.jsm");
+} else {
+ XPCOMUtils.defineLazyModuleGetter(this, "subprocess_impl",
+ "resource://gre/modules/subprocess/subprocess_unix.jsm");
+}
+
+var subprocess = {
+ call: function(options) {
+ options.mergeStderr = options.mergeStderr || false;
+ options.workdir = options.workdir || null;
+
+ let environment = {};
+ if (!options.environment || options.environmentAppend) {
+ for (let [k, v] of subprocess_impl.getEnvironment()) {
+ environment[k] = v;
+ }
+ }
+
+ if (options.environment) {
+ Object.assign(environment, options.environment);
+ }
+
+ options.environment = Object.keys(environment)
+ .map(key => `${key}=${environment[key]}`);
+
+ options.arguments = Array.slice(options.arguments || []);
+
+ return subprocess_impl(options);
+ },
+};
diff --git a/toolkit/modules/subprocess/subprocess_common.jsm b/toolkit/modules/subprocess/subprocess_common.jsm
new file mode 100644
--- /dev/null
+++ b/toolkit/modules/subprocess/subprocess_common.jsm
@@ -0,0 +1,258 @@
+/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */
+/* vim: set sts=2 sw=2 et tw=80: */
+"use strict";
+
+/* eslint-disable mozilla/balanced-listeners */
+
+/* exported BaseProcess, PromiseWorker */
+
+var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
+
+var EXPORTED_SYMBOLS = ["BaseProcess", "PromiseWorker"];
+
+Cu.importGlobalProperties(["TextDecoder"]);
+
+let lastResponseId = 0;
+
+class PromiseWorker extends ChromeWorker {
+ constructor(url) {
+ super(url);
+
+ this.listeners = new Map();
+ this.pendingResponses = new Map();
+
+ this.addListener("failure", this.onFailure.bind(this));
+ this.addListener("success", this.onSuccess.bind(this));
+
+ this.addEventListener("message", this.onmessage);
+ }
+
+ addListener(msg, listener) {
+ if (!this.listeners.has(msg)) {
+ this.listeners.set(msg, new Set());
+ }
+ this.listeners.get(msg).add(listener);
+ }
+
+ removeListener(msg, listener) {
+ let listeners = this.listeners.get(msg);
+ if (listeners) {
+ listeners.delete(listener);
+
+ if (!listeners.size) {
+ this.listeners.delete(msg);
+ }
+ }
+ }
+
+ onmessage(event) {
+ let {msg} = event.data;
+ let listeners = this.listeners.get(msg) || new Set();
+
+ for (let listener of listeners) {
+ try {
+ listener(event.data);
+ } catch (e) {
+ Cu.reportError(e);
+ }
+ }
+ }
+
+ onFailure({msgId, error}) {
+ this.pendingResponses.get(msgId).reject(error);
+ this.pendingResponses.delete(msgId);
+ }
+
+ onSuccess({msgId, data}) {
+ this.pendingResponses.get(msgId).resolve(data);
+ this.pendingResponses.delete(msgId);
+ }
+
+ call(method, args, transferList = []) {
+ let msgId = lastResponseId++;
+
+ return new Promise((resolve, reject) => {
+ this.pendingResponses.set(msgId, {resolve, reject});
+
+ let message = {
+ msg: method,
+ msgId,
+ args,
+ };
+
+ this.postMessage(message, transferList);
+ });
+ }
+}
+
+class Pipe {
+ constructor(process, fd) {
+ this.fd = fd;
+ this.processId = process.id;
+ this.worker = process.worker;
+
+ this.closed = false;
+ }
+
+ close() {
+ return this.worker.call("close", [this.processId, this.fd]).then(() => {
+ this.closed = true;
+ });
+ }
+}
+
+class OutputPipe extends Pipe {
+ write(buffer) {
+ let args = [this.processId, this.fd, buffer];
+
+ return this.worker.call("write", args, [buffer]);
+ }
+}
+
+class InputPipe extends Pipe {
+ constructor(...args) {
+ super(...args);
+
+ this.buffers = [];
+ this.bufferSize = 0;
+
+ this.decoder = new TextDecoder();
+
+ this.pendingReads = [];
+
+ this._eventName = `processOutput-${this.processId}`;
+
+ this.worker.addListener(this._eventName, this.onOutput.bind(this));
+ }
+
+ onOutput({buffer}) {
+ if (buffer == null) {
+ this.worker.removeListener(this._eventName, this.onOutput.bind(this));
+ this.closed = true;
+ } else {
+ this.buffers.push(buffer);
+ this.bufferSize += buffer.byteLength;
+ this.checkPendingReads();
+ }
+ }
+
+ checkPendingReads() {
+ let pending = this.pendingReads[0];
+ if (!pending || pending.length > this.bufferSize) {
+ return;
+ }
+
+ this.pendingReads.shift();
+
+ let result;
+ let byteLength = this.buffers[0].byteLength;
+ if (byteLength == pending.length) {
+ result = this.buffers.shift();
+ } else if (byteLength > pending.length) {
+ let buffer = this.buffers[0];
+
+ this.buffers[0] = buffer.slice(pending.length);
+ result = ArrayBuffer.transfer(buffer, pending.length);
+ } else {
+ result = ArrayBuffer.transfer(this.buffers.shift(), pending.length);
+ let u8result = new Uint8Array(result);
+
+ while (byteLength < pending.length) {
+ let buffer = this.buffers[0];
+ let u8buffer = new Uint8Array(buffer);
+
+ let remaining = pending.length - byteLength;
+
+ if (buffer.byteLength <= remaining) {
+ this.buffers.shift();
+
+ u8result.set(u8buffer, byteLength);
+ } else {
+ this.buffers[0] = buffer.slice(remaining);
+
+ u8result.set(u8buffer.subarray(0, remaining), byteLength);
+ }
+
+ byteLength += Math.min(buffer.byteLength, remaining);
+ }
+ }
+
+ this.bufferSize -= result.byteLength;
+ pending.resolve(result);
+ }
+
+ read(length) {
+ return new Promise((resolve, reject) => {
+ this.pendingReads.push({length, resolve, reject});
+ this.checkPendingReads();
+ });
+ }
+
+ readJSON(length) {
+ return this.readString(length).then(string => {
+ return JSON.parse(string);
+ });
+ }
+
+ readString(length, options = null) {
+ return this.read(length).then(buffer => {
+ return this.decoder.decode(buffer, options);
+ });
+ }
+
+ readUint32() {
+ return this.read(4).then(buffer => {
+ return new Uint32Array(buffer)[0];
+ });
+ }
+}
+
+class BaseProcess {
+ constructor(worker, processId, fds, pid) {
+ this.id = processId;
+ this.worker = worker;
+ this.pid = pid;
+
+ this.exitCode = null;
+ this.wait().then(({exitCode}) => {
+ this.exitCode = exitCode;
+ });
+
+ if (fds.has(0)) {
+ this.stdin = new OutputPipe(this, 0);
+ }
+ if (fds.has(1)) {
+ this.stdout = new InputPipe(this, 1);
+ }
+ if (fds.has(2)) {
+ this.stderr = new InputPipe(this, 2);
+ }
+ }
+
+ static create(options) {
+ let {worker} = this;
+
+ return worker.call("spawn", [options]).then(({processId, fds, pid}) => {
+ return new this(worker, processId, fds, pid);
+ });
+ }
+
+ static get WORKER_URL() {
+ throw new Error("Not implemented");
+ }
+
+ static get worker() {
+ if (!this._worker) {
+ this._worker = new PromiseWorker(this.WORKER_URL);
+ }
+ return this._worker;
+ }
+
+ kill(force = false) {
+ return this.worker.call("kill", [this.id, force]);
+ }
+
+ wait() {
+ return this.worker.call("wait", [this.id]);
+ }
+}
diff --git a/toolkit/modules/subprocess/subprocess_shared.js b/toolkit/modules/subprocess/subprocess_shared.js
new file mode 100644
--- /dev/null
+++ b/toolkit/modules/subprocess/subprocess_shared.js
@@ -0,0 +1,59 @@
+/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */
+/* vim: set sts=2 sw=2 et tw=80: */
+"use strict";
+
+if (!ArrayBuffer.transfer) {
+ ArrayBuffer.transfer = function(buffer, size = undefined) {
+ if (size === undefined) {
+ size = buffer.byteLength;
+ }
+
+ let u8out = new Uint8Array(size);
+ let u8buffer = new Uint8Array(buffer, 0, Math.min(size, buffer.byteLength));
+
+ u8out.set(u8buffer);
+
+ return u8out.buffer;
+ };
+}
+
+var libraries = {};
+
+function Library(name, names, definitions) {
+ if (name in libraries) {
+ return libraries[name];
+ }
+
+ for (let name of names) {
+ try {
+ if (!this.library) {
+ this.library = ctypes.open(name);
+ }
+ } catch (e) {
+ // Ignore errors until we've tried all the options.
+ }
+ }
+ if (!this.library) {
+ throw new Error("Could not load libc");
+ }
+
+ libraries[name] = this;
+
+ for (let symbol of Object.keys(definitions)) {
+ this.declare(symbol, ...definitions[symbol]);
+ }
+}
+
+Library.prototype.declare = function declare(name, ...args) {
+ Object.defineProperty(this, name, {
+ configurable: true,
+ get() {
+ Object.defineProperty(this, name, {
+ configurable: true,
+ value: this.library.declare(name, ...args),
+ });
+
+ return this[name];
+ },
+ });
+};
diff --git a/toolkit/modules/subprocess/subprocess_shared_unix.js b/toolkit/modules/subprocess/subprocess_shared_unix.js
new file mode 100644
--- /dev/null
+++ b/toolkit/modules/subprocess/subprocess_shared_unix.js
@@ -0,0 +1,136 @@
+/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */
+/* vim: set sts=2 sw=2 et tw=80: */
+"use strict";
+
+/* exported libc */
+
+const LIBC = OS.Constants.libc;
+
+const LIBC_CHOICES = ["libc.so", "libSystem.B.dylib", "a.out"];
+
+const unix = {
+ pid_t: ctypes.int32_t,
+
+ pollfd: new ctypes.StructType("pollfd", [
+ {"fd": ctypes.int},
+ {"events": ctypes.short},
+ {"revents": ctypes.short},
+ ]),
+
+ posix_spawn_file_actions_t: ctypes.uint8_t.array(
+ LIBC.OSFILE_SIZEOF_POSIX_SPAWN_FILE_ACTIONS_T),
+};
+
+var libc = new Library("libc", LIBC_CHOICES, {
+ environ: [ctypes.char.ptr.ptr],
+
+ chdir: [
+ ctypes.default_abi,
+ ctypes.int,
+ ctypes.char.ptr, /* path */
+ ],
+
+ close: [
+ ctypes.default_abi,
+ ctypes.int,
+ ctypes.int, /* fildes */
+ ],
+
+ fcntl: [
+ ctypes.default_abi,
+ ctypes.int,
+ ctypes.int, /* fildes */
+ ctypes.int, /* cmd */
+ ctypes.int, /* ... */
+ ],
+
+ getcwd: [
+ ctypes.default_abi,
+ ctypes.char.ptr,
+ ctypes.char.ptr, /* buf */
+ ctypes.size_t, /* size */
+ ],
+
+ kill: [
+ ctypes.default_abi,
+ ctypes.int,
+ unix.pid_t, /* pid */
+ ctypes.int, /* signal */
+ ],
+
+ pipe: [
+ ctypes.default_abi,
+ ctypes.int,
+ ctypes.int.array(2), /* pipefd */
+ ],
+
+ poll: [
+ ctypes.default_abi,
+ ctypes.int,
+ unix.pollfd.array(), /* fds */
+ ctypes.unsigned_int, /* nfds */
+ ctypes.int, /* timeout */
+ ],
+
+ posix_spawn: [
+ ctypes.default_abi,
+ ctypes.int,
+ unix.pid_t.ptr, /* pid */
+ ctypes.char.ptr, /* path */
+ unix.posix_spawn_file_actions_t.ptr, /* file_actions */
+ ctypes.voidptr_t, /* attrp */
+ ctypes.char.ptr.ptr, /* argv */
+ ctypes.char.ptr.ptr, /* envp */
+ ],
+
+ posix_spawn_file_actions_addclose: [
+ ctypes.default_abi,
+ ctypes.int,
+ unix.posix_spawn_file_actions_t.ptr, /* file_actions */
+ ctypes.int, /* fildes */
+ ],
+
+ posix_spawn_file_actions_adddup2: [
+ ctypes.default_abi,
+ ctypes.int,
+ unix.posix_spawn_file_actions_t.ptr, /* file_actions */
+ ctypes.int, /* fildes */
+ ctypes.int, /* newfildes */
+ ],
+
+ posix_spawn_file_actions_destroy: [
+ ctypes.default_abi,
+ ctypes.int,
+ unix.posix_spawn_file_actions_t.ptr, /* file_actions */
+ ],
+
+ posix_spawn_file_actions_init: [
+ ctypes.default_abi,
+ ctypes.int,
+ unix.posix_spawn_file_actions_t.ptr, /* file_actions */
+ ],
+
+ read: [
+ ctypes.default_abi,
+ ctypes.int,
+ ctypes.int, /* fildes */
+ ctypes.char.ptr, /* buf */
+ ctypes.size_t, /* nbyte */
+ ],
+
+ waitpid: [
+ ctypes.default_abi,
+ unix.pid_t,
+ unix.pid_t, /* pid */
+ ctypes.int.ptr, /* status */
+ ctypes.int, /* options */
+ ],
+
+ write: [
+ ctypes.default_abi,
+ ctypes.size_t,
+ ctypes.int, /* fildes */
+ ctypes.char.ptr, /* buf */
+ ctypes.size_t, /* nbyte */
+ ],
+});
diff --git a/toolkit/modules/subprocess/subprocess_unix.jsm b/toolkit/modules/subprocess/subprocess_unix.jsm
new file mode 100644
--- /dev/null
+++ b/toolkit/modules/subprocess/subprocess_unix.jsm
@@ -0,0 +1,45 @@
+/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */
+/* vim: set sts=2 sw=2 et tw=80: */
+"use strict";
+
+/* eslint-disable mozilla/balanced-listeners */
+
+/* exported subprocess_impl */
+
+/* globals BaseProcess */
+
+var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
+
+var EXPORTED_SYMBOLS = ["subprocess_impl"];
+
+Cu.import("resource://gre/modules/ctypes.jsm");
+Cu.import("resource://gre/modules/osfile.jsm");
+Cu.import("resource://gre/modules/Services.jsm");
+Cu.import("resource://gre/modules/subprocess/subprocess_common.jsm");
+
+Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared.js", this);
+Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared_unix.js", this);
+
+class Process extends BaseProcess {
+ static get WORKER_URL() {
+ return "resource://gre/modules/subprocess/subprocess_worker_unix.js";
+ }
+}
+
+function subprocess_unix(options) {
+ return Process.create(options);
+}
+
+var subprocess_impl = subprocess_unix;
+
+subprocess_unix.getEnvironment = function* getEnvironment() {
+ for (let envp = libc.environ; !envp.contents.isNull(); envp = envp.increment()) {
+ let str = envp.contents.readString();
+
+ let idx = str.indexOf("=");
+ if (idx >= 0) {
+ yield [str.slice(0, idx),
+ str.slice(idx + 1)];
+ }
+ }
+};
diff --git a/toolkit/modules/subprocess/subprocess_worker_unix.js b/toolkit/modules/subprocess/subprocess_worker_unix.js
new file mode 100644
--- /dev/null
+++ b/toolkit/modules/subprocess/subprocess_worker_unix.js
@@ -0,0 +1,468 @@
+/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */
+/* vim: set sts=2 sw=2 et tw=80: */
+"use strict";
+
+importScripts("resource://gre/modules/subprocess/subprocess_shared.js",
+ "resource://gre/modules/subprocess/subprocess_shared_unix.js");
+
+const POLL_INTERVAL = 50;
+const POLL_TIMEOUT = 0;
+
+const BUFFER_SIZE = 1024;
+
+let lastProcessId = 0;
+
+class Process {
+ constructor(options) {
+ this.id = lastProcessId++;
+
+ this.exitPromise = new Promise(resolve => {
+ this.resolveExit = resolve;
+ });
+ this.exitPromise.then(() => {
+ // The input file descriptors will be closed after poll
+ // reports that their input buffers are empty. If we close
+ // them now, we may lose output.
+ libc.close(this.fds[0]);
+ });
+
+ this.pid = null;
+ this.fds = [];
+
+ this.stringArrays = [];
+
+ this.spawn(options);
+ }
+
+ cleanup() {
+ for (let pipe of this.fds) {
+ libc.close(pipe);
+ }
+ }
+
+ kill(signal) {
+ libc.kill(this.pid, signal);
+ this.wait();
+ }
+
+ stringArray(strings) {
+ let result = ctypes.char.ptr.array(strings.length + 1)();
+
+ let cstrings = strings.map(str => ctypes.char.array()(str));
+ for (let [i, cstring] of cstrings.entries()) {
+ result[i] = cstring;
+ }
+
+ // Char arrays used in char arg and environment vectors must be
+ // explicitly kept alive in a JS object, or they will be reaped
+ // by the GC if it runs before our process is started.
+ this.stringArrays.push(cstrings);
+
+ return result;
+ }
+
+ initFds(actions, fds) {
+ libc.posix_spawn_file_actions_init(actions);
+
+ for (let [i, fd] of fds.entries()) {
+ libc.posix_spawn_file_actions_adddup2(actions, fd, i);
+ }
+
+ for (let fd of this.fds.values()) {
+ libc.posix_spawn_file_actions_addclose(actions, fd);
+ }
+ }
+
+ initPipes({mergeStderr}) {
+ let our_pipes = [];
+ let their_pipes = [];
+
+ let pipe = input => {
+ let fds = ctypes.int.array(2)();
+
+ let res = libc.pipe(fds);
+ if (res == -1) {
+ throw new Error("Unable to create pipe");
+ }
+
+ if (input) {
+ fds = [fds[1], fds[0]];
+ }
+
+ our_pipes.push(fds[1]);
+ libc.fcntl(fds[1], LIBC.F_SETFL, LIBC.O_NONBLOCK | LIBC.O_CLOEXEC);
+
+ return fds[0];
+ };
+
+ their_pipes[0] = pipe(false);
+ their_pipes[1] = pipe(true);
+
+ if (mergeStderr) {
+ their_pipes[2] = their_pipes[1];
+ } else {
+ their_pipes[2] = pipe(true);
+ }
+
+ this.fds = our_pipes;
+
+ return their_pipes;
+ }
+
+ spawn(options) {
+ let {command, arguments: args} = options;
+
+ let argv = this.stringArray([command, ...args]);
+ let envp = this.stringArray(options.environment);
+
+ let actions = unix.posix_spawn_file_actions_t();
+ let actionsp = actions.address();
+
+ let fds = this.initPipes(options);
+
+ let cwd;
+ try {
+ if (options.workdir) {
+ cwd = ctypes.char.array(LIBC.PATH_MAX)();
+ libc.getcwd(cwd, cwd.length);
+
+ if (libc.chdir(options.workdir) < 0) {
+ throw new Error(`Unable to change working directory to ${options.workdir}`);
+ }
+ }
+
+ this.initFds(actionsp, fds);
+
+ let pid = unix.pid_t(0);
+ let rv = libc.posix_spawn(pid.address(), command, actionsp, null, argv, envp);
+
+ if (rv != 0) {
+ throw new Error(`Failed to execute command "${command}"`);
+ }
+
+ this.pid = pid.value;
+ } catch (e) {
+ this.cleanup();
+
+ throw e;
+ } finally {
+ this.stringArrays.length = 0;
+
+ if (cwd) {
+ libc.chdir(cwd);
+ }
+ for (let pipe of fds) {
+ libc.close(pipe);
+ }
+
+ libc.posix_spawn_file_actions_destroy(actionsp);
+ }
+ }
+
+ wait() {
+ let status = new ctypes.int();
+
+ let res = libc.waitpid(this.pid, status.address(), LIBC.WNOHANG);
+ if (res == this.pid) {
+ this.resolveExit(status.value);
+ return status.value;
+ }
+ }
+}
+
+var io = {
+ pollFds: null,
+
+ readers: new Map(),
+ pendingWrites: new Map(),
+
+ processes: new Map(),
+ fdProcess: new Map(),
+
+ interval: null,
+
+ readFd(fd, send) {
+ let close = () => {
+ send(null);
+ };
+
+ this.readers.set(fd, {send, close});
+ this.updatePollFds();
+ },
+
+ writeFd(fd, buffer) {
+ return new Promise((resolve, reject) => {
+ if (!this.pendingWrites.has(fd)) {
+ this.pendingWrites.set(fd, []);
+ }
+ let writes = this.pendingWrites.get(fd);
+
+ writes.push({resolve, reject, buffer, length: buffer.byteLength});
+ this.updatePollFds();
+ });
+ },
+
+ updatePollFds() {
+ let keys = new Set([...this.readers.keys(),
+ ...this.pendingWrites.keys()]);
+
+ let fds = unix.pollfd.array(keys.size)();
+
+ let i = 0;
+ for (let key of keys) {
+ let fd = fds[i++];
+
+ let events = 0;
+ if (this.readers.has(key)) {
+ events |= LIBC.POLLIN;
+ }
+ if (this.pendingWrites.has(key)) {
+ events |= LIBC.POLLOUT;
+ }
+
+ fd.fd = key;
+ fd.events = events;
+ fd.revents = 0;
+ }
+
+ this.pollFds = fds;
+
+ if (fds.length && !this.interval) {
+ this.interval = setInterval(this.poll.bind(this), POLL_INTERVAL);
+ } else if (!fds.length && this.interval) {
+ clearInterval(this.interval);
+ this.interval = null;
+ }
+ },
+
+ poll() {
+ let fds = this.pollFds;
+ let count = libc.poll(fds, fds.length, POLL_TIMEOUT);
+
+ for (let i = 0; count && i < fds.length; i++) {
+ let fd = fds[i];
+ if (fd.revents) {
+ count--;
+
+ if (fd.revents & LIBC.POLLIN) {
+ this.readPipe(fd.fd);
+ }
+ if (fd.revents & LIBC.POLLOUT) {
+ this.writePipe(fd.fd);
+ }
+ if (fd.revents & (LIBC.POLLERR | LIBC.POLLHUP | LIBC.POLLNVAL)) {
+ this.killPipe(fd.fd);
+ }
+
+ fd.revents = 0;
+ }
+ }
+ },
+
+ addProcess(process) {
+ this.processes.set(process.id, process);
+
+ for (let fd of process.fds) {
+ this.fdProcess.set(fd, process);
+ }
+ },
+
+ cleanupProcess(process) {
+ for (let fd of process.fds) {
+ this.killPipe(fd);
+ }
+
+ this.processes.delete(process.id);
+ },
+
+ killPipe(fd) {
+ if (this.readers.has(fd)) {
+ this.readers.get(fd).close();
+ }
+ this.readers.delete(fd);
+
+ if (this.pendingWrites.has(fd)) {
+ for (let {reject, buffer} of this.pendingWrites.get(fd)) {
+ reject({bytesRemaing: buffer.byteLength});
+ }
+ this.pendingWrites.delete(fd);
+ }
+
+ libc.close(fd);
+
+ let process = this.fdProcess.get(fd);
+ if (process) {
+ process.wait();
+ }
+
+ this.fdProcess.delete(fd);
+ this.updatePollFds();
+ },
+
+ readPipe(fd) {
+ for (;;) {
+ let buffer = this.read(fd);
+ if (!buffer) {
+ break;
+ }
+ this.readers.get(fd).send(buffer);
+ }
+ },
+
+ writePipe(fd) {
+ let writes = this.pendingWrites.get(fd);
+ while (writes.length) {
+ let {buffer, resolve, length} = writes[0];
+
+ let written = this.write(fd, buffer);
+
+ if (written == buffer.byteLength) {
+ resolve(length);
+ writes.shift();
+ } else if (written > 0) {
+ writes[0].buffer = buffer.slice(written);
+ } else {
+ break;
+ }
+ }
+
+ if (writes.length == 0) {
+ this.pendingWrites.delete(fd);
+ this.updatePollFds();
+ }
+ },
+
+ read(fd, count = BUFFER_SIZE) {
+ let buffer = new ArrayBuffer(count);
+
+ let written = libc.read(fd, buffer, buffer.byteLength);
+ if (written < 0 && ctypes.errno != LIBC.EAGAIN) {
+ this.killPipe(fd);
+ }
+
+ if (written <= 0) {
+ return null;
+ }
+
+ if (written < buffer.byteLength) {
+ return ArrayBuffer.transfer(buffer, written);
+ }
+
+ return buffer;
+ },
+
+ write(fd, buffer) {
+ let bytesWritten = libc.write(fd, buffer, buffer.byteLength);
+
+ if (bytesWritten < 0 && ctypes.errno != LIBC.EAGAIN) {
+ this.killPipe(fd);
+ }
+
+ return bytesWritten;
+ },
+};
+
+let requests = {
+ close(processId, fd) {
+ let process = io.processes.get(processId);
+
+ io.killPipe(process.fds[fd]);
+
+ return {data: {}};
+ },
+
+ spawn(options) {
+ let process = new Process(options);
+ let processId = process.id;
+
+ io.addProcess(process);
+
+ for (let fd of [1, 2]) {
+ if (fd >= process.fds.length) {
+ continue;
+ }
+
+ io.readFd(process.fds[fd], buffer => {
+ let message = {
+ msg: `processOutput-${processId}`,
+ processId,
+ fd,
+ buffer,
+ };
+
+ if (buffer) {
+ self.postMessage(message, [buffer]);
+ } else {
+ self.postMessage(message);
+ }
+ });
+ }
+
+ let fds = new Set(process.fds.keys());
+ return {data: {processId, fds, pid: process.pid}};
+ },
+
+ kill(processId, force = false) {
+ let process = io.processes.get(processId);
+
+ process.kill(force ? 9 : 15);
+ },
+
+ wait(processId) {
+ let process = io.processes.get(processId);
+
+ process.wait();
+
+ return process.exitPromise.then(exitCode => {
+ return {data: {exitCode}};
+ });
+ },
+
+ write(processId, fd, buffer) {
+ let process = io.processes.get(processId);
+
+ return io.writeFd(process.fds[fd], buffer).then(bytesWritten => {
+ return {data: {bytesWritten}};
+ });
+ },
+};
+
+onmessage = event => {
+ let {msg, msgId, args} = event.data;
+
+ new Promise(resolve => {
+ resolve(requests[msg](...args));
+ }).then(result => {
+ let response = {
+ msg: "success",
+ msgId,
+ data: result.data,
+ };
+
+ self.postMessage(response, result.transfer || []);
+ }).catch(error => {
+ if (error instanceof Error) {
+ error = {
+ message: error.message,
+ fileName: error.fileName,
+ lineNumber: error.lineNumber,
+ column: error.column,
+ stack: error.stack,
+ };
+ }
+
+ self.postMessage({
+ msg: "failure",
+ msgId,
+ error,
+ });
+ }).catch(error => {
+ console.error(error);
+
+ self.postMessage({
+ msg: "failure",
+ msgId,
+ error: {},
+ });
+ });
+};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment