|
const Child = require('child_process') |
|
const EventEmitter = require('eventemitter3') |
|
const Errio = require('errio') |
|
const Util = require('util') |
|
|
|
const EXIT_TIMEOUT = 10000 |
|
const EXIT_CHECK_INTERVAL = 200 |
|
|
|
const IPC_INIT_KEY = '@init' |
|
const IPC_SETTER_KEY = '@setIpc' |
|
const IPC_PROP_KEY = '@ipc' |
|
|
|
const SEND_HANDLES = [ |
|
require('net').Socket, |
|
require('net').Server, |
|
process.binding('pipe_wrap').Pipe, |
|
process.binding('tcp_wrap').TCP, |
|
process.binding('udp_wrap').UDP, |
|
require('dgram').Socket |
|
] |
|
|
|
function callableFork (api, ...args) { |
|
const fork = (path, argv, opts) => { |
|
return MasterIPCEE.create(path, argv, opts) |
|
} |
|
|
|
let modulePath |
|
|
|
if (typeof api === 'string') { |
|
const {resolve: resolvePath, dirname} = require('path') |
|
modulePath = resolvePath(dirname(require('caller')()), api) |
|
return fork.apply(null, [require.resolve(modulePath)].concat(args)) |
|
} |
|
|
|
if (api instanceof module.constructor) { |
|
modulePath = api.filename |
|
api = api.exports = args[0] || api.exports |
|
} |
|
|
|
if (!modulePath) { |
|
modulePath = require('caller')() |
|
} |
|
|
|
api.fork = fork.bind(null, modulePath) |
|
|
|
return api |
|
} |
|
|
|
class AbstractIPCEE extends EventEmitter { |
|
constructor (args) { |
|
super() |
|
|
|
Object.keys(args) |
|
.forEach((key) => (this['_' + key] = args[key])) |
|
|
|
this._debugger = getDebugger(this._opts.debug, this.constructor) |
|
this._init = promiseDefer.call(this) |
|
this._lastReq = -1 |
|
this._requests = {} |
|
|
|
this._loadCallableApi() |
|
} |
|
|
|
get debug () { |
|
return this._debugger |
|
} |
|
|
|
_bootstrap () { |
|
this.debug('bootstrap') |
|
|
|
const grabMessages = (args) => { |
|
args = Array.isArray(args) ? args : [].slice.call(arguments) |
|
args = args.map(tryParseError) |
|
if (args[0].startsWith(this._client.pid)) { |
|
this.debug('msg', args) |
|
args[0] = args[0].substring(args[0].indexOf(':') + 1) |
|
this.emit.apply(this, args) |
|
} else { |
|
this.debug('msg IGNORED', args) |
|
} |
|
} |
|
|
|
const stopGrabbing = () => { |
|
this.emit('disconnect') |
|
Object.keys(this._requests).forEach((reqId) => { |
|
const req = this._requests[reqId] |
|
if (typeof req === 'object') { |
|
// TODO |
|
// this.debug('EXIT REQUEST', reqId) |
|
} |
|
}) |
|
} |
|
|
|
this._client.on('message', grabMessages) |
|
this._client.once('disconnect', stopGrabbing) |
|
this._client.once('exit', (code) => this.emit('exit', code)) |
|
|
|
this.once('initialized', () => this.debug.info('initialized')) |
|
this._init.polygoat((error) => !error && this.emit('initialized')) |
|
|
|
this._init.polygoat((error, result) => { |
|
if (!error) { |
|
this.on('req', (reqId, ...args) => { |
|
const req = promiseDefer.call(this) |
|
req.id = reqId |
|
this._handleRequest.apply(this, [req].concat(args)) |
|
req.polygoat((error, result) => { |
|
this.send('res-' + reqId, error, result) |
|
}) |
|
}) |
|
} |
|
}) |
|
} |
|
|
|
_dialog (fn, ...args) { |
|
const reqId = `${process.pid}-${++this._lastReq}` |
|
const req = this._requests[reqId] = promiseDefer.call(this) |
|
const callback = typeof args.slice(-1)[0] === 'function' && args.pop() |
|
|
|
req.id = reqId |
|
|
|
this.once('res-' + reqId, (error, result) => { |
|
this.debug.response('req-' + reqId, error, result) |
|
this._requests[reqId] = !error |
|
if (error) { |
|
req.reject(error) |
|
} else { |
|
req.resolve(result) |
|
} |
|
}) |
|
|
|
this.send.apply(this, ['req', reqId, fn].concat(args)) |
|
|
|
return req.polygoat(callback) |
|
} |
|
|
|
_handleRequest (req, action, ...args) { |
|
if (action === 'ping') { |
|
return req.resolve(process.pid) |
|
} |
|
req.reject(new Error('no handler defined')) |
|
} |
|
|
|
_loadCallableApi () { |
|
// delete require.cache[this._modulePath] |
|
const ModuleExports = require(this._modulePath) |
|
let api |
|
|
|
if (typeof ModuleExports === 'function') { |
|
api = new ModuleExports() |
|
} else { |
|
api = Object.create(ModuleExports) |
|
} |
|
|
|
return (this._api = api) |
|
} |
|
|
|
_setClient (client) { |
|
this._client = client |
|
this._bootstrap() |
|
} |
|
|
|
_send (...args) { |
|
args[0] = `${this._client.pid}:${args[0]}` |
|
|
|
const callback = typeof args.slice(-1)[0] === 'function' && args.pop() |
|
let sendArgs |
|
|
|
if (SEND_HANDLES.find((sendHanle) => args[1] instanceof sendHanle)) { |
|
sendArgs = [args[0], args[1]] |
|
} else { |
|
sendArgs = [args.map(tryStringifyError)] |
|
} |
|
|
|
if (callback) { |
|
sendArgs.push(callback) |
|
} |
|
|
|
this.debug('send', args) |
|
this._client.send.apply(this._client, sendArgs) |
|
|
|
return this |
|
} |
|
|
|
ping (callback) { |
|
const startTime = (new Date()).getTime() |
|
const pong = Promise.resolve(this._client.connected) |
|
.then((connected) => { |
|
if (!connected) { |
|
return -1 |
|
} |
|
return this._dialog('ping') |
|
}) |
|
.then((pid) => { |
|
const endTime = (new Date()).getTime() |
|
const time = endTime - startTime |
|
const alive = pid > -1 |
|
return {pid, alive, startTime, endTime, time} |
|
}) |
|
|
|
return polygoat.call(this, pong, callback) |
|
} |
|
|
|
ready (callback) { |
|
return this._init.polygoat(callback) |
|
} |
|
|
|
send (...args) { |
|
return Promise.resolve(this._init.promise) |
|
.then(() => this._send.apply(this, args)) |
|
} |
|
|
|
toJSON () { |
|
return this.toObject() |
|
} |
|
|
|
toObject () { |
|
return { |
|
modulePath: this._modulePath, |
|
argv: this._argv, |
|
opts: this._opts |
|
} |
|
} |
|
|
|
when (event, listener) { |
|
if (listener) { |
|
this.once(event, listener) |
|
return this |
|
} |
|
return new Promise((resolve) => { |
|
this.once(event, (...args) => resolve(args)) |
|
}) |
|
} |
|
} |
|
|
|
class ChildIPCEE extends AbstractIPCEE { |
|
static create () { |
|
return new this(process) |
|
} |
|
|
|
constructor (client) { |
|
super(JSON.parse(client.argv.pop())) |
|
this.debug(this.constructor.name, this.toObject()) |
|
Errio.setDefaults(this._opts.errio) |
|
this._argv.forEach((value) => process.argv.push(value)) |
|
this._setClient(client) |
|
} |
|
|
|
_bootstrap () { |
|
super._bootstrap() |
|
this._init.polygoat((error, result) => { |
|
this.debug.response('initialization', error, result) |
|
this._send('init', error, result) |
|
}) |
|
} |
|
|
|
_handleRequest (req, action, ...args) { |
|
Promise.resolve() |
|
.then(() => { |
|
if (action === 'call') { |
|
const fn = args.shift() |
|
if (typeof this._api[fn] !== 'function') { |
|
throw new Error(`[child ${this._client.pid}].${fn} is not a function (modulePath: ${this._modulePath})`) |
|
} |
|
return this._api[fn].apply(this._api, args) |
|
} |
|
if (action === 'exit') { |
|
const code = args[0] || 0 |
|
process.once('exit', () => { |
|
this.debug('exit on master request') |
|
}) |
|
process.nextTick(() => { |
|
process.exit(code) |
|
}) |
|
return process.pid |
|
} |
|
if (action === 'get') { |
|
return this._api[args[0]] |
|
} |
|
if (action === 'set') { |
|
return (this._api[args[0]] = args[1]) |
|
} |
|
if (action === 'ping') { |
|
return process.pid |
|
} |
|
}) |
|
.then(req.resolve) |
|
.catch(req.reject) |
|
} |
|
|
|
_loadCallableApi () { |
|
const api = super._loadCallableApi() |
|
const ctor = api.constructor !== Object ? api.constructor : api |
|
|
|
const ipcKey = ctor[IPC_PROP_KEY] || 'ipc' |
|
const setIpcFn = ctor[IPC_SETTER_KEY] || 'setIpc' |
|
const initFn = ctor[IPC_INIT_KEY] || 'init' |
|
|
|
if (api[setIpcFn]) { |
|
api[setIpcFn].call(api, this) |
|
} |
|
|
|
if (!api[setIpcFn] && !api.hasOwnProperty(ipcKey)) { |
|
Object.defineProperty(api, ipcKey, {value: this}) |
|
} |
|
|
|
const init = Promise.resolve() |
|
.then(() => { |
|
this.debug('initialize') |
|
if (typeof api[initFn] === 'function') { |
|
this.debug(initFn, '(', this._argv.join(', '), ')') |
|
return api[initFn].apply(api, this._argv) |
|
} else { |
|
this.debug.warn('init function not found:', initFn) |
|
} |
|
}) |
|
|
|
polygoat(init, (error, result) => { |
|
if (error) { |
|
this._init.reject(error) |
|
} else { |
|
this._init.resolve(result) |
|
} |
|
}) |
|
|
|
return api |
|
} |
|
} |
|
|
|
class MasterIPCEE extends AbstractIPCEE { |
|
static create (modulePath, argv, opts) { |
|
if (!Array.isArray(argv)) { |
|
opts = argv |
|
argv = undefined |
|
} |
|
|
|
argv = argv || [] |
|
opts = opts || {} |
|
|
|
opts.debug = opts.debug != null ? opts.debug : !!process.env.DEBUG_CALLABLE_FORK |
|
opts.errio = Object.assign({stack: true}, opts.errio) |
|
|
|
return new this({modulePath, argv, opts}) |
|
} |
|
|
|
constructor (args) { |
|
super(args) |
|
this.start() |
|
} |
|
|
|
_bootstrap () { |
|
super._bootstrap() |
|
this.once('init', (error, result) => { |
|
this.debug.response('child initialization:', error, result) |
|
if (error) { |
|
this._init.reject(error) |
|
} else { |
|
this._init.resolve(result) |
|
} |
|
}) |
|
} |
|
|
|
call (...args) { |
|
return this._dialog.apply(this, ['call'].concat(args)) |
|
} |
|
|
|
exit (...args) { |
|
const callback = typeof args.slice(-1)[0] === 'function' && args.pop() |
|
const deferred = promiseDefer.call(this) |
|
const [code, timeout] = args |
|
|
|
if (!this._client.connected || this._client.exitCode != null) { |
|
deferred.resolve(this._client.exitCode) |
|
} else { |
|
this._dialog('exit', code, (error, result) => { |
|
this.debug.response('child exit', error, result) |
|
|
|
if (error) { |
|
this.debug.error(error) |
|
} |
|
|
|
let checkTask |
|
let timeoutTask |
|
const done = () => { |
|
clearTimeout(timeoutTask) |
|
clearInterval(checkTask) |
|
if (this._client.exitCode == null) { |
|
deferred.reject(error) |
|
} else { |
|
deferred.resolve(result) |
|
} |
|
} |
|
|
|
timeoutTask = setTimeout(() => { |
|
if (this._client.exitCode == null) { |
|
this._client.kill() |
|
} |
|
process.nextTick(() => done()) |
|
}, timeout || EXIT_TIMEOUT) |
|
|
|
checkTask = setInterval(() => { |
|
if (this._client.exitCode != null) { |
|
done() |
|
} |
|
}, EXIT_CHECK_INTERVAL) |
|
}) |
|
} |
|
|
|
return deferred.polygoat(callback) |
|
} |
|
|
|
get (...args) { |
|
return this._dialog.apply(this, ['get'].concat(args)) |
|
} |
|
|
|
kill (signal) { |
|
if (this._client && this._client.exitCode == null) { |
|
this._client.kill(signal) |
|
} |
|
return this |
|
} |
|
|
|
set (...args) { |
|
return this._dialog.apply(this, ['set'].concat(args)) |
|
} |
|
|
|
restart (...args) { |
|
const callback = typeof args.slice(-1)[0] === 'function' && args.pop() |
|
const deferred = promiseDefer.call(this) |
|
|
|
this.emit('restart') |
|
|
|
// this._init = promiseDefer.call(this) |
|
|
|
this.exit((error, result) => { |
|
this._init = promiseDefer.call(this) |
|
this.start.apply(this, args) |
|
.then(deferred.resolve) |
|
.catch(deferred.reject) |
|
}) |
|
|
|
return deferred.polygoat(callback) |
|
} |
|
|
|
start (...args) { |
|
const callback = typeof args.slice(-1)[0] === 'function' && args.pop() |
|
|
|
if (Array.isArray(args[0])) { |
|
this._argv = args[0] |
|
} |
|
|
|
this.debug('start') |
|
|
|
const childArgv = [JSON.stringify(this)] |
|
const child = Child.fork(__filename, childArgv, this._opts) |
|
|
|
this._setClient(child) |
|
|
|
return this._init.polygoat(callback) |
|
} |
|
|
|
toObject () { |
|
return Object.assign({}, super.toObject(), { |
|
master: process.pid |
|
}) |
|
} |
|
} |
|
|
|
function getDebugger (enabled, ctor) { |
|
const isChild = ctor.name.toLocaleLowerCase().indexOf('master') === -1 |
|
const prefix = `[${isChild ? 'child' : 'mastr'} ${process.pid}] ` |
|
const api = (...args) => api.log.apply(null, args) |
|
;[ |
|
['log', '\u001b[90m', '\u001b[39m'], |
|
['error', '\u001b[31m', '\u001b[39m'], |
|
['warn', '\u001b[33m', '\u001b[39m'], |
|
['info', '\u001b[36m', '\u001b[39m'] |
|
].forEach(([key, startColor, endColor]) => { |
|
if (!enabled) { |
|
api[key] = () => {} |
|
return |
|
} |
|
if (isChild) { |
|
startColor = startColor + '\u001b[2m' |
|
endColor = '\u001b[22m' + endColor |
|
} |
|
let lvlPrefix = '' |
|
if (key !== 'log') { |
|
lvlPrefix = `\u001b[1m${key.toUpperCase()}\u001b[22m${startColor}: ` |
|
} |
|
key !== 'log' ? key.toUpperCase() + ': ' : '' |
|
api[key] = function () { |
|
const msg = Util.format.apply(null, arguments) |
|
console.log(''.concat(startColor, prefix, lvlPrefix + msg, endColor)) |
|
} |
|
}) |
|
api.response = (label, error, result) => { |
|
api[error ? 'error' : 'info'] |
|
.call(null, label, '--', error ? 'FAIL' : 'ok') |
|
} |
|
return api |
|
} |
|
|
|
function isSendHandle (obj) { |
|
return !!SEND_HANDLES.find((sendHanle) => obj instanceof sendHanle) |
|
} |
|
|
|
function polygoat (promise, callback) { |
|
if (!callback) { |
|
return promise |
|
} |
|
promise.then((result) => callback(null, result)).catch(callback) |
|
return this |
|
} |
|
|
|
function promiseDefer () { |
|
const deferred = Promise.defer() |
|
deferred.polygoat = polygoat.bind(this, deferred.promise) |
|
return deferred |
|
} |
|
|
|
function tryParseError (obj) { |
|
try { |
|
if (typeof obj === 'string') { |
|
return Errio.parse(obj) |
|
} else { |
|
return obj |
|
} |
|
} catch (ignored) { |
|
return obj |
|
} |
|
} |
|
|
|
function tryStringifyError (obj) { |
|
try { |
|
if (obj instanceof Error) { |
|
return Errio.stringify(obj) |
|
} else { |
|
return obj |
|
} |
|
} catch (ignored) { |
|
return obj |
|
} |
|
} |
|
|
|
exports = module.exports = callableFork |
|
exports.AbstractIPCEE = AbstractIPCEE |
|
exports.ChildIPCEE = ChildIPCEE |
|
exports.MasterIPCEE = MasterIPCEE |
|
exports.getDebugger = getDebugger |
|
exports.isSendHandle = isSendHandle |
|
exports.polygoat = polygoat |
|
exports.tryParseError = tryParseError |
|
exports.tryStringifyError = tryStringifyError |
|
|
|
if (require.main === module) { |
|
ChildIPCEE.create() |
|
} |