Skip to content

Instantly share code, notes, and snippets.

@adrianhopebailie
Created January 11, 2019 14:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adrianhopebailie/acd1fcb864fa7b3a34e594ed3671896d to your computer and use it in GitHub Desktop.
Save adrianhopebailie/acd1fcb864fa7b3a34e594ed3671896d to your computer and use it in GitHub Desktop.
A small framework for a generic requestor: it writes each request, tagged with a correlation id, onto an underlying duplex stream and reads the matching responses off the same stream.
import { Duplex } from 'stream'
/**
 * Sends request messages over a duplex stream and matches each reply to its
 * request by correlation id.
 *
 * Outgoing chunks are written as `{ id, message }`. Incoming chunks are looked
 * up by their `id`, and their `message` is delivered to the pending request.
 */
export class Requestor {
  private _map = new Map<number, (reply: string) => void>()
  private _nextCorrelationId = 1
  private _stream: Duplex
  private _onUnsolicited?: (chunk: unknown) => void

  /**
   * @param stream Duplex that requests are written to and replies are read from.
   * @param onUnsolicited Optional hook called with any incoming chunk that has
   * no pending request, e.g. a reply that arrives after its request timed out.
   * When omitted, such chunks are dropped.
   */
  constructor (stream: Duplex, onUnsolicited?: (chunk: unknown) => void) {
    this._stream = stream
    this._onUnsolicited = onUnsolicited
    this._stream.on('data', (chunk: unknown) => {
      const envelope = typeof chunk === 'object' && chunk !== null
        ? chunk as { id?: unknown, message?: unknown }
        : undefined
      const callback = envelope !== undefined && typeof envelope.id === 'number'
        ? this._map.get(envelope.id)
        : undefined
      if (envelope !== undefined && callback !== undefined) {
        callback(envelope.message as string)
      } else if (this._onUnsolicited !== undefined) {
        // Never throw from the listener: a late reply after a timeout is an
        // expected situation, and an exception here would propagate into
        // whoever pushed the chunk onto the stream.
        this._onUnsolicited(chunk)
      }
    })
  }

  /**
   * Writes `message` to the stream under a fresh correlation id.
   *
   * @param message Payload to send.
   * @param timeout Milliseconds to wait for the reply.
   * @returns A promise resolved with the reply's `message`. It is rejected with
   * `Error('timed out')` if no reply arrives in time, or with the write error
   * if the stream fails to accept the request.
   */
  public request (message: string, timeout: number): Promise<string> {
    const id = this._nextCorrelationId++
    return new Promise<string>((resolve, reject) => {
      const timer = setTimeout(() => {
        this._map.delete(id)
        reject(new Error('timed out'))
      }, timeout)
      this._map.set(id, (reply: string) => {
        this._map.delete(id)
        clearTimeout(timer)
        resolve(reply)
      })
      // Rejects only while the request is still pending, so a write error
      // reported after the reply or the timeout is ignored.
      const fail = (error: Error): void => {
        if (this._map.delete(id)) {
          clearTimeout(timer)
          reject(error)
        }
      }
      try {
        this._stream.write({ id, message }, (error?: Error | null) => {
          if (error) {
            fail(error)
          }
        })
      } catch (e) {
        fail(e instanceof Error ? e : new Error(String(e)))
      }
    })
  }
}
// For testing
/**
 * Object-mode duplex that echoes every written chunk back out of its own
 * readable side.
 */
export class MirrorStream extends Duplex {
  constructor () {
    super({ objectMode: true })
  }

  /**
   * Queues the written chunk on the readable side, then acknowledges the write.
   *
   * The callback is invoked exactly once: with the error if `push` throws,
   * otherwise with no argument.
   *
   * @param chunk Value passed to `write`; pushed back unchanged.
   * @param encoding Unused.
   * @param callback Write-completion callback supplied by the stream.
   */
  _write (chunk: any, encoding: string, callback: (error?: Error) => void): void {
    try {
      this.push(chunk)
    } catch (e) {
      callback(e instanceof Error ? e : new Error(String(e)))
      return
    }
    // Acknowledge outside the try so an exception thrown by the callback
    // itself can never trigger a second invocation.
    callback()
  }

  /**
   * No-op: chunks are pushed from `_write`, not produced on demand.
   *
   * @param size Unused.
   */
  _read (size: number): void {
    // NO OP
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment