Skip to content

Instantly share code, notes, and snippets.

@asfourco
Last active December 11, 2017 19:38
Show Gist options
  • Save asfourco/358e3247733f6b841916fd62378c1736 to your computer and use it in GitHub Desktop.
Save asfourco/358e3247733f6b841916fd62378c1736 to your computer and use it in GitHub Desktop.
Node.js child_process.fork() proof of concept
const path = require('path')
const {fork} = require('child_process')
// Path of the document to analyse, taken from the `source` environment
// variable (e.g. `source=./report.pdf node index.js`).
// Fail early with a readable message: `path.resolve(undefined)` would otherwise
// throw a TypeError about its argument type.
if (!process.env.source) {
  console.error('missing input file: set the `source` environment variable')
  process.exit(1)
}
const source = path.resolve(process.env.source)
// Worker that extracts text and metadata. The worker script is resolved against
// this file's directory so the fork succeeds whatever the current working
// directory of the parent is.
const tikaExtractFork = fork(path.join(__dirname, 'tikaExtract.js'))
// Only the first message is handled: report it, then terminate the worker.
tikaExtractFork.once('message', (data) => {
  if (data.error) {
    console.error(`something went wrong => ${JSON.stringify(data)}`)
  } else {
    console.log(`data received => words:${data.payload.words}, content-type:${data.payload.meta['Content-Type']}`)
  }
  endChildProcess(tikaExtractFork)
})
// Worker that detects the content type. The worker script is resolved against
// this file's directory so the fork succeeds whatever the current working
// directory of the parent is.
const tikaTypeFork = fork(path.join(__dirname, 'tikaType.js'))
// Only the first message is handled: report it, then terminate the worker.
tikaTypeFork.once('message', (data) => {
  if (data.error) {
    console.error(`something went wrong => ${JSON.stringify(data)}`)
  } else {
    console.log(`data received => content-type:${data.payload}`)
  }
  endChildProcess(tikaTypeFork)
})
// utility functions
/**
 * Log the pid of a forked child and terminate it.
 *
 * @param {object} child - handle exposing `pid` and `kill()`
 */
const endChildProcess = (child) => {
  const { pid } = child
  console.log(`closing process: ${pid}`)
  child.kill()
}
// main entry: announce the parent pid, then hand the same file to both workers
console.log(`started index.js with pid: ${process.pid}`)
for (const worker of [tikaExtractFork, tikaTypeFork]) {
  worker.send({file: source})
}
{
"name": "spawn",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"bluebird": "^3.5.1",
"java": "^0.9.0",
"tika": "^1.6.1"
}
}
const tika = require('tika')
const fs = require('fs')
// Resource ceilings for this worker.
const GB = 1e9 // one decimal gigabyte, expressed in bytes
const memLimit = GB * 3 // bytes
const usageLimit = 60 // seconds
/**
 * Extract text and metadata from `file` with Tika and report the outcome to
 * the parent process through `process.send`.
 *
 * While the extraction runs, a one-second watchdog samples this process'
 * resident memory and user CPU time and reports an error as soon as
 * `memLimit` or `usageLimit` is reached.
 *
 * Exactly one message is sent per call:
 *   - `{error: false, payload: {words, meta}}` on success
 *   - `{error: true, payload}` on extraction failure or exceeded limits
 *
 * @param {string} file - path of the document handed to `tika.extract`
 */
const extractContent = (file) => {
  const start = process.cpuUsage()
  console.log(`Tika extraction started ...pid: ${process.pid}`)

  let reported = false
  let watchdog = null

  // Send a single result and stop the watchdog, so the timer neither repeats
  // the message every second nor keeps the event loop alive afterwards.
  const report = (message) => {
    if (reported) return
    reported = true
    clearInterval(watchdog)
    process.send(message)
  }

  watchdog = setInterval(() => {
    const mem = process.memoryUsage()
    const run = process.cpuUsage(start) // CPU time consumed since `start`, in microseconds
    const userUsage = run.user / 1000000 // convert to seconds
    // report that the process exceeded its memory budget or is taking too long
    if (mem.rss >= memLimit || userUsage >= usageLimit) {
      const errorMsg =`Exceeded memory limit or took too long (mem: ${mem.rss / GB} GB, usage: ${userUsage} seconds) ... killing process`
      report({error:true, payload: errorMsg})
    }
  }, 1000)

  tika.extract(file, {}, (err, text, meta) => {
    if (err) {
      // Error instances serialise to `{}` over the default JSON IPC channel,
      // so forward the message text when there is one.
      report({error:true, payload: err.message || err})
      return // `text` is not usable after a failed extraction
    }
    const trimmed = (text || '').trim()
    // ''.split(/\s+/) yields [''], which would count an empty document as one word
    const words = trimmed ? trimmed.split(/\s+/).length : 0
    report({error:false, payload: {words, meta}})
  })
}
/**
 * Start the extraction when `file` exists on disk; otherwise report a
 * "not found" error to the parent process.
 *
 * @param {string} file - path to check and then process
 */
const checkFileExists = (file) => {
  if (!fs.existsSync(file)) {
    process.send({error:true, payload: `${file} not found`})
    return
  }
  extractContent(file)
}
// Worker entry point: each incoming message is expected to carry a `file`
// property naming the document to process.
process.on('message', ({file}) => {
  console.log(`Process ${process.pid} recevied file ${file} to process`)
  checkFileExists(file)
})
const tika = require('tika')
const fs = require('fs')
const typeIs = require('type-is')
const mime = require('mime-types')
/**
 * Detect the content type of `file` with Tika and report it to the parent
 * process through `process.send`.
 *
 * Exactly one message is sent per call:
 *   - `{error: false, payload: contentType}` on success
 *   - `{error: true, payload}` when detection fails
 *
 * @param {string} file - path of the document handed to `tika.type`
 */
const extractContentType = (file) => {
  console.log(`Tika content-type extraction started ...pid: ${process.pid}`)
  tika.type(file, (err, _contentType) => {
    if (err) {
      // Error instances serialise to `{}` over the default JSON IPC channel,
      // so forward the message text when there is one.
      process.send({error:true, payload: err.message || err})
      return // do not follow the error with a second, "successful" message
    }
    let contentType = _contentType
    // When Tika reports an SGI movie but the file extension maps to plain
    // text, trust the extension.
    // NOTE(review): the original compared against 'video/z-sgi-movie', which
    // is not a known MIME type; 'video/x-sgi-movie' is assumed — confirm.
    if (
      typeIs.is(_contentType, ['video/x-sgi-movie']) &&
      typeIs.is(mime.lookup(file), ['text/plain'])
    ) {
      contentType = 'text/plain'
    }
    process.send({error:false, payload: contentType})
  })
}
/**
 * Start content-type detection when `file` exists on disk; otherwise report a
 * "not found" error to the parent process.
 *
 * @param {string} file - path to check and then process
 */
const checkFileExists = (file) => {
  if (!fs.existsSync(file)) {
    process.send({error:true, payload: `${file} not found`})
    return
  }
  extractContentType(file)
}
// Worker entry point: each incoming message is expected to carry a `file`
// property naming the document to inspect.
process.on('message', ({file}) => {
  console.log(`Process ${process.pid} recevied file ${file} to process`)
  checkFileExists(file)
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment