Last active
July 7, 2019 05:10
-
-
Save shobhitg/ca3b5cb30279f91454835e155eab453d to your computer and use it in GitHub Desktop.
RxJS fromStream Adapter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Adapts a Node.js readable stream into a shared Observable of fixed-size
 * frames: the first emission is a 24-byte header, every later emission is a
 * 52-byte record.
 *
 * The stream is paused when this function is called and resumed on
 * subscription. Bytes that do not yet fill a whole frame are kept until more
 * data arrives; whatever is still pending when the finish event fires is not
 * emitted.
 *
 * Note: the framing state (pending bytes, header flag) is created once per
 * call, not once per subscription.
 *
 * @param stream          Source stream to read from.
 * @param finishEventName Event that completes the Observable.
 * @param dataEventName   Event that delivers chunks.
 * @returns A multicast Observable of header/record buffers.
 */
export default function fromStream<T = Uint8Array>(
  stream: Readable,
  finishEventName = "end",
  dataEventName = "data"
) {
  stream.pause();

  const HEADER_BYTES = 24;
  const RECORD_BYTES = 52;
  let headerSeen = false;
  let pending: Buffer = Buffer.from([]);

  const frames$ = new Observable<Buffer>(observer => {
    const onData = (chunk: Uint8Array): void => {
      pending = Buffer.concat([pending, chunk]);
      for (;;) {
        // The very first frame is the header; all following ones are records.
        const frameBytes = headerSeen ? RECORD_BYTES : HEADER_BYTES;
        if (pending.length < frameBytes) {
          break;
        }
        observer.next(pending.subarray(0, frameBytes));
        pending = pending.subarray(frameBytes);
        headerSeen = true;
      }
    };
    const onError = (err: Error): void => observer.error(err);
    const onFinish = (): void => observer.complete();

    stream.on(dataEventName, onData);
    stream.on("error", onError);
    stream.on(finishEventName, onFinish);
    stream.resume();

    // Teardown: detach every listener registered above.
    return () => {
      stream.removeListener(dataEventName, onData);
      stream.removeListener("error", onError);
      stream.removeListener(finishEventName, onFinish);
    };
  });

  return frames$.pipe(share());
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable } from "rxjs"; | |
import { share } from "rxjs/operators"; | |
import { Readable } from "stream"; | |
// Borrowed this excellent function from https://stackoverflow.com/a/42174935 | |
// This function fromStream() should have been provided by default in RxJS library, | |
// but for some reason it's not included.
/**
 * Adapts a Node.js readable stream into a shared Observable that emits the
 * stream's content one byte at a time.
 *
 * The stream is paused when this function is called and resumed on
 * subscription.
 *
 * @param stream          Source stream to read from.
 * @param finishEventName Event that completes the Observable.
 * @param dataEventName   Event that delivers chunks.
 * @returns A multicast Observable of individual byte values.
 */
export default function fromStream<T = Uint8Array>(
  stream: Readable,
  finishEventName = "end",
  dataEventName = "data"
) {
  stream.pause();

  const bytes$ = new Observable<number>(observer => {
    const emitByte = (byte: number): void => observer.next(byte);
    // Emitting single bytes lets downstream code buffer purely by byte count.
    const onData = (chunk: Uint8Array): void => chunk.forEach(emitByte);
    const onError = (err: Error): void => observer.error(err);
    const onFinish = (): void => observer.complete();

    stream.on(dataEventName, onData);
    stream.on("error", onError);
    stream.on(finishEventName, onFinish);
    stream.resume();

    // Teardown: detach every listener registered above.
    return () => {
      stream.removeListener(dataEventName, onData);
      stream.removeListener("error", onError);
      stream.removeListener(finishEventName, onFinish);
    };
  });

  return bytes$.pipe(share());
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable } from "rxjs"; | |
import { share } from "rxjs/operators"; | |
import { Readable } from "stream"; | |
import { Buffer } from "buffer"; | |
// Borrowed this excellent function from https://stackoverflow.com/a/42174935 | |
// This function fromStream() should have been provided by default in RxJS library, | |
// but for some reason it's not included.
/**
 * Adapts a Node.js readable stream into a shared Observable that re-emits
 * each chunk delivered by the stream unchanged.
 *
 * The stream is paused when this function is called and resumed on
 * subscription. A stream "error" event becomes an Observable error, and the
 * finish event completes the Observable.
 *
 * @param stream          Source stream to read from.
 * @param finishEventName Event that completes the Observable.
 * @param dataEventName   Event that delivers chunks.
 * @returns A multicast Observable of the stream's chunks.
 */
export function fromStream<T = Uint8Array>(
  stream: Readable,
  finishEventName = "end",
  dataEventName = "data"
) {
  stream.pause();

  const chunks$ = new Observable<Uint8Array>(observer => {
    const onData = (chunk: Uint8Array): void => observer.next(chunk);
    const onError = (err: Error): void => observer.error(err);
    const onFinish = (): void => observer.complete();

    stream.on(dataEventName, onData);
    stream.on("error", onError);
    stream.on(finishEventName, onFinish);
    stream.resume();

    // Teardown: detach every listener registered above.
    return () => {
      stream.removeListener(dataEventName, onData);
      stream.removeListener("error", onError);
      stream.removeListener(finishEventName, onFinish);
    };
  });

  return chunks$.pipe(share());
}
/**
 * Re-chunks an Observable of arbitrary byte chunks into fixed-size frames.
 *
 * When `headerSize` is given, the first emitted frame has `headerSize` bytes
 * and every following frame has `recordSize` bytes; otherwise every frame has
 * `recordSize` bytes. Bytes that do not yet fill a frame are buffered until
 * more data arrives. Bytes still pending when the source completes are
 * discarded. Emitted buffers are `subarray` views of the internal buffer,
 * not copies.
 *
 * Errors and completion of the source are forwarded, and unsubscribing from
 * the result unsubscribes from the source. Framing state is created per
 * source subscription, so a re-subscription starts with an empty buffer.
 *
 * @param stream     Source of raw byte chunks.
 * @param recordSize Size in bytes of each record frame; must be > 0.
 * @param headerSize Optional size in bytes of a single leading header frame;
 *                   must be >= 0 when provided.
 * @returns A multicast Observable of frame buffers.
 * @throws RangeError if `recordSize` is not positive or `headerSize` is
 *                    negative (either would break the framing loop).
 */
export const toRecordStream = (
  stream: Observable<Uint8Array>,
  recordSize: number,
  headerSize?: number
) => {
  // A non-positive record size would make the framing loop spin forever.
  if (!(recordSize > 0)) {
    throw new RangeError(`recordSize must be a positive number, got ${recordSize}`);
  }
  if (headerSize != null && !(headerSize >= 0)) {
    throw new RangeError(`headerSize must be a non-negative number, got ${headerSize}`);
  }
  const firstFrameSize = headerSize == null ? recordSize : headerSize;

  return new Observable<Buffer>(observer => {
    // State lives inside the subscriber so each source subscription starts clean.
    let backStream: Buffer = Buffer.alloc(0);
    let gotHeader = headerSize == null;

    const subscription = stream.subscribe({
      next(data) {
        backStream = Buffer.concat([backStream, data]);
        let size = gotHeader ? recordSize : firstFrameSize;
        while (backStream.length >= size) {
          observer.next(backStream.subarray(0, size));
          backStream = backStream.subarray(size);
          // After the first frame, everything is a record.
          gotHeader = true;
          size = recordSize;
        }
      },
      error(err) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      }
    });

    // Teardown: release the upstream subscription.
    return () => subscription.unsubscribe();
  }).pipe(share());
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment