Skip to content

Instantly share code, notes, and snippets.

@vanayun
Last active May 24, 2019 16:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vanayun/6124d9c66d582df14f941188de0e3c8b to your computer and use it in GitHub Desktop.
Save vanayun/6124d9c66d582df14f941188de0e3c8b to your computer and use it in GitHub Desktop.
Coding with Streams
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.readFile(file, (err, buffer) => {
zlib.gzip(buffer, (err, buffer) => {
fs.writeFile(file + '.gz', buffer, err => {
console.log('File successfully compressed');
});
});
});
const fs = require('fs');
const split = require('split');
const request = require('request');
const ParallelStream = require('./parallelStream');
fs.createReadStream(process.argv[2]) //입력으로 주어진 파일로부터 Readable 스트림 생성
.pipe(split()) //각각의 라인을 서로 다른 데이터 덩어리로 출력하는 Transform스트림인 split을 통해 입력 파일의 내용을 연결(pipe)한다.
.pipe(new ParallelStream((url, enc, push, done) => {
//ParallelStream을 사용하여 요청 헤더를 보내고 응답을 기다려 URL을 검사한다.
// 콜백이 호출될 때 작업 결과를 스트림으로 밀어낸다.
if(!url) return done();
request.head(url, (err, response) => {
push(url + ' is ' + (err? 'down' : 'up') + '\n');
done();
});
}))
.pipe(fs.createWriteStream('results.txt')) //모든 결과가 results.txt파일에 파이프 된다.
.on('finish', () => console.log('All urls were checked'));
const fromArray = require('from2-array');
const through = require('through2');
const fs = require('fs');
function concatFiles(destination, files, callback) {
const destStream = fs.createWriteStream(destination);
fromArray.obj(files) // from2-array를 사용하여 파일 배열에서 Readable 스트림을 만든다.
.pipe(through.obj((file, enc, done) => {
// 순차적으로 파일을 처리하기 위해 through(Transform)스트림을 생성.
// 각 파일에 대해 Readable 스트림을 만들고, 이를 출력 파일을 나타내는 destStream으로 연결(pipe)
// pipe 옵션으로 {end:false}를 정의함으로써 소스 파일의 읽기를 완료한 후에도 destStream을 닫지 않도록 한다.
const src = fs.createReadStream(file);
src.pipe(destStream, {end: false});
src.on('end', done) // 모든 내용이 destStream으로 전달 되면 through에 done함수를 호출하여 현재 처리가 완료됨을 알림.
}))
.on('finish', () => {
// 모든 파일이 처리되면 finish 이벤트 시작. destStream을 종료하고 concatFiles()의 callback()함수 호출하여 완료.
destStream.end();
callback();
});
}
module.exports = concatFiles;
const stream = require('stream');
class LimitedParallelStream extends stream.Transform {
constructor(concurrency, userTransform) { // 동시 실행 제한을 입력받음.
super({objectMode: true});
this.concurrency = concurrency;
this.userTransform = userTransform;
this.running = 0;
this.terminateCallback = null; // _flush 메소드
this.continueCallback = null; // _transform 메소드
}
_transform(chunk, enc, done) {
this.running++;
this.userTransform(chunk, enc, this._onComplete.bind(this));
if(this.running < this.concurrency) { // done을 호출하기 전에 예비 슬롯이 남아있는지 확인.
done();
} else { // 최대 동시 실행 스트림 수에 도달한 경우
this.continueCallback = done;
}
}
_flush(done) {
if(this.running> 0) {
this.terminateCallback = done;
} else {
done();
}
}
_onComplete(err) {
this.running--;
if(err) {
return this.emit('error', err);
}
const tmpCallback = this.continueCallback;
this.continueCallback = null;
tmpCallback && tmpCallback(); // 작업이 완료될 때마다 스트림의 차단을 해제할 continueCallback호출.
if(this.running === 0) {
this.terminateCallback && this.terminateCallback();
}
}
}
module.exports = LimitedParallelStream;
const stream = require('stream');
class ParallelStream extends stream.Transform {
constructor(userTransform) {
super({objectMode: true});
this.userTransform = userTransform;
this.running = 0;
this.terminateCallback = null;
}
_transform(chunk, enc, done) {
this.running++;
this.userTransform(chunk, enc, this.push.bind(this),
this._onComplete.bind(this));
done(); // userTransform이 완료되는걸 기다리지 않고 바로 호출함으로써 다른 항목의 처리를 시작할 수 있다.
}
_flush(done) { // 스트림이 끝나기 직전에 호출된다.
if(this.running> 0) { // 실행중인 작업이 있을 경우 done콜백을 호출하지 않도록 하여 finish 이벤트의 발생을 보류.
this.terminateCallback = done;
} else {
done();
}
}
_onComplete(err) { // 비동기 작업이 완료될 때마다 호출.
this.running--;
if(err) {
return this.emit('error', err);
}
if(this.running === 0) { // 싫행 중인 작업이 없으면 스트림을 종료시키고 _flush에서 보류된 이벤트 발생.
this.terminateCallback && this.terminateCallback();
}
}
}
module.exports = ParallelStream;
const stream = require('stream');
const Chance = require('chance');
const chance = new Chance();
class RandomStream extends stream.Readable {
constructor(options) {
super(options);
}
_read(size) {
const chunk = chance.string();
this.push(chunk, 'utf8');
if(chance.bool({likelihood: 5})) {
this.push(null);
}
}
}
module.exports = RandomStream;
const stream = require('stream');
class ReplaceStream extends stream.Transform {
constructor(searchString, replaceString) {
super();
this.searchString = searchString;
this.replaceString = replaceString;
this.tailPiece = '';
}
_transform(chunk, encoding, callback) {
const pieces = (this.tailPiece + chunk).split(this.searchString);
const lastPiece = pieces[pieces.length -1];
const tailPieceLen = this.searchString.length -1;
this.tailPiece = lastPiece.slice(-tailPieceLen);
pieces[pieces.length -1] = lastPiece.slice(0, -tailPieceLen);
this.push(pieces.join(this.replaceString));
callback();
}
_flush(callback) {
this.push(this.tailPiece);
callback();
}
}
module.exports = ReplaceStream;
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'))
.on('finish', () => { console.log('File successfully compressed'));
const fs = require('fs');
const split = require('split');
const request = require('request');
const throughParallel = require('through2-parallel');
fs.createReadStream(process.argv[2])
.pipe(split())
.pipe(throughParallel.obj({concurrency:2}, (url, enc, done) => { // 동시 실행 제한을 지정할 수 있음.
if(!url) return done();
request.head(url, (err, response) => {
push(url + ' is ' + (err? 'down' : 'up') + '\n');
done();
});
}))
.pipe(fs.createWriteStream('results.txt'))
.on('finish', () => console.log('All urls were checked'));
const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirp = require('mkdirp');
class ToFileStream extends stream.Writable {
constructor() {
super({objectMode: true});
}
_write(chunk, encoding, callback) {
mkdirp(path.dirname(chunk.path), err => {
if(err) {
return callback(err);
}
fs.writeFile(chunk.path, chunk.content, callback);
});
}
}
module.exports = ToFileStream;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment