Skip to content

Instantly share code, notes, and snippets.

@shuhei
Last active June 10, 2018 23:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shuhei/e4ab7506878a1904cff0c1b301e2c530 to your computer and use it in GitHub Desktop.
Save shuhei/e4ab7506878a1904cff0c1b301e2c530 to your computer and use it in GitHub Desktop.
Worker Pool Prototype
const url = require('url');
const https = require('https');
// HTTPS agent created with keep-alive enabled. getAPI passes this one
// instance in the options of every request it makes.
const agent = new https.Agent({
keepAlive: true
});
/**
 * Requests `uri` over HTTPS through the module-level `agent` and parses the
 * response body as JSON.
 *
 * @param {string} uri - Absolute https URL to request.
 * @returns {Promise<*>} Resolves with the parsed JSON body. Rejects when the
 *   request or the response stream errors, when the status code is not 200,
 *   or when the body is not valid JSON.
 */
function getAPI(uri) {
  return new Promise((resolve, reject) => {
    // `port` is forwarded so URLs with an explicit port are honoured; it is
    // null for URLs without one, in which case https falls back to its default.
    const { protocol, hostname, port, path } = url.parse(uri);
    const options = {
      protocol,
      hostname,
      port,
      path,
      agent,
    };

    const req = https.get(options, (res) => {
      if (res.statusCode !== 200) {
        // The body must still be consumed: an unread response keeps holding
        // its socket, so it would never be handed back to the agent.
        res.resume();
        reject(new Error(`Non-200 status code ${res.statusCode}`));
        return;
      }

      res.setEncoding('utf8');
      let buffer = '';
      res.on('data', (chunk) => {
        buffer += chunk;
      });
      // A failure while streaming the body is reported on the response, not
      // on the request, so it needs its own handler.
      res.on('error', reject);
      res.on('end', () => {
        try {
          resolve(JSON.parse(buffer));
        } catch (e) {
          reject(e);
        }
      });
    });

    req.on('error', reject);
  });
}
module.exports = function getAPIs() {
const promises = [
getAPI('https://jsonplaceholder.typicode.com/posts'),
getAPI('https://jsonplaceholder.typicode.com/comments'),
getAPI('https://jsonplaceholder.typicode.com/albums'),
getAPI('https://jsonplaceholder.typicode.com/photos'),
getAPI('https://jsonplaceholder.typicode.com/todos'),
getAPI('https://jsonplaceholder.typicode.com/users'),
];
return Promise.all(promises)
.then(([posts]) => posts);
};
const cluster = require('cluster');
const express = require('express');
const getPosts = require('./api');
const render = require('./render');
// Entry point for the cluster variant: the master process forks four workers,
// and every worker starts its own Express app on port 8080 that renders the
// page in-process.
if (cluster.isMaster) {
  for (let i = 0; i < 4; i++) {
    cluster.fork();
  }
} else {
  const app = express();

  app.get('/', (req, res) => {
    getPosts()
      .then(posts => render({
        posts,
        time: new Date().toISOString(),
      }))
      .then((html) => {
        res.send(html);
      })
      .catch((err) => {
        // Without this handler a failed fetch or render would leave the
        // request hanging with no response at all.
        console.error(err);
        if (!res.headersSent) {
          res.status(500).send('Internal Server Error');
        }
      });
  });

  app.listen(8080, () => {
    console.log('listening on 8080');
  });
}
const { fork } = require('child_process');
const express = require('express');
const getPosts = require('./api');
/**
 * Pool of child processes (created with `fork`) that each handle one request
 * at a time over the IPC channel.
 *
 * Protocol: the parent sends the request arguments to a worker with
 * `worker.send(args)`, and the first message the worker sends back is taken
 * as the result of that request.
 *
 * State:
 *  - `workers`:     pid -> child process, for every live worker.
 *  - `listeners`:   pid -> `{ resolve, reject }` of the request currently
 *                   running on that worker. A worker with no entry is free.
 *  - `waitingList`: FIFO queue of `{ args, resolve, reject }` for requests
 *                   that arrived while no worker was free.
 *
 * Failure handling: a request is rejected when its worker exits, when its
 * worker reports an error, or when the message cannot be sent. A worker that
 * exits is removed from the pool (it is not respawned). When the last worker
 * is removed, every queued request is rejected, because nothing is left to
 * serve it.
 */
class WorkerPool {
  /**
   * @param {string} workerScript - Module path handed to `fork` for every
   *   worker.
   */
  constructor(workerScript) {
    this.workerScript = workerScript;
    this.workers = new Map();
    this.listeners = new Map();
    this.waitingList = [];
  }

  /**
   * Starts `count` additional workers. Requests that are already queued are
   * handed to the new workers right away.
   *
   * @param {number} count - Number of workers to start.
   */
  addWorkers(count) {
    for (let i = 0; i < count; i++) {
      const worker = this._createWorker();
      this.workers.set(worker.pid, worker);
      this._dispatchWaiting(worker);
    }
  }

  /**
   * Runs one request on a free worker, or queues it until a worker is free.
   *
   * @param {*} args - Message sent to the worker.
   * @returns {Promise<*>} Resolves with the worker's reply; rejects if the
   *   worker fails before replying or the message cannot be sent.
   */
  requestWork(args) {
    const worker = this._findFreeWorker();
    console.log('free worker', worker && worker.pid);
    if (worker) {
      return this._requestWork(worker, args);
    }
    return this._waitForWorker(args);
  }

  /** Queues a request until `_dispatchWaiting` hands it to a worker. */
  _waitForWorker(args) {
    // TODO: Set a timeout.
    return new Promise((resolve, reject) => {
      this.waitingList.push({ args, resolve, reject });
    });
  }

  /** Sends `args` to `worker` and records the request as in flight. */
  _requestWork(worker, args) {
    return new Promise((resolve, reject) => {
      const job = { resolve, reject };
      // A synchronous throw from send() rejects this promise before the
      // worker is marked busy. Asynchronous send failures arrive through
      // the callback.
      worker.send(args, (err) => {
        if (err) {
          this._failJob(worker, job, err);
        }
      });
      // TODO: Set a timeout.
      this.listeners.set(worker.pid, job);
    });
  }

  /** Forks the worker script. Kept separate so it can be overridden. */
  _spawnWorker() {
    return fork(this.workerScript);
  }

  /** Spawns a worker and subscribes to its message, error and exit events. */
  _createWorker() {
    const worker = this._spawnWorker();
    worker.on('message', (message) => {
      this._onWorkerMessage(worker.pid, message);
    });
    worker.on('error', (err) => {
      if (!worker.connected) {
        // No IPC channel (e.g. the process never started): the worker can
        // not serve anything, so drop it.
        this._removeWorker(worker, err);
        return;
      }
      const job = this.listeners.get(worker.pid);
      if (job) {
        this._failJob(worker, job, err);
      }
    });
    worker.on('exit', (code, signal) => {
      this._removeWorker(worker, new Error(
        `Worker ${worker.pid} exited before responding (code ${code}, signal ${signal})`
      ));
    });
    return worker;
  }

  /** Returns a worker with no request in flight, or null if all are busy. */
  _findFreeWorker() {
    for (const [pid, worker] of this.workers) {
      if (!this.listeners.has(pid)) {
        return worker;
      }
    }
    return null;
  }

  /** Settles the in-flight request of worker `pid` and reuses the worker. */
  _onWorkerMessage(pid, message) {
    console.log('parent got message', pid, message.length);
    const job = this.listeners.get(pid);
    if (job) {
      console.log('listener found');
      this.listeners.delete(pid);
      job.resolve(message);
    } else {
      console.log('listener not found');
      // Timeout?
    }
    const worker = this.workers.get(pid);
    if (worker) {
      this._dispatchWaiting(worker);
    }
  }

  /** Hands queued requests to `worker` for as long as it is free. */
  _dispatchWaiting(worker) {
    // Normally this runs once: _requestWork marks the worker busy. It only
    // loops when send() threw synchronously and left the worker free.
    while (this.waitingList.length > 0 && !this.listeners.has(worker.pid)) {
      const { args, resolve, reject } = this.waitingList.shift();
      this._requestWork(worker, args).then(resolve, reject);
    }
  }

  /** Rejects `job` if it is still the request in flight on `worker`. */
  _failJob(worker, job, error) {
    // The identity check keeps a late failure report from rejecting a
    // different request that has since been assigned to the same pid.
    if (this.listeners.get(worker.pid) !== job) {
      return;
    }
    this.listeners.delete(worker.pid);
    job.reject(error);
    if (worker.connected) {
      this._dispatchWaiting(worker);
    }
  }

  /** Drops `worker` from the pool and rejects everything that depended on it. */
  _removeWorker(worker, error) {
    const job = this.listeners.get(worker.pid);
    if (job) {
      this.listeners.delete(worker.pid);
      job.reject(error);
    }
    if (this.workers.get(worker.pid) === worker) {
      this.workers.delete(worker.pid);
    }
    if (this.workers.size === 0) {
      // Queued requests are only dispatched when a worker becomes free; with
      // no workers left they would wait forever.
      const stranded = this.waitingList.splice(0);
      for (const waiting of stranded) {
        waiting.reject(error);
      }
    }
  }
}
// Entry point for the worker-pool variant: one Express app on port 8080 that
// hands the rendering of every request to a pool of four worker processes.
const pool = new WorkerPool('worker.js');
pool.addWorkers(4);

const app = express();

app.get('/', (req, res) => {
  getPosts()
    .then(posts => pool.requestWork({
      posts,
      time: new Date().toISOString(),
    }))
    .then((html) => {
      res.send(html);
    })
    .catch((err) => {
      // Without this handler a failed fetch or render request would leave
      // the HTTP request hanging with no response at all.
      console.error(err);
      if (!res.headersSent) {
        res.status(500).send('Internal Server Error');
      }
    });
});

app.listen(8080, () => {
  console.log('listening on 8080');
});
module.exports = function render({ posts, time }) {
const start = Date.now();
// Pretend to be CPU-intensive like React SSR...
while (Date.now() - start < 30) {}
const postList = posts.map(post =>
`<div><h2>${post.title}</h2><p>${post.body}</p></div>`
).join('');
return `<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Hello</title>
<style>
body {
font-family: sans-serif;
}
</style>
</head>
<body>
<h1>Sample Page</h1>
<p>${time}</p>
${postList}
</body>
</html>`;
}
const express = require('express');
const getPosts = require('./api');
const render = require('./render');
// Entry point for the single-process variant: one Express app on port 8080
// that fetches the posts and renders the page in the same process.
const app = express();

app.get('/', (req, res) => {
  getPosts()
    .then(posts => render({
      posts,
      time: new Date().toISOString(),
    }))
    .then((html) => {
      res.send(html);
    })
    .catch((err) => {
      // Without this handler a failed fetch or render would leave the
      // request hanging with no response at all.
      console.error(err);
      if (!res.headersSent) {
        res.status(500).send('Internal Server Error');
      }
    });
});

app.listen(8080, () => {
  console.log('listening on 8080');
});
const render = require('./render');
/**
 * Handles one message from the parent process: logs its `time` field,
 * renders it, and sends the rendered result back over the IPC channel.
 */
function handleRenderRequest(request) {
  console.log('child got message:', request.time);
  const html = render(request);
  process.send(html);
}

process.on('message', handleRenderRequest);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment