Skip to content

Instantly share code, notes, and snippets.

@mattfysh
Last active August 10, 2021 23:42
Show Gist options
  • Save mattfysh/694a2209f1cc3f7bff132cd1e40452a6 to your computer and use it in GitHub Desktop.
Save mattfysh/694a2209f1cc3f7bff132cd1e40452a6 to your computer and use it in GitHub Desktop.
Streaming WARC (using HTTPS lib)
const https = require('https')
const { PassThrough } = require('stream')
/**
 * HTTPS agent that exposes the raw bytes written to and read from each
 * socket it creates, so callers can archive the exact wire traffic.
 *
 * Every socket returned by `createConnection` carries a `rawStreams`
 * property: `{ request, response }`, two PassThrough streams mirroring the
 * outgoing and incoming data respectively.
 */
class WarcAgent extends https.Agent {
  /**
   * Creates the underlying connection and instruments it.
   *
   * @param {...*} args - forwarded untouched to `https.Agent#createConnection`
   * @returns {object} the socket, with `rawStreams` attached
   */
  createConnection(...args) {
    const socket = super.createConnection(...args)
    const request = new PassThrough()
    const response = new PassThrough()

    // Copy an outgoing chunk into `request`. `write`/`end` accept optional
    // arguments: the chunk may be absent (or be the callback), and the
    // encoding slot may hold the callback. Only real data is mirrored, and
    // the callback is never handed to the mirror so it runs exactly once.
    const mirror = (chunk, enc) => {
      if (chunk == null || typeof chunk === 'function') return
      if (typeof enc === 'string') {
        request.write(chunk, enc)
      } else {
        request.write(chunk)
      }
    }

    // request stream via monkey-patching
    const origWrite = socket.write
    socket.write = (chunk, enc, cb) => {
      mirror(chunk, enc)
      return origWrite.call(socket, chunk, enc, cb)
    }

    const origEnd = socket.end
    socket.end = (chunk, enc, cb) => {
      mirror(chunk, enc)
      return origEnd.call(socket, chunk, enc, cb)
    }

    // response stream
    socket.pipe(response)

    // attach
    socket.rawStreams = { request, response }

    // The agent uses the return value as the connection; without it the
    // request would never be assigned a socket.
    return socket
  }
}
// Shared agent instance: keep-alive is enabled with a 5000 ms
// `keepAliveMsecs` setting.
const warcAgentOptions = { keepAlive: true, keepAliveMsecs: 5000 }
exports.globalWarcAgent = new WarcAgent(warcAgentOptions)
const https = require('https')
const { PassThrough, pipeline } = require('stream')
const { createGzip } = require('zlib')
const { createWarcRecordTransform } = require('./transform')
// Shared output stream that collects WARC records as they are generated.
// `pipeRecord` below pipes each record into it with `{ end: false }`.
// Typically a consumer imports `recordStream` and pipes it into a file
// write stream created with `fs.createWriteStream`.
exports.recordStream = new PassThrough()
/**
 * Feeds a raw byte stream through a WARC record transform and gzip, then
 * forwards the compressed record to `exports.recordStream`.
 *
 * @param {import('stream').Readable} body - raw request or response bytes
 * @param {import('stream').Transform} record - transform that turns the
 *   bytes into a WARC record
 */
const pipeRecord = (body, record) => {
  const concat = new PassThrough()
  // While corked, writes to `concat` are buffered; they are flushed when the
  // stream is ended. Presumably this holds a record's compressed output back
  // until it is complete so records from concurrent requests do not
  // interleave in the shared stream -- confirm.
  concat.cork()

  // `pipeline` requires a completion callback. It has already destroyed the
  // streams on failure, so just report the dropped record instead of
  // emitting an error on the stream shared by every request.
  const onPipelineDone = err => {
    if (err) {
      process.emitWarning(`WARC record pipeline failed: ${err.message}`)
    }
  }

  // `end: false` keeps the shared stream open for later records.
  pipeline(body, record, createGzip(), concat, onPipelineDone)
    .pipe(exports.recordStream, { end: false })
}
/**
 * Performs an HTTPS request and, as a side effect, feeds the raw request and
 * response bytes of the socket into WARC record pipelines.
 *
 * @param {string|URL} url - target URL, also recorded in the WARC records
 * @param {object} [opts] - request options; only `method` is read
 * @returns {Promise<*>} resolves with `fakeFetchRes(text)` once the response
 *   body has been fully read; rejects on request or response errors
 */
exports.fetch = (url, opts = {}) =>
  new Promise((resolve, reject) => {
    let rawStreams
    const requestRecord = createWarcRecordTransform('request', url)
    const responseRecord = createWarcRecordTransform('response', url)
    const reqOpts = {
      method: opts.method,
      // NOTE(review): `globalHttpsAgent` is not defined or imported in the
      // visible source (the agent module exports `globalWarcAgent`) --
      // confirm where this binding comes from.
      agent: globalHttpsAgent,
    }
    const req = https.request(url, reqOpts, res => {
      // Decode in the stream so multi-byte characters split across chunk
      // boundaries are not corrupted by per-chunk string conversion.
      res.setEncoding('utf8')
      let text = ''
      res.on('data', chunk => {
        text += chunk
      })
      res.on('error', reject)
      res.on('end', () => {
        // Unpiping makes the record transform end and emit its record.
        rawStreams.response.unpipe(responseRecord)
        // NOTE(review): `fakeFetchRes` is not defined in the visible source.
        resolve(fakeFetchRes(text))
      })
    })
    req.on('socket', socket => {
      rawStreams = socket.rawStreams
      if (!rawStreams) {
        // The socket was not instrumented, so nothing can be recorded.
        req.destroy(new Error('socket has no rawStreams; a WARC-aware agent is required'))
        return
      }
      pipeRecord(rawStreams.request, requestRecord)
      pipeRecord(rawStreams.response, responseRecord)
    })
    req.on('error', reject)
    req.end(() => {
      if (rawStreams) {
        rawStreams.request.unpipe(requestRecord)
      }
    })
  })
const { Transform } = require('stream')
const { v4: uuid } = require('uuid')
/**
 * Renders an object's own enumerable entries as `Name: value` header lines
 * separated by CRLF (no trailing CRLF).
 *
 * @param {object} fields - header names mapped to their values
 * @returns {string} the serialized header block
 */
const serializeNamedFields = fields => {
  const lines = []
  for (const [name, fieldValue] of Object.entries(fields)) {
    lines.push(`${name}: ${fieldValue}`)
  }
  return lines.join('\r\n')
}
/**
 * Builds a complete WARC/1.1 record: version line, named header fields, a
 * blank line, the body, and two trailing CRLFs.
 *
 * @param {string} body - record payload; its byte length becomes
 *   `Content-Length`
 * @param {object} [meta] - extra header fields; these override the
 *   generated ones on name clash
 * @returns {string} the serialized record
 */
const createWarcRecord = (body, meta = {}) => {
  const CRLF = '\r\n'
  const generatedFields = {
    'WARC-Record-ID': `<urn:uuid:${uuid()}>`,
    'WARC-Date': new Date().toISOString(),
    'Content-Length': Buffer.byteLength(body),
  }
  const headerBlock = serializeNamedFields(
    Object.assign({}, generatedFields, meta)
  )
  return `WARC/1.1${CRLF}${headerBlock}${CRLF}${CRLF}${body}${CRLF}${CRLF}`
}
exports.createWarcRecordTransform = (type, url) =>
new Transform({
construct(callback) {
this._body = ''
// emit record once unpiped
this.on('unpipe', () => this.end())
callback()
},
transform(chunk, encoding, callback) {
this._body += chunk
callback()
},
flush(callback) {
try {
const record = createWarcRecord(this._body, {
'WARC-Type': type,
'WARC-Target-URI': url,
})
this.push(record)
this.push(null)
} catch(err) {
callback(err)
}
}
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment