Skip to content

Instantly share code, notes, and snippets.

@vanayun
Created May 4, 2019 16:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vanayun/e94456a75f7429489cc0f8265bbb7e0b to your computer and use it in GitHub Desktop.
Save vanayun/e94456a75f7429489cc0f8265bbb7e0b to your computer and use it in GitHub Desktop.
generator
const co = require('co');
class TaskQueue {
constructor(concurrency) {
this.concurrency = concurrency;
this.running = 0;
this.taskQueue = [];
this.consumerQueue = [];
this.spawnWorkers(concurrency); // worker 시작
}
pushTask(task) {
if (this.consumerQueue.length !== 0) {
this.consumerQueue.shift()(null, task);
// 대기중인 첫 번째 콜백을 호출함으로써 자례대로 worker의 차단을 해제한다.
} else {
this.taskQueue.push(task); // 모든 worker가 작업을 실행 중.
}
}
spawnWorkers(concurrency) {
const self = this;
for(let i = 0; i < concurrency; i++) {
// 즉시 실행되어 병렬 처리됨.
co(function* () {
while(true) {
// 무한 loop에서 블록(yield)되어 큐에서 새로운 작업을 기다린다.
const task = yield self.nextTask();
yield task;
}
});
}
}
nextTask() { // co 라이브러리를 통해 yieldable thunk를 반환한다.
return callback => {
if(this.taskQueue.length !== 0) {
return callback(null, this.taskQueue.shift());
// 즉시 worker의 yield가 해제되어 작업을 수행할 수 있다.
}
this.consumerQueue.push(callback); // 큐에 작업이 없는 경우.
}
}
}
module.exports = TaskQueue;
function* spiderLinks(currentUrl, body, nesting) {
if(nesting === 0) {
return nextTick();
}
//배열에서 일시 정지(yield)할 수 있는 co의 특징을 이용한 방법
const links = utilities.getPageLinks(currentUrl, body);
const tasks = links.map(link => spider(link, nesting - 1)); // 호출을 병렬로 변환한 부분
yield tasks;
// 위의 코드를 callback 사용하여 구현한 코드. returns a thunk
return callback => {
let completed = 0, hasErrors = false;
const links = utilities.getPageLinks(currentUrl, body);
if (links.length === 0) {
return process.nextTick(callback);
}
function done(err, result) {
if(err && !hasErrors) {
hasErrors = true;
return callback(err);
}
if(++completed === links.length && !hasErrors) {
callback();
}
}
for(let i = 0; i < links.length; i++) {
co(spider(links[i], nesting - 1)).then(done);
// spider를 병렬로 실행. resolve되면 done함수 호출.
}
}
}
const path = require('path');
const utilities = require('./utilities');
const thunkify = require('thunkify');
const co = require('co');
const request = thunkify(require('request')); // 코드를 thunkified
const fs = require('fs');
const mkdirp = thunkify(require('mkdirp'));
const readFile = thunkify(fs.readFile);
const writeFile = thunkify(fs.writeFile);
const nextTick = thunkify(process.nextTick);
function* spiderLinks(currentUrl, body, nesting) {
if(nesting === 0) {
return nextTick();
}
const links = utilities.getPageLinks(currentUrl, body);
for(let i = 0; i < links.length; i++) {
yield spider(links[i], nesting - 1);
}
}
function* download(url, filename) {
console.log('Downloading ' + url);
const response = yield request(url);
const body = response[1];
yield mkdirp(path.dirname(filename));
yield writeFile(filename, body);
console.log(`Downloaded and saved: ${url}`);
return body;
}
function* spider(url, nesting) {
const filename = utilities.urlToFilename(url);
let body;
try {
body = yield readFile(filename, 'utf8');
} catch(err) {
if(err.code !== 'ENOENT') {
throw err;
}
body = yield download(url, filename);
//co가 yield를 지정가능 하고 다른 제너레이터를 지원하기에 사용 가능.
}
yield spiderLinks(url, body, nesting);
}
// entry point
// co는 yield문에 전달하는 모든 제너레이터(함수 or 객체)를 감싼다.
co(function* () {
try {
yield spider(process.argv[2], 1);
console.log('Download complete');
} catch(err) {
console.log(err);
}
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment