-
-
Save vanayun/6124d9c66d582df14f941188de0e3c8b to your computer and use it in GitHub Desktop.
Coding with Streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const fs = require('fs'); | |
const zlib = require('zlib'); | |
// Path of the file to compress, taken from the first command-line argument.
const file = process.argv[2];

if (!file) {
  // Without a path fs.readFile would throw a TypeError; fail with a clear message instead.
  console.error('Usage: node <script> <file>');
  process.exitCode = 1;
} else {
  // Buffered approach: the whole file is read into memory, compressed in one
  // call, and the result is written to "<file>.gz". Each step reports its own
  // failure so the success message is only printed when every step succeeded.
  fs.readFile(file, (readErr, contents) => {
    if (readErr) {
      console.error(`Could not read ${file}: ${readErr.message}`);
      process.exitCode = 1;
      return;
    }
    zlib.gzip(contents, (gzipErr, compressed) => {
      if (gzipErr) {
        console.error(`Could not compress ${file}: ${gzipErr.message}`);
        process.exitCode = 1;
        return;
      }
      fs.writeFile(file + '.gz', compressed, (writeErr) => {
        if (writeErr) {
          console.error(`Could not write ${file}.gz: ${writeErr.message}`);
          process.exitCode = 1;
          return;
        }
        console.log('File successfully compressed');
      });
    });
  });
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const fs = require('fs'); | |
const split = require('split'); | |
const request = require('request'); | |
const ParallelStream = require('./parallelStream'); | |
// Reads URLs (one per line) from the file named by the first command-line
// argument, issues a HEAD request for each, and writes "<url> is up|down"
// lines to results.txt.
fs.createReadStream(process.argv[2]) // Readable stream over the input file
  .pipe(split()) // split() is used as a Transform that emits each line as its own chunk
  .pipe(new ParallelStream((url, enc, push, done) => {
    // Each URL is checked by sending a HEAD request and waiting for the response;
    // the result is pushed into the stream when the request callback fires.
    if (!url) return done(); // skip empty lines
    request.head(url, (err) => {
      push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt')) // all results are piped into results.txt
  .on('finish', () => console.log('All urls were checked'));
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const fromArray = require('from2-array'); | |
const through = require('through2'); | |
const fs = require('fs'); | |
/**
 * Concatenates the given files, in order, into a single destination file.
 *
 * @param {string} destination - Path of the output file.
 * @param {string[]} files - Paths of the input files, in the order they should appear.
 * @param {function(Error=): void} callback - Called exactly once: with no
 *   arguments after the destination has been fully written, or with the error
 *   if reading a source file or writing the destination fails.
 */
function concatFiles(destination, files, callback) {
  const destStream = fs.createWriteStream(destination);

  // Guard so the callback is never invoked more than once, whichever stream
  // reports completion or failure first.
  let settled = false;
  const settle = (err) => {
    if (settled) return;
    settled = true;
    if (err) {
      callback(err);
    } else {
      callback();
    }
  };

  destStream.on('error', settle);
  // Completion is signalled only once the destination has flushed everything.
  destStream.on('finish', () => settle());

  fromArray.obj(files) // from2-array turns the array of file names into a Readable stream
    .pipe(through.obj((file, enc, done) => {
      // The through (Transform) stream handles the files one at a time: the
      // next file is not started until done() is called for the current one.
      const src = fs.createReadStream(file);
      // A failed read is handed to done(err) so it surfaces as an 'error' below.
      src.on('error', done);
      // Once this file has been read to its end, move on to the next one.
      src.on('end', () => done());
      // {end: false} keeps destStream open after this source ends, so the
      // following files can still be appended to it.
      src.pipe(destStream, {end: false});
    }))
    .on('error', (err) => {
      destStream.destroy();
      settle(err);
    })
    .on('finish', () => {
      // Every file has been processed: close the destination. The callback
      // fires from destStream's own 'finish' handler registered above.
      destStream.end();
    });
}
module.exports = concatFiles; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const stream = require('stream'); | |
/**
 * Object-mode Transform stream that runs a user-supplied asynchronous
 * transform on several chunks at once, up to a fixed concurrency limit.
 *
 * The user transform is invoked as `userTransform(chunk, enc, done)` with
 * `this` bound to the stream (it is called as a method of the instance), so a
 * non-arrow function can emit results with `this.push(...)`. It must call
 * `done(err)` exactly once when its work is finished.
 */
class LimitedParallelStream extends stream.Transform {
  /**
   * @param {number} concurrency - Maximum number of chunks processed at the same time.
   * @param {function(*, string, function(Error=): void): void} userTransform -
   *   Asynchronous per-chunk handler.
   */
  constructor(concurrency, userTransform) {
    super({objectMode: true});
    this.concurrency = concurrency;
    this.userTransform = userTransform;
    this.running = 0; // number of user transforms currently in flight
    this.terminateCallback = null; // pending _flush callback, if any
    this.continueCallback = null; // pending _transform callback, if any
  }

  _transform(chunk, enc, done) {
    this.running++;
    this.userTransform(chunk, enc, this._onComplete.bind(this));
    if (this.running < this.concurrency) {
      // A slot is still free: accept the next chunk right away.
      done();
    } else {
      // Limit reached: hold the callback until a running task completes.
      this.continueCallback = done;
    }
  }

  _flush(done) {
    if (this.running > 0) {
      // Tasks are still in flight: defer the end of the stream until they finish.
      this.terminateCallback = done;
    } else {
      done();
    }
  }

  /**
   * Completion handler passed to the user transform as `done`.
   *
   * @param {Error=} err - Error reported by the user transform, if any.
   */
  _onComplete(err) {
    this.running--;
    if (err) {
      return this.emit('error', err);
    }
    // Release a held _transform callback (if any) so the stream resumes
    // accepting chunks now that a slot has been freed.
    const tmpCallback = this.continueCallback;
    this.continueCallback = null;
    tmpCallback && tmpCallback();
    if (this.running === 0) {
      this.terminateCallback && this.terminateCallback();
    }
  }
}
module.exports = LimitedParallelStream; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const stream = require('stream'); | |
/**
 * Object-mode Transform stream that starts a user-supplied asynchronous
 * transform for every incoming chunk without waiting for earlier ones to
 * complete (no concurrency limit).
 *
 * The user transform is invoked as `userTransform(chunk, enc, push, done)`:
 * `push` emits a result on the stream and `done(err)` signals that the work
 * for that chunk is finished.
 */
class ParallelStream extends stream.Transform {
  /**
   * @param {function(*, string, function(*): boolean, function(Error=): void): void} userTransform -
   *   Asynchronous per-chunk handler.
   */
  constructor(userTransform) {
    super({objectMode: true});
    this.userTransform = userTransform;
    this.running = 0; // number of user transforms currently in flight
    this.terminateCallback = null; // pending _flush callback, if any
  }

  _transform(chunk, enc, done) {
    this.running++;
    this.userTransform(
      chunk,
      enc,
      this.push.bind(this),
      this._onComplete.bind(this)
    );
    // done() is called immediately, without waiting for userTransform to
    // complete, so processing of the next chunk can start right away.
    done();
  }

  // Called just before the stream ends.
  _flush(done) {
    if (this.running > 0) {
      // Work is still in flight: hold the callback so the stream does not end yet.
      this.terminateCallback = done;
    } else {
      done();
    }
  }

  /**
   * Called each time an asynchronous user transform completes.
   *
   * @param {Error=} err - Error reported by the user transform, if any.
   */
  _onComplete(err) {
    this.running--;
    if (err) {
      return this.emit('error', err);
    }
    if (this.running === 0) {
      // Nothing left running: release the callback held by _flush (if any)
      // so the stream can end.
      this.terminateCallback && this.terminateCallback();
    }
  }
}
module.exports = ParallelStream; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const stream = require('stream'); | |
const Chance = require('chance'); | |
const chance = new Chance(); | |
/**
 * Readable stream that emits strings produced by `chance.string()` and ends
 * itself when `chance.bool({likelihood: 5})` returns true after a push.
 */
class RandomStream extends stream.Readable {
  /**
   * @param {object=} options - Passed through unchanged to stream.Readable.
   */
  constructor(options) {
    super(options);
  }

  /**
   * Pushes one generated string per call; the requested size is ignored.
   *
   * @param {number} size - Advisory amount of data requested (unused).
   */
  _read(size) {
    const chunk = chance.string();
    this.push(chunk, 'utf8');
    // NOTE(review): presumably a 5% chance per read of ending the stream —
    // confirm against the chance library's `likelihood` semantics.
    if (chance.bool({likelihood: 5})) {
      this.push(null); // null signals end-of-stream
    }
  }
}
module.exports = RandomStream; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const stream = require('stream'); | |
/**
 * Transform stream that replaces every occurrence of `searchString` with
 * `replaceString`, including occurrences split across chunk boundaries.
 */
class ReplaceStream extends stream.Transform {
  /**
   * @param {string} searchString - Text to look for.
   * @param {string} replaceString - Text written in place of each match.
   */
  constructor(searchString, replaceString) {
    super();
    this.searchString = searchString;
    this.replaceString = replaceString;
    // End of the previous chunk that could still be the start of a match.
    this.tailPiece = '';
  }

  _transform(chunk, encoding, callback) {
    const pieces = (this.tailPiece + chunk).split(this.searchString);
    const lastPiece = pieces[pieces.length - 1];
    // A match straddling two chunks can leave at most searchString.length - 1
    // of its characters at the end of the current chunk.
    const tailPieceLen = this.searchString.length - 1;
    if (tailPieceLen > 0) {
      this.tailPiece = lastPiece.slice(-tailPieceLen);
      pieces[pieces.length - 1] = lastPiece.slice(0, -tailPieceLen);
    } else {
      // With nothing to hold back, slice(-0) / slice(0, -0) would keep the
      // whole last piece as the tail and emit an empty string in its place;
      // emit the piece now instead.
      this.tailPiece = '';
    }
    this.push(pieces.join(this.replaceString));
    callback();
  }

  _flush(callback) {
    // Whatever was held back cannot be part of a match any more; emit it.
    this.push(this.tailPiece);
    callback();
  }
}
module.exports = ReplaceStream; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const fs = require('fs'); | |
const zlib = require('zlib'); | |
// Path of the file to compress, taken from the first command-line argument.
const file = process.argv[2];

// Streaming approach: data flows from the source file through gzip into
// "<file>.gz" chunk by chunk instead of being buffered in memory as a whole.
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', () => console.log('File successfully compressed'));
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const fs = require('fs'); | |
const split = require('split'); | |
const request = require('request'); | |
const throughParallel = require('through2-parallel'); | |
// Reads URLs (one per line) from the file named by the first command-line
// argument, issues a HEAD request for each, and writes "<url> is up|down"
// lines to results.txt.
fs.createReadStream(process.argv[2])
  .pipe(split())
  // The concurrency option limits how many URLs are checked at the same time.
  // A regular function (not an arrow) is used so that `this` refers to the
  // transform stream; the arrow callback inside inherits it for this.push().
  .pipe(throughParallel.obj({concurrency: 2}, function (url, enc, done) {
    if (!url) return done(); // skip empty lines
    request.head(url, (err) => {
      this.push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))
  .on('finish', () => console.log('All urls were checked'));
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const stream = require('stream'); | |
const fs = require('fs'); | |
const path = require('path'); | |
const mkdirp = require('mkdirp'); | |
/**
 * Object-mode Writable stream that saves each chunk to disk.
 *
 * Every chunk is expected to be an object of the form
 * `{path: <file path>, content: <data to write>}`.
 */
class ToFileStream extends stream.Writable {
  constructor() {
    super({objectMode: true});
  }

  /**
   * Creates the chunk's parent directory (and any missing ancestors), then
   * writes the chunk's content to its path.
   *
   * @param {{path: string, content: (string|Buffer)}} chunk - File to write.
   * @param {string} encoding - Unused; chunks are objects.
   * @param {function(Error=): void} callback - Signals completion or failure.
   */
  _write(chunk, encoding, callback) {
    // recursive: true creates missing ancestors and does not fail when the
    // directory already exists.
    fs.mkdir(path.dirname(chunk.path), {recursive: true}, (err) => {
      if (err) {
        return callback(err);
      }
      fs.writeFile(chunk.path, chunk.content, callback);
    });
  }
}
module.exports = ToFileStream; | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment