Skip to content

Instantly share code, notes, and snippets.

@reidab
Forked from max-mapper/download.js
Last active February 4, 2017 04:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save reidab/3facd4241b62bb35f3e58737f596c52c to your computer and use it in GitHub Desktop.
Save reidab/3facd4241b62bb35f3e58737f596c52c to your computer and use it in GitHub Desktop.
bulk GET header requester/download scripts
var crypto = require('crypto')
var fs = require('fs')
var ndjson = require('ndjson')
var request = require('request')
var transform = require('unordered-parallel-transform')
var through = require('through2')
var mkdirp = require('mkdirp')
var blobStore = require('content-addressable-blob-store')
var pump = require('pump')
// Command-line arguments: argv[2] is the directory handed to the blob store
// (required); argv[3] is an optional exclude file that switches the pipeline
// below into its "skip already-downloaded URLs" branch.
var dataDir = process.argv[2]
var excludeFile = process.argv[3]
if (!dataDir) throw new Error('must specify data directory')
// Blob store created from dataDir; queried with blobs.exists() and written
// with blobs.createWriteStream() further down.
var blobs = blobStore(dataDir)
// Concurrency value passed to the parallel transform that runs getResponse.
var PARALLEL = 2048
// Maximum number of download attempts getResponse makes per item.
var RETRIES = 10
// NOTE(review): `seen` is not read by the pipeline or getResponse of this script.
var seen = {}
// Passed as the `agentOptions` option of every request() call in getResponse.
// NOTE(review): presumably forwarded to the HTTP(S) agent — confirm against the request docs.
var agentOptions = {
keepAlive: true,
maxSockets: 10,
maxFreeSockets: 10,
timeout: 60000,
keepAliveMsecs: 30000,
rejectUnauthorized: false // lots of gov sites have bad self signed certs
}
// Headers sent with every request() call in getResponse (a desktop Chrome user-agent string).
var reqHeaders = {"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36"}
// Entry point: read newline-delimited JSON items from stdin, run each through
// getResponse, and write one JSON result per line to stdout.
if (!excludeFile) {
  // No exclude file: stdin goes straight to the downloader.
  process.stdin
    .pipe(ndjson.parse())
    .pipe(transform(PARALLEL, getResponse))
    .pipe(ndjson.serialize())
    .pipe(process.stdout)
} else {
  // stdin is parked in a pass-through stream until the exclude file has been read.
  var buffer = through()
  var excludes = {}
  var excludeCount = 0
  process.stdin.pipe(buffer)

  // Remember the URL hash of every exclude-file entry whose blob exists in the store.
  var collectExcludes = function (entry, enc, done) {
    if (!entry.file) return done()
    blobs.exists({key: entry.file}, function (err, exists) {
      if (!err && exists) {
        var digest = crypto.createHash('sha256').update(entry.url).digest('hex')
        if (!excludes[digest]) {
          excludes[digest] = true
          excludeCount++
        }
      }
      done()
    })
  }

  // Log and drop stdin items whose URL hash was excluded; forward the rest.
  var dropExcluded = function (item, enc, done) {
    var digest = crypto.createHash('sha256').update(item.url).digest('hex')
    if (excludes[digest]) {
      console.error(JSON.stringify({skipping: item.url}))
    } else {
      this.push(item)
    }
    done()
  }

  // Flush handler of the exclude reader: report the count, then start downloading.
  var startDownloads = function end () {
    console.error(JSON.stringify({excludeCount: excludeCount}))
    buffer
      .pipe(ndjson.parse())
      .pipe(through.obj(dropExcluded))
      .pipe(transform(PARALLEL, getResponse))
      .pipe(ndjson.serialize())
      .pipe(process.stdout)
  }

  fs.createReadStream(excludeFile)
    .pipe(ndjson.parse())
    .pipe(through.obj(collectExcludes, startDownloads))
}
/**
 * Download `item.url` into the blob store, retrying failed attempts.
 *
 * Up to RETRIES attempts are made; the first starts immediately and every
 * later one is delayed by 5 seconds. Failures are never passed as the
 * callback's first argument: `cb(null, record)` is called exactly once with
 * either the response metadata (status, headers, timings, blob key in `file`)
 * or a failure record carrying a JSON-serializable `error` object.
 *
 * @param {Object} item - input record; `url`, `package_id` and `id` are read.
 * @param {Function} cb - called once as `cb(null, record)`.
 */
function getResponse (item, cb) {
  var tries = 0
  var retryTime = 0 // delay before the next attempt; raised to 5000 after the first try
  var finished = false

  tryDownload()

  // Deliver the final record at most once, even if several events race.
  function finish (record) {
    if (finished) return
    finished = true
    cb(null, record)
  }

  // JSON.stringify drops an Error's non-enumerable `message`, so copy it next
  // to the enumerable fields (code, errType, ...) into a plain object.
  function describeError (err) {
    var out = {message: err.message}
    Object.keys(err).forEach(function (key) {
      out[key] = err[key]
    })
    return out
  }

  function tryDownload (err) {
    if (finished) return
    if (tries >= RETRIES) {
      var msg = 'Max retries exceeded'
      if (err) msg += ': ' + err.message
      var failure = new Error(msg)
      // Keep the last error's classification so the ETIMEDOUT check in error() still works.
      if (err && err.code !== undefined) failure.code = err.code
      if (err && err.errType !== undefined) failure.errType = err.errType
      return error(failure)
    }
    tries++
    var stats = {url: item.url, try: tries}
    if (err) stats.error = describeError(err)
    console.error(JSON.stringify(stats))
    var start = Date.now()

    // A request 'error' and a pump error can both fire for the same attempt;
    // only the first outcome of an attempt may retry or complete.
    var attemptOver = false
    function retry (reason) {
      if (attemptOver) return
      attemptOver = true
      tryDownload(reason)
    }

    setTimeout(function () {
      retryTime = 5000 // after first try
      var r
      try {
        r = request(item.url, {agentOptions: agentOptions, headers: reqHeaders})
      } catch (e) {
        e.errType = 'requestInitError'
        return error(e)
      }
      r.on('error', function (err) {
        err.errType = 'reqStreamErr'
        retry(err)
      })
      r.on('response', function (re) {
        if (attemptOver) return
        var elapsed = Date.now() - start
        var meta = {url: item.url, date: new Date(), headersTook: elapsed, package_id: item.package_id, id: item.id, status: re.statusCode, rawHeaders: re.rawHeaders, headers: re.headers}
        var ws = blobs.createWriteStream()
        pump(re, ws, function (err) {
          if (err) {
            err.errType = 'streamPumpErr'
            return retry(err)
          }
          if (attemptOver) return
          attemptOver = true
          meta.downloadTook = Date.now() - start
          meta.file = ws.key
          finish(meta)
        })
      })
    }, retryTime)
  }

  // Report a permanent failure for this item as a normal output record.
  function error (err) {
    var obj = {url: item.url, date: new Date(), package_id: item.package_id, id: item.id, error: describeError(err)}
    if (err.code === 'ETIMEDOUT') {
      obj.timeout = true
    }
    finish(obj)
  }
}
var url = require('url')
var crypto = require('crypto')
var fs = require('fs')
var ndjson = require('ndjson')
var transform = require('parallel-transform')
var through = require('through2')
var JSFtp = require("jsftp")
var blobStore = require('content-addressable-blob-store')
var pump = require('pump')
// argv[2] is the directory handed to the blob store (required).
var dataDir = process.argv[2]
if (!dataDir) throw new Error('must specify data directory')
// Blob store that getResponse writes downloaded files into.
var blobs = blobStore(dataDir)
// Concurrency value passed to the parallel transform that runs getResponse.
var PARALLEL = 1000
// sha256 hashes of URLs already handed to the downloader, keyed by hex digest.
var seen = {}
/**
 * Fetch one ftp: URL into the blob store.
 *
 * Failures are reported as data rather than as a callback error:
 * `cb(null, record)` receives either the download metadata (with the blob key
 * in `file` and the elapsed milliseconds in `took`) or a record whose `error`
 * field is a JSON-serializable description of what went wrong.
 *
 * @param {Object} item - input record; `url`, `package_id` and `id` are read.
 * @param {Function} cb - called as `cb(null, record)`.
 */
function getResponse (item, cb) {
  var start = Date.now()
  var parsed = url.parse(item.url)
  var options = {host: parsed.hostname}
  // Honour an explicit port in the URL instead of always using the default.
  if (parsed.port) options.port = Number(parsed.port)
  var ftp = new JSFtp(options)
  ftp.get(parsed.path, function (err, socket) {
    if (err) return error(err)
    var ws = blobs.createWriteStream()
    pump(socket, ws, function (err) {
      if (err) {
        err.errType = 'streamPumpErr'
        return error(err)
      }
      var meta = {url: item.url, date: new Date(), took: Date.now() - start, package_id: item.package_id, id: item.id}
      meta.file = ws.key
      cb(null, meta)
    })
  })

  // Emit a failure record. The Error is copied into a plain object because
  // JSON.stringify would otherwise drop its non-enumerable `message`.
  function error (err) {
    var detail = {message: err.message}
    Object.keys(err).forEach(function (key) {
      detail[key] = err[key]
    })
    var obj = {url: item.url, date: new Date(), package_id: item.package_id, id: item.id, error: detail}
    cb(null, obj)
  }
}
// Entry point: read newline-delimited JSON items from stdin, keep each distinct
// ftp: URL once, download it through getResponse, and write one JSON result
// per line to stdout.
process.stdin
  .pipe(ndjson.parse())
  .pipe(through.obj(function (item, enc, next) {
    // Items without a string url, or with a non-ftp url, are not ours to fetch.
    if (typeof item.url !== 'string' || item.url.slice(0, 4) !== 'ftp:') return next()
    var hash = crypto.createHash('sha256').update(item.url).digest('hex')
    // Always call next(), otherwise the stream stalls on the first duplicate.
    if (seen[hash]) return next()
    seen[hash] = true
    // `this` is the through2 stream; no `self` alias exists in this scope.
    this.push(item)
    next()
  }))
  .pipe(transform(PARALLEL, getResponse))
  .pipe(ndjson.serialize())
  .pipe(process.stdout)
var crypto = require('crypto')
var fs = require('fs')
var ndjson = require('ndjson')
var request = require('request')
var transform = require('parallel-transform')
var through = require('through2')
// Concurrency value passed to the parallel transform that runs getResponse.
var PARALLEL = 1000
// sha256 hashes of resource URLs already pushed downstream, keyed by hex digest.
var seen = {}
/**
 * Request `item.url` and report the response status and headers without
 * downloading the body: the request is aborted as soon as the response
 * headers arrive.
 *
 * Failures are reported as data: `cb(null, record)` is called once with
 * either the response summary or a record whose `error` field holds the error
 * message (plus `timeout: true` when the error code is ETIMEDOUT).
 *
 * @param {Object} item - input record; `url`, `package_id` and `id` are read.
 * @param {Function} cb - called once as `cb(null, record)`.
 */
function getResponse (item, cb) {
  var done = false // guards against reporting the same item twice
  var r
  try {
    r = request(item.url, {time: true, timeout: 10000})
  } catch (e) {
    return error(e)
  }
  r.on('error', function (err) {
    error(err)
  })
  r.on('response', function (re) {
    if (!done) {
      done = true
      cb(null, {
        redirects: r._redirect && r._redirect.redirects,
        initialUrl: item.url,
        url: r.uri.href,
        date: new Date(),
        took: r.elapsedTime,
        package_id: item.package_id,
        id: item.id,
        status: re.statusCode,
        headers: re.headers
      })
    }
    // Only the headers are wanted, so stop the transfer here.
    r.abort()
  })

  // Emit a failure record carrying the error's message.
  function error (err) {
    if (done) return
    done = true
    var obj = {url: item.url, date: new Date(), package_id: item.package_id, id: item.id, error: err.message}
    if (err.code === 'ETIMEDOUT') {
      obj.timeout = true
    }
    cb(null, obj)
  }
}
// Entry point: read package records from ./meta.json (newline-delimited JSON),
// emit every resource whose URL has not been seen yet, run each through
// getResponse, and write one JSON result per line to stdout.
var uniqueResources = through.obj(function (pkg, enc, next) {
  // Records without a resources list contribute nothing.
  if (!pkg.resources) return next()
  pkg.resources.forEach(function (resource) {
    var digest = crypto.createHash('sha256').update(resource.url).digest('hex')
    if (!seen[digest]) {
      seen[digest] = true
      this.push(resource)
    }
  }, this)
  next()
})

fs.createReadStream('./meta.json')
  .pipe(ndjson.parse())
  .pipe(uniqueResources)
  .pipe(transform(PARALLEL, getResponse))
  .pipe(ndjson.serialize())
  .pipe(process.stdout)
{
"name": "example",
"version": "1.0.0",
"description": "",
"main": "dl.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"content-addressable-blob-store": "^4.3.0",
"mkdirp": "^0.5.1",
"ndjson": "^1.5.0",
"jsftp": "^1.5.5",
"parallel-transform": "^1.1.0",
"unordered-parallel-transform": "^1.0.0",
"pump": "^1.0.2",
"request": "^2.79.0",
"through2": "^2.0.3"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment