@max-mapper
Last active March 7, 2017 22:59
bulk GET header requester/download scripts
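Usage sketch (the argument handling and record fields come from the scripts below; the input/output file names here are placeholders): dl.js takes a blob-store data directory plus an optional exclude file, reads ndjson on stdin, and writes ndjson results on stdout.

node dl.js ./data < urls.ndjson > results.ndjson

Each input line is an object like {"url": "http://example.gov/data.csv", "package_id": "abc", "id": "r1"}; each output line adds status, headers, timing fields, and the content-addressed blob key under file.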
// dl.js (per package.json "main"): bulk HTTP downloader.
// Reads ndjson {url, package_id, id} records on stdin, fetches each URL,
// stores the body in a content-addressable blob store, and writes one
// ndjson result record per URL on stdout. Progress and errors go to stderr.
var crypto = require('crypto')
var fs = require('fs')
var ndjson = require('ndjson')
var request = require('request')
var transform = require('unordered-parallel-transform')
var through = require('through2')
var blobStore = require('content-addressable-blob-store')
var pump = require('pump')

var dataDir = process.argv[2]
var excludeFile = process.argv[3]
if (!dataDir) throw new Error('must specify data directory')
var blobs = blobStore(dataDir)

var PARALLEL = 2048
var RETRIES = 10

var agentOptions = {
  keepAlive: true,
  maxSockets: 10,
  maxFreeSockets: 10,
  timeout: 60000,
  keepAliveMsecs: 30000,
  rejectUnauthorized: false // lots of gov sites have bad self signed certs
}
var reqHeaders = {'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36'}

if (excludeFile) {
  // Buffer stdin while the exclude set is built from a previous run's output.
  var buffer = through()
  var excludes = {}
  var excludeCount = 0
  process.stdin.pipe(buffer)

  fs.createReadStream(excludeFile)
    .pipe(ndjson.parse())
    .pipe(through.obj(function (obj, enc, next) {
      if (!obj.file) return next()
      // Only exclude URLs whose blob actually made it into the store.
      blobs.exists({key: obj.file}, function (err, exists) {
        if (err || !exists) return next()
        var hash = crypto.createHash('sha256').update(obj.url).digest('hex')
        if (!excludes[hash]) {
          excludes[hash] = true
          excludeCount++
        }
        next()
      })
    }, function end () {
      console.error(JSON.stringify({excludeCount: excludeCount}))
      buffer
        .pipe(ndjson.parse())
        .pipe(through.obj(function (obj, enc, next) {
          var hash = crypto.createHash('sha256').update(obj.url).digest('hex')
          if (excludes[hash]) {
            console.error(JSON.stringify({skipping: obj.url}))
            return next()
          }
          this.push(obj)
          next()
        }))
        .pipe(transform(PARALLEL, getResponse))
        .pipe(ndjson.serialize())
        .pipe(process.stdout)
    }))
} else {
  process.stdin
    .pipe(ndjson.parse())
    .pipe(transform(PARALLEL, getResponse))
    .pipe(ndjson.serialize())
    .pipe(process.stdout)
}

function getResponse (item, cb) {
  var tries = 0
  var retryTime = 0
  tryDownload()

  function tryDownload (err) {
    if (tries >= RETRIES) {
      var msg = 'Max retries exceeded'
      if (err) msg += ': ' + err.message
      return error(new Error(msg))
    }
    tries++
    var stats = {url: item.url, try: tries}
    if (err) stats.error = err
    console.error(JSON.stringify(stats))
    var start = Date.now()
    setTimeout(function () {
      retryTime = 5000 // back off on every try after the first
      try {
        var r = request(item.url, {agentOptions: agentOptions, headers: reqHeaders})
      } catch (e) {
        e.errType = 'requestInitError'
        return error(e)
      }
      r.on('error', function (err) {
        err.errType = 'reqStreamErr'
        tryDownload(err)
      })
      r.on('response', function (re) {
        var elapsed = Date.now() - start
        var meta = {url: item.url, date: new Date(), headersTook: elapsed, package_id: item.package_id, id: item.id, status: re.statusCode, rawHeaders: re.rawHeaders, headers: re.headers}
        // Stream the response body straight into the content-addressable store.
        var ws = blobs.createWriteStream()
        pump(re, ws, function (err) {
          if (err) {
            err.errType = 'streamPumpErr'
            return tryDownload(err)
          }
          meta.downloadTook = Date.now() - start
          meta.file = ws.key // content hash of the stored blob
          cb(null, meta)
        })
      })
    }, retryTime)

    function error (err) {
      var obj = {url: item.url, date: new Date(), package_id: item.package_id, id: item.id, error: err}
      if (err.code === 'ETIMEDOUT') {
        obj.timeout = true
      }
      // Never fail the transform stream; emit the error as a result record.
      cb(null, obj)
    }
  }
}
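Since every successful result line carries a file key, a previous run's stdout can be passed back as the exclude file to resume a crashed or partial run (a sketch; file names are placeholders):

node dl.js ./data results-run1.ndjson < urls.ndjson > results-run2.ndjson

URLs whose blobs already exist in ./data are skipped and logged to stderr as {"skipping": ...}.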
// FTP variant (filename not shown in this gist): filters stdin down to
// ftp: URLs, downloads each via FTP, and stores the body in the same
// content-addressable blob store.
var url = require('url')
var crypto = require('crypto')
var ndjson = require('ndjson')
var transform = require('parallel-transform')
var through = require('through2')
var JSFtp = require('jsftp')
var blobStore = require('content-addressable-blob-store')
var pump = require('pump') // was missing, but pump() is used below

var dataDir = process.argv[2]
if (!dataDir) throw new Error('must specify data directory')
var blobs = blobStore(dataDir)

var PARALLEL = 1000
var seen = {}

function getResponse (item, cb) {
  var start = Date.now()
  var parsed = url.parse(item.url)
  var ftp = new JSFtp({host: parsed.hostname})
  ftp.get(parsed.path, function (err, socket) {
    if (err) return error(err)
    var ws = blobs.createWriteStream()
    pump(socket, ws, function (err) {
      if (err) {
        err.errType = 'streamPumpErr'
        return error(err)
      }
      var elapsed = Date.now() - start
      var meta = {url: item.url, date: new Date(), took: elapsed, package_id: item.package_id, id: item.id}
      meta.file = ws.key
      cb(null, meta)
    })
  })
  function error (err) {
    // Was `error: error`, which recorded the function instead of the error.
    var obj = {url: item.url, date: new Date(), package_id: item.package_id, id: item.id, error: err}
    cb(null, obj)
  }
}

process.stdin
  .pipe(ndjson.parse())
  .pipe(through.obj(function (item, enc, next) {
    if (item.url.slice(0, 4) !== 'ftp:') return next()
    var hash = crypto.createHash('sha256').update(item.url).digest('hex')
    if (seen[hash]) return next() // was a bare `return`, which stalled the stream
    this.push(item) // was `self.push(item)`, but `self` was never defined here
    seen[hash] = true
    next()
  }))
  .pipe(transform(PARALLEL, getResponse))
  .pipe(ndjson.serialize())
  .pipe(process.stdout)
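Because the transform above drops anything that isn't an ftp: URL, this script can be fed the same ndjson stream as dl.js (a sketch; the FTP script's real filename isn't shown here, ftp.js is assumed):

node ftp.js ./data < urls.ndjson > ftp-results.ndjson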
// headers.js: header checker. Reads ./meta.json, fans out each record's
// resources array, requests each URL, records status/headers/timing/redirect
// info, and aborts the request before downloading the body.
var crypto = require('crypto')
var fs = require('fs')
var ndjson = require('ndjson')
var request = require('request')
var transform = require('parallel-transform')
var through = require('through2')

var PARALLEL = 1000
var seen = {}

function getResponse (item, cb) {
  try {
    // time: true makes request track elapsedTime for the request cycle
    var r = request(item.url, {time: true, timeout: 10000})
  } catch (e) {
    return error(e)
  }
  r.on('error', function (err) {
    error(err)
  })
  r.on('response', function (re) {
    cb(null, {
      redirects: r._redirect && r._redirect.redirects,
      initialUrl: item.url,
      url: r.uri.href, // final URL after any redirects
      date: new Date(),
      took: r.elapsedTime,
      package_id: item.package_id,
      id: item.id,
      status: re.statusCode,
      headers: re.headers
    })
    r.abort() // headers are enough; don't download the body
  })
  function error (err) {
    // Was `error: error.message`, which read .message off the function itself.
    var obj = {url: item.url, date: new Date(), package_id: item.package_id, id: item.id, error: err.message}
    if (err.code === 'ETIMEDOUT') {
      obj.timeout = true
    }
    cb(null, obj)
  }
}

fs.createReadStream('./meta.json')
  .pipe(ndjson.parse())
  .pipe(through.obj(function (obj, enc, next) {
    var self = this
    if (obj.resources) {
      obj.resources.forEach(function (r) {
        var hash = crypto.createHash('sha256').update(r.url).digest('hex')
        if (seen[hash]) return // dedupe URLs across packages
        self.push(r)
        seen[hash] = true
      })
    }
    next()
  }))
  .pipe(transform(PARALLEL, getResponse))
  .pipe(ndjson.serialize())
  .pipe(process.stdout)
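headers.js reads ./meta.json (the path is hardcoded above), where each line is expected to carry a resources array of {url, package_id, id} objects; a sketch of one input line, with placeholder values:

{"resources": [{"url": "http://example.gov/data.csv", "package_id": "abc", "id": "r1"}]}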
package.json
{
  "name": "example",
  "version": "1.0.0",
  "description": "",
  "main": "dl.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "content-addressable-blob-store": "^4.3.0",
    "jsftp": "^1.5.5",
    "mkdirp": "^0.5.1",
    "ndjson": "^1.5.0",
    "parallel-transform": "^1.1.0",
    "pump": "^1.0.2",
    "request": "^2.79.0",
    "through2": "^2.0.3",
    "unordered-parallel-transform": "^1.0.0"
  }
}
@trygve-lie

Should do it.

If those requests are against the same host, try tuning the agent on the request. It takes some tuning, but, as an example, the following tends to take average requests from ~30ms down to ~2ms.

const http = require('http');

const agent = new http.Agent({
    keepAlive: true,
    maxSockets: 10,
    maxFreeSockets: 10,
    timeout: 60000,
    keepAliveMsecs: 30000
});

http.request({
    method: 'GET',
    host: '127.0.0.1',
    port: 8000,
    path: '/',
    agent: agent
}).end(); // .end() actually sends the request
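For the request library, the same tuning goes through the agentOptions option, which is what dl.js above does; a minimal sketch (host/port are placeholders):

var request = require('request')

// request builds its per-protocol agent from these options
request('http://127.0.0.1:8000/', {
  agentOptions: {
    keepAlive: true,
    maxSockets: 10,
    maxFreeSockets: 10,
    timeout: 60000,
    keepAliveMsecs: 30000
  }
})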

@reidab

reidab commented Feb 4, 2017

Updated headers.js on my fork to use request.elapsedTime and to track additional info for redirects.
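For reference, request only tracks elapsed time when the time option is set, and the redirect list lives on request's internal _redirect tracker (an implementation detail that may change between versions); a minimal sketch of the fields headers.js records, using a placeholder URL:

var request = require('request')

var r = request('http://example.com', {time: true, timeout: 10000})
r.on('response', function (re) {
  console.log({
    took: r.elapsedTime, // ms; only tracked because of time: true
    redirects: r._redirect && r._redirect.redirects, // internal API
    finalUrl: r.uri.href // URL after any redirects
  })
  r.abort() // headers only
})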

@max-mapper
Author

@reidab I merged in your headers changes, thanks
