// Exhaustively try to break a tee stream usage akin to the weird
// make-fetch-happen/cacache interaction behind
// https://github.com/npm/npm-registry-fetch/issues/23
// make-fetch-happen commit 7f896bef4bbcab73eccb92b5999204b673cdf485
//
// "cache" is a mock akin to cacache.put.stream()
// It is a pipeline containing:
// - a cache writing stream `cache`
// A minipass-flush with an internal writer `cacheContent`, that it proxies
// writes to, and drain events from. The first time the content is written
// to cacheContent, a handleContentP promise is set, which resolves a tick
// after the internal stream ends. The `cache` flush() method returns this
// handleContentP promise.
// - a Flush stream with a flush() method that resolves a tick later, to create
// the cache key
//
// "newBody" is a Pipeline containing:
// - "oldBody" a Minipass stream (the source of data)
// - "tee" a Minipass stream an extra pipe destination:
// - `cache` stream, set to resolve the cacheWritePromise on cache.promise()
// - a Flush stream that returns cacheWritePromise in its flush() method
//
// When `tee` pipes to `cache`, the cache receives the data chunks out of
// order. When `tee` instead just writes on each 'data' event and ends on
// 'end', without backpressure, it works fine.
//
// Options:
// - oldBody has 1 chunk, or multiple chunks
// - oldBody is fully available, has 1 chunk available, or 0 chunks available,
// when the pipeline starts
// - newBody begins consuming immediately, or later
//
// newBody is *always* consumed completely without backpressure, because it's
// read via response.json()
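//
// A rough sketch of the consumption pattern being simulated (hypothetical
// names, not make-fetch-happen's actual API):
//
//   const res = await fetch(url)   // res.body is the `newBody` pipeline
//   const obj = await res.json()   // drains it completely, no backpressure
//   // ...while `tee` pipes the same chunks into the `cache` pipeline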
const Minipass = require('minipass')
const Flush = require('minipass-flush')
const Pipeline = require('minipass-pipeline')
const chunkCount = 5
const chunks = new Array(chunkCount)
for (let i = 0; i < chunkCount; i++) {
  // cycle through the alphabet
  chunks[i] = Buffer.from([97 + i % 26])
}
const MISSING = Symbol('missing')
const setTimeoutOrNow = (fn, to = MISSING) => {
  if (to === MISSING)
    throw new Error('missing timeout value')
  if (to === null)
    fn()
  else
    setTimeout(fn, to)
}
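// For example (illustrative only): a null timeout invokes synchronously,
// and any number defers to a later tick:
//   setTimeoutOrNow(() => console.log('now'), null) // runs this tick
//   setTimeoutOrNow(() => console.log('soon'), 0)   // runs a tick later
//   setTimeoutOrNow(() => {})                       // throws: missing timeout value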
// simulate writing to a tmp file, which takes some time
class TmpFileWriter extends Minipass {
  constructor () {
    super()
    //this.resume()
  }
  write (chunk) {
    // console.error('TMP FILE WRITE')
    // XXX will this emit drain properly?
    const ret = super.write(chunk)
    // note: `|| 0` coerces a null timeout to 0 here, so the deferred read()
    // always goes through setTimeout rather than running synchronously
    setTimeoutOrNow(() => this.read(), timeouts.tmpFile.write || 0)
    return ret
  }
  //end () {
  //  // console.error('TMP FILE END')//, new Error('trace').stack)
  //  super.end()
  //  //setTimeoutOrNow(() => this.read(), timeouts.tmpFile.end || 0)
  //  return this
  //}
}
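// (Answering the XXX above, assuming standard Minipass buffering: write()
// buffers the chunk when nothing is flowing, and the deferred read() empties
// that buffer, which is what makes Minipass emit 'drain' on a later tick.)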
class MockCache extends Flush {
  constructor () {
    // console.error('CREATE MOCK CACHE')
    super()
    this.inputStream = new TmpFileWriter()
    this.inputStream.on('error', er => this.emit('error', er))
    this.inputStream.on('drain', () => this.emit('drain'))
    this.handleContentP = null
    this.dataSeen = []
  }
  write (chunk) {
    this.dataSeen.push(chunk.toString())
    // console.error('MOCK CACHE WRITE', this.handleContentP, this.foo)
    this.foo = this.foo || 'FOO ' + Date.now()
    if (!this.handleContentP) {
      // console.error('SET UP HANDLE CONTENT P')
      this.handleContentP = this.inputStream.promise()
      //this.handleContentP = new Promise(res => {
      //  this.inputStream.promise().then(() => setTimeoutOrNow(res, timeouts.cache.inputStream))
      //})
    }
    // console.error('WRITE TO INPUT STREAM', chunk.toString())
    return this.inputStream.write(chunk)
  }
  flush (cb) {
    // console.error('FLUSH CACHE STREAM', this.dataSeen)
    if (!this.handleContentP) {
      this.emit('error', new Error('no data to cache'))
    } else {
      this.inputStream.end()
      this.handleContentP.then(() => cb(), cb)
    }
    // console.error('MOCK CACHE FLUSH')
    //this.inputStream.end().promise().then(() => {
    //  if (!this.handleContentP)
    //    this.emit('error', new Error('no data to cache'))
    //  this.handleContentP.then(() => cb(), cb)
    //})
  }
}
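// Like cacache's content write stream, MockCache only creates handleContentP
// on the first write, so flush() on an empty body finds no promise to await
// and emits the 'no data to cache' error instead of calling its callback.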
const cachePutStream = () => {
  const p = new (class CachePutStream extends Pipeline {})()
  p.push(new MockCache())
  p.push(new Flush({
    flush () {
      return new Promise(res => setTimeoutOrNow(res, timeouts.cache.putFlush))
    }
  }))
  return p
}
const newBodyStream = oldBody => {
  let cacheWriteResolve, cacheWriteReject
  const cacheWritePromise = new Promise((resolve, reject) => {
    cacheWriteResolve = resolve
    cacheWriteReject = reject
  })
  const newBodyWritten = [] // debug collectors, currently unused
  const newBodyEmitted = []
  const newBody = new (class NewBody extends Pipeline {
    emit (ev, ...args) {
      if (ev === 'data') {
        // console.error('NEW BODY EMITTED', args[0].toString())
      }
      return super.emit(ev, ...args)
    }
    write (chunk, encoding, cb) {
      // console.error('NEW BODY WRITTEN', chunk.toString())
      return super.write(chunk, encoding, cb)
    }
  })(new Flush({
    flush () {
      return cacheWritePromise
    }
  }))
  const tee = new class Tee extends Minipass {
    constructor () {
      super()
      this.dataSeen = []
    }
    emit (ev, ...args) {
      if (ev === 'data') {
        // console.error('TEE EMIT DATA', args[0].toString())
      }
      return super.emit(ev, ...args)
    }
    write (chunk, encoding, cb) {
      this.dataSeen.push(chunk.toString())
      return super.write(chunk, encoding, cb)
    }
    end (...args) {
      // console.error('TEE DATA SEEN', this.dataSeen)
      return super.end(...args)
    }
  }
  const cacheStream = cachePutStream()
  cacheStream.promise().then(cacheWriteResolve, cacheWriteReject)
  // console.error('PIPE TEE TO CACHE STREAM>')
  tee.pipe(cacheStream)
  // console.error('PIPE TEE TO CACHE STREAM<')
  // console.error('ADD TEE TO PIPELINE>')
  newBody.unshift(tee)
  // console.error('ADD TEE TO PIPELINE<')
  // console.error('ADD OLD BODY TO PIPELINE>')
  newBody.unshift(oldBody)
  // console.error('ADD OLD BODY TO PIPELINE<')
  return Promise.resolve(newBody)
}
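// Note that in newBodyStream, tee.pipe(cacheStream) is attached before tee
// is unshifted into the pipeline, so the cache pipe is live before any data
// can flow; this matches the make-fetch-happen wiring described at the top.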
const runTest = async () => {
  const source = new Minipass()
  // note: the source.write timeout may be null, in which case all chunks are
  // written synchronously here and buffered before the pipeline starts
  {
    let i = 0
    const writeLater = () => {
      source.write(chunks[i++])
      if (i < chunkCount)
        setTimeoutOrNow(writeLater, timeouts.source.write)
      else
        setTimeoutOrNow(() => source.end(), timeouts.source.end)
    }
    setTimeoutOrNow(writeLater, timeouts.source.write)
  }
  const newBody = await newBodyStream(source)
  return newBody.concat()
}
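// With chunkCount = 5, the chunks are 'a' through 'e', so every timeout
// combination below should produce the body 'abcde'.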
const t = require('tap')
const toValues = [null, 0, 10, 100]
const timeouts = {
  tmpFile: {
    write: 0,
    end: 0,
  },
  cache: {
    inputStream: 0,
    putFlush: 0,
  },
  source: {
    write: 0,
    end: 0,
  },
}
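// Exhaustive matrix: 4 timeout values (null = synchronous, 0/10/100 = async)
// across 6 knobs, i.e. 4^6 = 4096 combinations.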
for (const tmpFileWrite of toValues) t.test(`timeouts.tmpFile.write=${tmpFileWrite}`, async t => {
  for (const tmpFileEnd of toValues) t.test(`timeouts.tmpFile.end=${tmpFileEnd}`, async t => {
    for (const cacheInputStream of toValues) t.test(`timeouts.cache.inputStream=${cacheInputStream}`, async t => {
      for (const cachePutFlush of toValues) t.test(`timeouts.cache.putFlush=${cachePutFlush}`, async t => {
        for (const sourceWrite of toValues) t.test(`timeouts.source.write=${sourceWrite}`, async t => {
          for (const sourceEnd of toValues) t.test(`timeouts.source.end=${sourceEnd}`, async t => {
            timeouts.tmpFile.write = tmpFileWrite
            timeouts.tmpFile.end = tmpFileEnd
            timeouts.cache.inputStream = cacheInputStream
            timeouts.cache.putFlush = cachePutFlush
            timeouts.source.write = sourceWrite
            timeouts.source.end = sourceEnd
            return runTest().then(actual => {
              const expect = Buffer.concat(chunks).toString()
              t.equal(actual.toString(), expect, JSON.stringify(timeouts))
            })
          })
        })
      })
    })
  })
})