Skip to content

Instantly share code, notes, and snippets.

@stuartwakefield
Last active June 15, 2016 08:01
Show Gist options
  • Save stuartwakefield/8f8ee6b519503f546b45cea87c6f2257 to your computer and use it in GitHub Desktop.
Save stuartwakefield/8f8ee6b519503f546b45cea87c6f2257 to your computer and use it in GitHub Desktop.
Using a HTTP request as a stream
const request = http.request(opts)
// We expect a single response event to be emitted when the response
// to the request is received.
const response$ = _('response', request).head()
// Create an error stream from the error events from the request.
// As this is an event stream it is considered an infinite stream.
const error$ = _('error', request).map(error => { throw error })
// As such we need to know when the error stream is considered
// ended. We expect a single end event to be emitted from the
// response and will use this to end the error stream.
const end$ = response$.observe()
.flatMap(response => _('end', response))
.head()
.done(_ => error$.end())
// As the response itself is a readable stream we simply flatMap
// it through Highland.js to retrieve the data buffers
const buffer$ = response$.flatMap(_)
// To parse a JSON request
const data$ = buffer$
.map(buffer => buffer.toString('utf-8'))
// BEWARE: all buffers will be collected up in memory... This
// works for a generic JSON response, if you are parsing
// extremely long JSON feeds of items and are memory constrained,
// you could instead create or use a custom parser to emit
// elements as they are encountered.
.collect()
.map(parts => parts.join(''))
.map(JSON.parse)
// The parsed data and any errors as a single stream
const result$ = _([ data$, error$ ]).merge()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment