Last active
May 13, 2020 17:48
-
-
Save isaacs/7d84e1b1ad88068a3f1f16292ec4d931 to your computer and use it in GitHub Desktop.
tracking down the root cause of https://github.com/npm/npm-registry-fetch/issues/23, fixed by https://github.com/isaacs/minipass/commit/4c5a106ed34bef4da06083466a476e16c39f4a20
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Exhaustively try to break a tee stream usage akin to the weird
// make-fetch-happen/cacache interaction behind
// https://github.com/npm/npm-registry-fetch/issues/23
// make-fetch-happen commit 7f896bef4bbcab73eccb92b5999204b673cdf485
//
// "cache" is a mock akin to cacache.put.stream()
// It is a pipeline containing:
// - a cache writing stream `cache`
//   A minipass-flush with an internal writer `cacheContent`, that it proxies
//   writes to, and drain events from. The first time the content is written
//   to cacheContent, a handleContentP promise is set, which resolves a tick
//   after the internal stream ends. The `cache` flush() method returns this
//   handleContentP promise.
// - a Flush stream with a flush() method that resolves a tick later, to create
//   the cache key
//
// "newBody" is a Pipeline containing:
// - "oldBody" a Minipass stream (the source of data)
// - "tee" a Minipass stream with an extra pipe destination:
//   - `cache` stream, set to resolve the cacheWritePromise on cache.promise()
// - a Flush stream that returns cacheWritePromise in its flush() method
//
// When `tee` pipes to cache, it gets the data chunks out of order. When `tee`
// just writes on data and ends on end, without backpressure, it works fine.
//
// Options:
// - oldBody has 1 chunk, or multiple chunks
// - oldBody is fully available, has 1 chunk available, or 0 chunks available,
//   when the pipeline starts
// - newBody begins consuming immediately, or later
//
// newBody is *always* consumed completely without backpressure, because it's
// read via response.json()
const Minipass = require('minipass') | |
const Flush = require('minipass-flush') | |
const Pipeline = require('minipass-pipeline') | |
// Number of single-byte chunks the source stream writes in each run.
const chunkCount = 5

// One-byte buffers cycling through the lowercase alphabet: 'a', 'b', 'c', ...
// (97 is the character code of 'a'; the modulo wraps after 'z').
const chunks = Array.from(
  { length: chunkCount },
  (_, i) => Buffer.from([97 + i % 26])
)
// Sentinel default so that an omitted (or undefined) timeout can be told
// apart from an explicit `null`.
const MISSING = Symbol('missing')

// Run `fn` either right now or after a delay.
//
// - `to === null`  : call fn() synchronously, before this function returns
// - any other value: hand fn to setTimeout(fn, to)
// - omitted        : throw, so a forgotten timeout knob fails loudly instead
//                    of silently behaving like setTimeout(fn, undefined)
const setTimeoutOrNow = (fn, to = MISSING) => {
  if (to === MISSING) {
    throw new Error('missing timeout value')
  }

  if (to === null) {
    fn()
  } else {
    setTimeout(fn, to)
  }
}
// simulate writing to a tmp file, which takes some time
//
// Each write() hands the chunk to Minipass's own write() and then schedules
// a read() on this stream, so the chunk is consumed later rather than
// inside the write() call.
class TmpFileWriter extends Minipass {
  constructor () {
    super()
    //this.resume()
  }

  // Forwards `chunk` to Minipass's write(), schedules a read(), and returns
  // whatever Minipass's write() returned.
  write (chunk) {
    // console.error('TMP FILE WRITE')
    // XXX will this emit drain properly?
    const ret = super.write(chunk)
    // `|| 0` coerces a null timeout to 0, so this read always goes through
    // setTimeout and never runs synchronously inside write().
    setTimeoutOrNow(() => this.read(), timeouts.tmpFile.write || 0)
    return ret
  }

  //end () {
  //  // console.error('TMP FILE END')//, new Error('trace').stack)
  //  super.end()
  //  //setTimeoutOrNow(() => this.read(), timeouts.tmpFile.end || 0)
  //  return this
  //}
}
// Mock of the cache content writer: a Flush stream that proxies every write
// to an internal TmpFileWriter and re-emits that writer's 'error' and 'drain'
// events as its own.
class MockCache extends Flush {
  constructor () {
    // console.error('CREATE MOCK CACHE')
    super()
    this.inputStream = new TmpFileWriter()
    this.inputStream.on('error', er => this.emit('error', er))
    this.inputStream.on('drain', () => this.emit('drain'))
    // Set on the first write(); flush() waits on it.
    this.handleContentP = null
    // String form of every chunk written, in arrival order (debugging aid).
    this.dataSeen = []
  }

  // Records the chunk, lazily creates handleContentP on the first call, and
  // returns the internal writer's write() result so the caller sees the
  // internal writer's backpressure signal.
  write (chunk) {
    this.dataSeen.push(chunk.toString())
    // console.error('MOCK CACHE WRITE', this.handleContentP)
    if (!this.handleContentP) {
      // console.error('SET UP HANDLE CONTENT P')
      this.handleContentP = this.inputStream.promise()
      //this.handleContentP = new Promise(res => {
      //  this.inputStream.promise().then(() => setTimeoutOrNow(res, timeouts.cache.inputStream))
      //})
    }
    // console.error('WRITE TO INPUT STREAM', chunk.toString())
    return this.inputStream.write(chunk)
  }

  // If nothing was ever written, emits an 'error' and does not call `cb`.
  // Otherwise ends the internal writer and calls `cb` once handleContentP
  // settles, passing the rejection reason through on failure.
  flush (cb) {
    // console.error('FLUSH CACHE STREAM', this.dataSeen)
    if (!this.handleContentP) {
      this.emit('error', new Error('no data to cache'))
    } else {
      this.inputStream.end()
      this.handleContentP.then(() => cb(), cb)
    }
    // console.error('MOCK CACHE FLUSH')
    //this.inputStream.end().promise().then(() => {
    //  if (!this.handleContentP)
    //    this.emit('error', new Error('no data to cache'))
    //  this.handleContentP.then(() => cb(), cb)
    //})
  }
}
// Builds the mock equivalent of cacache.put.stream(): a Pipeline made of a
// MockCache followed by a Flush stream whose flush() promise resolves after
// timeouts.cache.putFlush (synchronously when that knob is null).
const cachePutStream = () => {
  // Named subclass so the pipeline is identifiable as CachePutStream.
  const p = new (class CachePutStream extends Pipeline {})()
  p.push(new MockCache())
  p.push(new Flush({
    flush () {
      return new Promise(res => setTimeoutOrNow(res, timeouts.cache.putFlush))
    }
  }))
  return p
}
// Wraps `oldBody` in a new Pipeline laid out as
//
//   oldBody -> tee -> Flush
//               \
//                `-> cachePutStream()   (extra pipe destination of tee)
//
// The trailing Flush returns cacheWritePromise from flush(), and that promise
// is settled by the cache stream's promise().
//
// Returns a promise that resolves to the new pipeline.
const newBodyStream = oldBody => {
  // Deferred settled by cacheStream.promise() further down.
  let cacheWriteResolve, cacheWriteReject
  const cacheWritePromise = new Promise((resolve, reject) => {
    cacheWriteResolve = resolve
    cacheWriteReject = reject
  })

  // The emit/write overrides only exist as hooks for the commented-out
  // tracing; they forward to the Pipeline implementation unchanged.
  const newBody = new (class NewBody extends Pipeline {
    emit (ev, ...args) {
      if (ev === 'data') {
        // console.error('NEW BODY EMITTED', args[0].toString())
      }
      return super.emit(ev, ...args)
    }

    write (chunk, encoding, cb) {
      // console.error('NEW BODY WRITTEN', chunk.toString())
      return super.write(chunk, encoding, cb)
    }
  })(new Flush({
    flush () {
      return cacheWritePromise
    }
  }))

  // Pass-through stream that records what it was given (dataSeen) and is
  // additionally piped into the cache stream.
  const tee = new (class Tee extends Minipass {
    constructor () {
      super()
      this.dataSeen = []
    }

    emit (ev, ...args) {
      if (ev === 'data') {
        // console.error('TEE EMIT DATA', args[0].toString())
      }
      return super.emit(ev, ...args)
    }

    write (chunk, encoding, cb) {
      this.dataSeen.push(chunk.toString())
      return super.write(chunk, encoding, cb)
    }

    end (...args) {
      // console.error('TEE DATA SEEN', this.dataSeen)
      return super.end(...args)
    }
  })()

  const cacheStream = cachePutStream()
  cacheStream.promise().then(cacheWriteResolve, cacheWriteReject)

  // The order of the next three steps is kept exactly as written: it is part
  // of the scenario being reproduced.
  // console.error('PIPE TEE TO CACHE STREAM>')
  tee.pipe(cacheStream)
  // console.error('PIPE TEE TO CACHE STREAM<')
  // console.error('ADD TEE TO PIPELINE>')
  newBody.unshift(tee)
  // console.error('ADD TEE TO PIPELINE<')
  // console.error('ADD OLD BODY TO PIPELINE>')
  newBody.unshift(oldBody)
  // console.error('ADD OLD BODY TO PIPELINE<')

  return Promise.resolve(newBody)
}
// Runs one scenario with the current `timeouts` settings: feeds `chunks` into
// a fresh source stream, wraps it with newBodyStream(), and returns the
// result of newBody.concat() (the caller treats it as the full body).
const runTest = async () => {
  const source = new Minipass()

  // note: "later" here might be null, so it'll be buffered up to start
  // (with null timeouts every write, and the end(), happen synchronously
  // right here, before the pipeline below is even created)
  {
    let i = 0
    const writeLater = () => {
      source.write(chunks[i++])
      if (i < chunkCount) {
        setTimeoutOrNow(writeLater, timeouts.source.write)
      } else {
        setTimeoutOrNow(() => source.end(), timeouts.source.end)
      }
    }
    setTimeoutOrNow(writeLater, timeouts.source.write)
  }

  const newBody = await newBodyStream(source)
  return newBody.concat()
}
const assert = require('assert') | |
const t = require('tap') | |
// Values tried for every timing knob. `null` means "run synchronously"
// (see setTimeoutOrNow); the numbers are setTimeout delays in milliseconds.
const toValues = [null, 0, 10, 100]

// Current timing configuration. Deliberately mutable: the test matrix
// overwrites every field before each run, and the streams read it lazily.
// In the code as it stands, tmpFile.end and cache.inputStream are only
// referenced from commented-out alternatives.
const timeouts = {
  tmpFile: {
    write: 0,
    end: 0,
  },
  cache: {
    inputStream: 0,
    putFlush: 0,
  },
  source: {
    write: 0,
    end: 0,
  },
}
// Exhaustive matrix: one nested tap subtest per combination of the six timing
// knobs. Only the innermost test does work: it installs its combination into
// the shared `timeouts` object, runs the scenario, and checks that the body
// comes out as all chunks concatenated in their original order.
for (const tmpFileWrite of toValues) t.test(`timeouts.tmpFile.write=${tmpFileWrite}`, async t => {
  for (const tmpFileEnd of toValues) t.test(`timeouts.tmpFile.end=${tmpFileEnd}`, async t => {
    for (const cacheInputStream of toValues) t.test(`timeouts.cache.inputStream=${cacheInputStream}`, async t => {
      for (const cachePutFlush of toValues) t.test(`timeouts.cache.putFlush=${cachePutFlush}`, async t => {
        for (const sourceWrite of toValues) t.test(`timeouts.source.write=${sourceWrite}`, async t => {
          for (const sourceEnd of toValues) t.test(`timeouts.source.end=${sourceEnd}`, async t => {
            timeouts.tmpFile.write = tmpFileWrite
            timeouts.tmpFile.end = tmpFileEnd
            timeouts.cache.inputStream = cacheInputStream
            timeouts.cache.putFlush = cachePutFlush
            timeouts.source.write = sourceWrite
            timeouts.source.end = sourceEnd

            const actual = await runTest()
            const expect = Buffer.concat(chunks).toString()
            // The serialized timeouts identify the failing combination.
            t.equal(actual.toString(), expect, JSON.stringify(timeouts))
          })
        })
      })
    })
  })
})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment