Skip to content

Instantly share code, notes, and snippets.

@enten
Last active February 22, 2017 18:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save enten/05f532d62aed5139aed8105cb27aa2c9 to your computer and use it in GitHub Desktop.
Save enten/05f532d62aed5139aed8105cb27aa2c9 to your computer and use it in GitHub Desktop.
Make a fork process callable easily

callableFork

Requirements

npm install caller errio eventemitter3

Example

// fakeMoodule.js
const callableFork = require('./callableFork')

module.exports = callableFork({
  init (...args) {
    console.log(args)
    return 'value returned when master called ipc.ready()'
  },
  foo: 'FOO',
  bar (...args) {
    console.log(args)
    return 'value returned when master called ipc.call("bar")'
  }
})

// or

// callableFork(module, {
//   init (...args) {
//     console.log(args)
//     return 'value returned when master called ipc.ready()'
//   },
//   foo: 'FOO',
//   bar (...args) {
//     console.log(args)
//     return 'value returned when master called ipc.call("bar")'
//   }
// })
// test.js
const fakeModule = require('./fakeMoodule')

const ipc = fakeModule.fork(['args', 'passed', 'to', 'fakeModule.init()'])

// ipc is an instance of EventEmitter3: https://www.npmjs.com/package/eventemitter3
// with additional methods below to control the child process

ipc.ready()
  .then(console.log)
  .then(() => ipc.on('exit', () => console.log('bye!')))
  .then(() => ipc.ping())
  .then(console.log)
  .then(() => ipc.get('foo'))
  .then(console.log)
  .then(() => ipc.set('foo', 'fox'))
  .then(console.log)
  .then(() => ipc.call('bar', 'args', 'passed', 'to', 'fakeModule.bar()'))
  .then(console.log)
  .then(() => ipc.restart())
  .then(console.log)
  .then(() => ipc.exit(1))
  .catch(console.error)
const Child = require('child_process')
const EventEmitter = require('eventemitter3')
const Errio = require('errio')
const Util = require('util')
const EXIT_TIMEOUT = 10000
const EXIT_CHECK_INTERVAL = 200
const IPC_INIT_KEY = '@init'
const IPC_SETTER_KEY = '@setIpc'
const IPC_PROP_KEY = '@ipc'
const SEND_HANDLES = [
require('net').Socket,
require('net').Server,
process.binding('pipe_wrap').Pipe,
process.binding('tcp_wrap').TCP,
process.binding('udp_wrap').UDP,
require('dgram').Socket
]
function callableFork (api, ...args) {
const fork = (path, argv, opts) => {
return MasterIPCEE.create(path, argv, opts)
}
let modulePath
if (typeof api === 'string') {
const {resolve: resolvePath, dirname} = require('path')
modulePath = resolvePath(dirname(require('caller')()), api)
return fork.apply(null, [require.resolve(modulePath)].concat(args))
}
if (api instanceof module.constructor) {
modulePath = api.filename
api = api.exports = args[0] || api.exports
}
if (!modulePath) {
modulePath = require('caller')()
}
api.fork = fork.bind(null, modulePath)
return api
}
class AbstractIPCEE extends EventEmitter {
constructor (args) {
super()
Object.keys(args)
.forEach((key) => (this['_' + key] = args[key]))
this._debugger = getDebugger(this._opts.debug, this.constructor)
this._init = promiseDefer.call(this)
this._lastReq = -1
this._requests = {}
this._loadCallableApi()
}
get debug () {
return this._debugger
}
_bootstrap () {
this.debug('bootstrap')
const grabMessages = (args) => {
args = Array.isArray(args) ? args : [].slice.call(arguments)
args = args.map(tryParseError)
if (args[0].startsWith(this._client.pid)) {
this.debug('msg', args)
args[0] = args[0].substring(args[0].indexOf(':') + 1)
this.emit.apply(this, args)
} else {
this.debug('msg IGNORED', args)
}
}
const stopGrabbing = () => {
this.emit('disconnect')
Object.keys(this._requests).forEach((reqId) => {
const req = this._requests[reqId]
if (typeof req === 'object') {
// TODO
// this.debug('EXIT REQUEST', reqId)
}
})
}
this._client.on('message', grabMessages)
this._client.once('disconnect', stopGrabbing)
this._client.once('exit', (code) => this.emit('exit', code))
this.once('initialized', () => this.debug.info('initialized'))
this._init.polygoat((error) => !error && this.emit('initialized'))
this._init.polygoat((error, result) => {
if (!error) {
this.on('req', (reqId, ...args) => {
const req = promiseDefer.call(this)
req.id = reqId
this._handleRequest.apply(this, [req].concat(args))
req.polygoat((error, result) => {
this.send('res-' + reqId, error, result)
})
})
}
})
}
_dialog (fn, ...args) {
const reqId = `${process.pid}-${++this._lastReq}`
const req = this._requests[reqId] = promiseDefer.call(this)
const callback = typeof args.slice(-1)[0] === 'function' && args.pop()
req.id = reqId
this.once('res-' + reqId, (error, result) => {
this.debug.response('req-' + reqId, error, result)
this._requests[reqId] = !error
if (error) {
req.reject(error)
} else {
req.resolve(result)
}
})
this.send.apply(this, ['req', reqId, fn].concat(args))
return req.polygoat(callback)
}
_handleRequest (req, action, ...args) {
if (action === 'ping') {
return req.resolve(process.pid)
}
req.reject(new Error('no handler defined'))
}
_loadCallableApi () {
// delete require.cache[this._modulePath]
const ModuleExports = require(this._modulePath)
let api
if (typeof ModuleExports === 'function') {
api = new ModuleExports()
} else {
api = Object.create(ModuleExports)
}
return (this._api = api)
}
_setClient (client) {
this._client = client
this._bootstrap()
}
_send (...args) {
args[0] = `${this._client.pid}:${args[0]}`
const callback = typeof args.slice(-1)[0] === 'function' && args.pop()
let sendArgs
if (SEND_HANDLES.find((sendHanle) => args[1] instanceof sendHanle)) {
sendArgs = [args[0], args[1]]
} else {
sendArgs = [args.map(tryStringifyError)]
}
if (callback) {
sendArgs.push(callback)
}
this.debug('send', args)
this._client.send.apply(this._client, sendArgs)
return this
}
ping (callback) {
const startTime = (new Date()).getTime()
const pong = Promise.resolve(this._client.connected)
.then((connected) => {
if (!connected) {
return -1
}
return this._dialog('ping')
})
.then((pid) => {
const endTime = (new Date()).getTime()
const time = endTime - startTime
const alive = pid > -1
return {pid, alive, startTime, endTime, time}
})
return polygoat.call(this, pong, callback)
}
ready (callback) {
return this._init.polygoat(callback)
}
send (...args) {
return Promise.resolve(this._init.promise)
.then(() => this._send.apply(this, args))
}
toJSON () {
return this.toObject()
}
toObject () {
return {
modulePath: this._modulePath,
argv: this._argv,
opts: this._opts
}
}
when (event, listener) {
if (listener) {
this.once(event, listener)
return this
}
return new Promise((resolve) => {
this.once(event, (...args) => resolve(args))
})
}
}
class ChildIPCEE extends AbstractIPCEE {
static create () {
return new this(process)
}
constructor (client) {
super(JSON.parse(client.argv.pop()))
this.debug(this.constructor.name, this.toObject())
Errio.setDefaults(this._opts.errio)
this._argv.forEach((value) => process.argv.push(value))
this._setClient(client)
}
_bootstrap () {
super._bootstrap()
this._init.polygoat((error, result) => {
this.debug.response('initialization', error, result)
this._send('init', error, result)
})
}
_handleRequest (req, action, ...args) {
Promise.resolve()
.then(() => {
if (action === 'call') {
const fn = args.shift()
if (typeof this._api[fn] !== 'function') {
throw new Error(`[child ${this._client.pid}].${fn} is not a function (modulePath: ${this._modulePath})`)
}
return this._api[fn].apply(this._api, args)
}
if (action === 'exit') {
const code = args[0] || 0
process.once('exit', () => {
this.debug('exit on master request')
})
process.nextTick(() => {
process.exit(code)
})
return process.pid
}
if (action === 'get') {
return this._api[args[0]]
}
if (action === 'set') {
return (this._api[args[0]] = args[1])
}
if (action === 'ping') {
return process.pid
}
})
.then(req.resolve)
.catch(req.reject)
}
_loadCallableApi () {
const api = super._loadCallableApi()
const ctor = api.constructor !== Object ? api.constructor : api
const ipcKey = ctor[IPC_PROP_KEY] || 'ipc'
const setIpcFn = ctor[IPC_SETTER_KEY] || 'setIpc'
const initFn = ctor[IPC_INIT_KEY] || 'init'
if (api[setIpcFn]) {
api[setIpcFn].call(api, this)
}
if (!api[setIpcFn] && !api.hasOwnProperty(ipcKey)) {
Object.defineProperty(api, ipcKey, {value: this})
}
const init = Promise.resolve()
.then(() => {
this.debug('initialize')
if (typeof api[initFn] === 'function') {
this.debug(initFn, '(', this._argv.join(', '), ')')
return api[initFn].apply(api, this._argv)
} else {
this.debug.warn('init function not found:', initFn)
}
})
polygoat(init, (error, result) => {
if (error) {
this._init.reject(error)
} else {
this._init.resolve(result)
}
})
return api
}
}
class MasterIPCEE extends AbstractIPCEE {
static create (modulePath, argv, opts) {
if (!Array.isArray(argv)) {
opts = argv
argv = undefined
}
argv = argv || []
opts = opts || {}
opts.debug = opts.debug != null ? opts.debug : !!process.env.DEBUG_CALLABLE_FORK
opts.errio = Object.assign({stack: true}, opts.errio)
return new this({modulePath, argv, opts})
}
constructor (args) {
super(args)
this.start()
}
_bootstrap () {
super._bootstrap()
this.once('init', (error, result) => {
this.debug.response('child initialization:', error, result)
if (error) {
this._init.reject(error)
} else {
this._init.resolve(result)
}
})
}
call (...args) {
return this._dialog.apply(this, ['call'].concat(args))
}
exit (...args) {
const callback = typeof args.slice(-1)[0] === 'function' && args.pop()
const deferred = promiseDefer.call(this)
const [code, timeout] = args
if (!this._client.connected || this._client.exitCode != null) {
deferred.resolve(this._client.exitCode)
} else {
this._dialog('exit', code, (error, result) => {
this.debug.response('child exit', error, result)
if (error) {
this.debug.error(error)
}
let checkTask
let timeoutTask
const done = () => {
clearTimeout(timeoutTask)
clearInterval(checkTask)
if (this._client.exitCode == null) {
deferred.reject(error)
} else {
deferred.resolve(result)
}
}
timeoutTask = setTimeout(() => {
if (this._client.exitCode == null) {
this._client.kill()
}
process.nextTick(() => done())
}, timeout || EXIT_TIMEOUT)
checkTask = setInterval(() => {
if (this._client.exitCode != null) {
done()
}
}, EXIT_CHECK_INTERVAL)
})
}
return deferred.polygoat(callback)
}
get (...args) {
return this._dialog.apply(this, ['get'].concat(args))
}
kill (signal) {
if (this._client && this._client.exitCode == null) {
this._client.kill(signal)
}
return this
}
set (...args) {
return this._dialog.apply(this, ['set'].concat(args))
}
restart (...args) {
const callback = typeof args.slice(-1)[0] === 'function' && args.pop()
const deferred = promiseDefer.call(this)
this.emit('restart')
// this._init = promiseDefer.call(this)
this.exit((error, result) => {
this._init = promiseDefer.call(this)
this.start.apply(this, args)
.then(deferred.resolve)
.catch(deferred.reject)
})
return deferred.polygoat(callback)
}
start (...args) {
const callback = typeof args.slice(-1)[0] === 'function' && args.pop()
if (Array.isArray(args[0])) {
this._argv = args[0]
}
this.debug('start')
const childArgv = [JSON.stringify(this)]
const child = Child.fork(__filename, childArgv, this._opts)
this._setClient(child)
return this._init.polygoat(callback)
}
toObject () {
return Object.assign({}, super.toObject(), {
master: process.pid
})
}
}
function getDebugger (enabled, ctor) {
const isChild = ctor.name.toLocaleLowerCase().indexOf('master') === -1
const prefix = `[${isChild ? 'child' : 'mastr'} ${process.pid}] `
const api = (...args) => api.log.apply(null, args)
;[
['log', '\u001b[90m', '\u001b[39m'],
['error', '\u001b[31m', '\u001b[39m'],
['warn', '\u001b[33m', '\u001b[39m'],
['info', '\u001b[36m', '\u001b[39m']
].forEach(([key, startColor, endColor]) => {
if (!enabled) {
api[key] = () => {}
return
}
if (isChild) {
startColor = startColor + '\u001b[2m'
endColor = '\u001b[22m' + endColor
}
let lvlPrefix = ''
if (key !== 'log') {
lvlPrefix = `\u001b[1m${key.toUpperCase()}\u001b[22m${startColor}: `
}
key !== 'log' ? key.toUpperCase() + ': ' : ''
api[key] = function () {
const msg = Util.format.apply(null, arguments)
console.log(''.concat(startColor, prefix, lvlPrefix + msg, endColor))
}
})
api.response = (label, error, result) => {
api[error ? 'error' : 'info']
.call(null, label, '--', error ? 'FAIL' : 'ok')
}
return api
}
function isSendHandle (obj) {
return !!SEND_HANDLES.find((sendHanle) => obj instanceof sendHanle)
}
function polygoat (promise, callback) {
if (!callback) {
return promise
}
promise.then((result) => callback(null, result)).catch(callback)
return this
}
function promiseDefer () {
const deferred = Promise.defer()
deferred.polygoat = polygoat.bind(this, deferred.promise)
return deferred
}
function tryParseError (obj) {
try {
if (typeof obj === 'string') {
return Errio.parse(obj)
} else {
return obj
}
} catch (ignored) {
return obj
}
}
function tryStringifyError (obj) {
try {
if (obj instanceof Error) {
return Errio.stringify(obj)
} else {
return obj
}
} catch (ignored) {
return obj
}
}
exports = module.exports = callableFork
exports.AbstractIPCEE = AbstractIPCEE
exports.ChildIPCEE = ChildIPCEE
exports.MasterIPCEE = MasterIPCEE
exports.getDebugger = getDebugger
exports.isSendHandle = isSendHandle
exports.polygoat = polygoat
exports.tryParseError = tryParseError
exports.tryStringifyError = tryStringifyError
if (require.main === module) {
ChildIPCEE.create()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment