Skip to content

Instantly share code, notes, and snippets.

@Sawtaytoes
Last active May 2, 2019 15:51
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Sawtaytoes/4896e9ae6091392bebfc6ba027e27bd0 to your computer and use it in GitHub Desktop.
Save Sawtaytoes/4896e9ae6091392bebfc6ba027e27bd0 to your computer and use it in GitHub Desktop.
`createTransformStreamSubject` implements a clean subject-like interface around a Node.js transform stream.
const { fromEvent, Subject } = require('rxjs')
const { take } = require('rxjs/operators')
const { Transform } = require('stream')
const createTransformStreamSubject = (
nodeJsStream,
) => {
const chunk$ = new Subject()
const push$ = new Subject()
const transformStream$ = new Subject()
transformStream$
._next = (
transformStream$
.next
.bind(transformStream$)
)
const transformStream = (
new Transform({
readableObjectMode: true,
transform(
chunk,
encoding,
callback,
) {
chunk$
.pipe(
take(1),
)
.subscribe(callback)
transformStream$
._next(chunk)
},
writableObjectMode: true,
})
)
push$
.subscribe(value => {
transformStream
.push(value)
})
const transformedStream = (
nodeJsStream
.pipe(transformStream)
)
transformStream$
.stream = transformedStream
fromEvent(
transformedStream,
'finish',
)
.subscribe(() => {
chunk$
.complete()
transformStream$
.complete()
push$
.complete()
})
transformStream$
.push = (
value,
) => {
push$
.next(value)
}
transformStream$
.next = () => {
chunk$
.next()
}
return transformStream$
}
module.exports = createTransformStreamSubject
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment