Quick-n-dirty script to roll through all the versions stored in `web-monitoring-db`, load their raw content, remove any unintentionally captured Versionista styling data, and validate their hashes. This makes some assumptions about how/where raw content is stored on S3.
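To run it (a sketch; the script filename here is made up, but the environment variables and packages are the ones the script actually uses): install the npm dependencies (`iconv`, `parallel-transform`, `pump`, `request`, `aws-sdk`), then invoke something like `AWS_S3_KEY=... AWS_S3_SECRET=... WEB_MONITORING_DB_EMAIL=... WEB_MONITORING_DB_PASSWORD=... node fix-versionista-content.js`. `START_CHUNK` and `LIMIT_VERSIONS` optionally control where the run starts and how many versions it processes; set `DEBUG` to log each API request.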
'use strict';
const crypto = require('crypto');
const Iconv = require('iconv').Iconv;
const parallel = require('parallel-transform');
const pump = require('pump');
const request = require('request');
const stream = require('stream');
const S3 = require('aws-sdk/clients/s3');

const DEBUG = process.env['DEBUG'];
const startChunk = Number(process.env['START_CHUNK']) || 1;
const maximumVersionCount = Number(process.env['LIMIT_VERSIONS']) || 100000;

const s3 = new S3({
  accessKeyId: process.env['AWS_S3_KEY'],
  secretAccessKey: process.env['AWS_S3_SECRET']
});

const dbCredentials = {
  user: process.env['WEB_MONITORING_DB_EMAIL'],
  pass: process.env['WEB_MONITORING_DB_PASSWORD']
};
const dbUrl = 'https://api.monitoring.envirodatagov.org/';

// --------------------- Main Program --------------------------------
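// The whole job is one object-mode pipeline: stream versions from the API,
// fetch each version's raw content from S3, strip any captured Versionista
// markup, recompute and check hashes, re-upload fixed bodies, then batch up
// hash corrections and import them back into the DB.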
let totalBadHashes = 0;
let totalBadBodies = 0;

pump(
  streamQuery('api/v0/versions', {
    chunk_size: 1000,
    source_type: 'versionista',
    sort: 'created_at:asc',
    chunk: startChunk
  }, maximumVersionCount),

  // Load the corresponding raw content from S3.
  parallel(10, (version, callback) => {
    if (!version.uri) {
      return callback();
    }
    const parts = version.uri.match(AWS_URL_EXPRESSION);
    if (!parts) {
      return callback(new Error(`Unparseable URL: "${version.uri}"`));
    }
    s3.getObject({
      Bucket: parts[1],
      Key: parts[2]
    }, (error, response) => {
      if (error) {
        console.error(`Error getting ${version.uri}`);
        return callback(error);
      }
      if (!Buffer.isBuffer(response.Body)) {
        return callback(new Error(`Got a non-buffer body! ${typeof response.Body} / ${response.Body.constructor.name}`));
      }
      callback(null, {
        version,
        s3: {
          Bucket: parts[1],
          ACL: 'public-read',
          ContentType: response.ContentType,
          Key: parts[2],
          Body: response.Body
        }
      });
    });
  }),
  // Check the body for captured Versionista markup and strip it.
  mapStream(data => {
    const {version, s3} = data;
    const result = transformEncodedText(
      getEncoding(s3),
      s3.Body,
      text => text.replace(versionistaSourceAdditionsPattern, '')
    );
    if (!result.equals(s3.Body)) {
      data.newText = s3.Body = result;
      totalBadBodies += 1;
    }
    return data;
  }),

  // Validate/recalculate hash
  mapStream(data => {
    const {version, s3} = data;
    const hash = crypto.createHash('sha256').update(s3.Body).digest('hex');
    if (hash !== version.version_hash) {
      console.log(`Wrong hash: ${version.uri} - ${version.version_hash} vs. ${hash}`);
      data.newHash = hash;
      totalBadHashes += 1;
    }
    return data;
  }),

  // Re-upload cleaned-up raw content.
  parallel(10, (data, callback) => {
    if (data.newText) {
      console.error('REUPLOADING to S3');
      s3.upload(data.s3, (error, result) => callback(error, data));
    }
    else {
      callback(null, data);
    }
  }),

  // Limit to items that need to be modified in the DB (hash changes).
  filterStream(data => data.newHash),

  // Get URL for page so we can send bulk version updates :(
  parallel(10, (data, callback) => {
    getPageUrl(data.version.page_uuid, (error, url) => {
      data.version.page_url = url;
      callback(error, data);
    });
  }),

  // Format for bulk re-importing.
  mapStream(data => ({
    page_url: data.version.page_url,
    source_type: data.version.source_type,
    capture_time: data.version.capture_time,
    version_hash: data.newHash,
    // created_at will get disregarded by the importer, but keep it for logging
    created_at: data.version.created_at
  })),

  batchedStream(250),
  parallel(1, (newVersions, callback) => {
    if (!newVersions.length) {
      return callback();
    }
    importIntoDb(newVersions)
      .then(importData => {
        console.error(`Import ${importData.id} complete`);
        const lastVersion = newVersions[newVersions.length - 1];
        console.error(`  Up to created_at: ${lastVersion.created_at}`);
        callback();
      })
      .catch(callback);
  }),

  error => {
    console.error(`${totalBadBodies} fixed content`);
    console.error(`${totalBadHashes} fixed hashes`);
    if (error) {
      console.error('ERROR:', error);
      return;
    }
    console.error('All complete!');
  }
);
// --------------------- Helpers --------------------------------

const AWS_URL_EXPRESSION = /^https:\/\/([^.]+)\.s3\.amazonaws\.com\/(.+)$/i;
// Matches a <meta> tag that declares a character encoding, e.g.:
//   <meta charset="utf-8">
const META_TAG_EXPRESSION = /<meta[^>]+charset\s*=\s*['"]?([^>]*?)[ \/;'">]/i;
// Matches an XML prolog that specifies character encoding:
//   <?xml version="1.0" encoding="ISO-8859-1"?>
const XML_PROLOG_EXPRESSION = /<\?xml\s[^>]*encoding=['"]([^'"]+)['"].*\?>/i;
const versionistaSourceAdditionsPattern =
  /\n?<!--\s*Versionista general\s*-->[^]*?<!--\s*End Versionista general\s*-->\n?/i;
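// Example of the block the above pattern strips from captured pages:
//   <!-- Versionista general -->
//   ...Versionista's injected styles and scripts...
//   <!-- End Versionista general -->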
// Sniff the character encoding: Content-Type header first, then a <meta>
// tag, then an XML prolog, falling back to UTF-8.
function getEncoding (response) {
  if (response.ContentType.includes('charset=')) {
    return response.ContentType.split('charset=')[1];
  }
  const utfBody = response.Body.toString('utf8').slice(0, 2048);
  const metaMatch = utfBody.match(META_TAG_EXPRESSION);
  if (metaMatch) {
    return metaMatch[1];
  }
  const prologMatch = utfBody.match(XML_PROLOG_EXPRESSION);
  if (prologMatch) {
    return prologMatch[1];
  }
  return 'utf8';
}

const UTF8_ENCODING_EXPRESSION = /^utf\-?8$/i;
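// Decode `rawText` from `encoding` to a UTF-8 string, run `transform` over
// it, and re-encode. If the transform changes nothing, the original buffer
// is returned untouched, so unchanged content stays byte-identical (and
// keeps its original hash).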
function transformEncodedText (encoding, rawText, transform) {
  const isUtf8 = UTF8_ENCODING_EXPRESSION.test(encoding);
  let text = '';
  if (isUtf8) {
    text = rawText.toString('utf8');
  }
  else {
    text = (new Iconv(encoding, 'UTF-8')).convert(rawText).toString('utf8');
  }

  const newText = transform(text);
  let encoded = rawText;
  if (newText !== text) {
    if (isUtf8) {
      encoded = Buffer.from(newText);
    }
    else {
      encoded = (new Iconv('UTF-8', encoding)).convert(newText);
    }
  }
  return encoded;
}

// ------------------ API Tools ----------------------------------------
function apiRequest (apiPath, options = {}) {
  return new Promise((resolve, reject) => {
    const url = /^http(s?):\/\//.test(apiPath) ? apiPath : `${dbUrl}${apiPath}`;
    const requestOptions = Object.assign({}, options, {
      url,
      auth: dbCredentials,
      callback (error, response) {
        if (error) {
          return reject(error);
        }
        let body;
        try {
          body = JSON.parse(response.body);
        }
        catch (error) {
          return reject(new Error(
            `Could not parse response for ${url}\n\n${response.body}`));
        }
        if (response.statusCode >= 400 || body.error || body.errors) {
          if (body.error) {
            reject(new Error(body.error.title || body.error.message || body.error));
          }
          else if (body.errors) {
            reject(new Error(body.errors[0].title));
          }
          else {
            reject(new Error(`Unknown Error: ${response.body}`));
          }
          return;
        }
        resolve(body);
      }
    });

    if (DEBUG) {
      const query = requestOptions.qs ? ` ? ${JSON.stringify(requestOptions.qs)}` : '';
      console.error(`API: ${requestOptions.method || 'GET'} ${url}${query}`);
    }
    request(requestOptions);
  });
}
const pageUrls = new Map();
function getPageUrl (pageUuid, callback) {
  const cached = pageUrls.get(pageUuid);
  if (cached) {
    return callback(null, cached);
  }
  apiRequest(`api/v0/pages/${pageUuid}`)
    .then(body => {
      pageUrls.set(pageUuid, body.data.url);
      callback(null, body.data.url);
    })
    .catch(callback);
}

// Stream a DB Query
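// Follows the API's `links.next` pagination until the requested limit is
// reached or there are no more chunks; a trailing `null` is pushed through
// the final batch so the resulting readable stream ends cleanly.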
function streamQuery (apiPath, qs = {}, limit = 0) {
  let remaining = limit > 0 ? limit : Infinity;
  let currentUrl = apiPath;
  let requestOptions = {qs};

  function read () {
    const query = requestOptions.qs ? ` ? ${JSON.stringify(requestOptions.qs)}` : '';
    console.error(`Streaming chunk: ${currentUrl}${query}`);
    return apiRequest(currentUrl, requestOptions)
      .then(body => {
        // Links in the response have the correct query options pre-embedded;
        // clear them out for subsequent requests.
        requestOptions = {};
        remaining -= body.data.length;
        currentUrl = body.links.next;
        if (remaining <= 0 || !currentUrl) {
          body.data = body.data.slice(0, body.data.length + remaining);
          body.data.push(null);
        }
        return body.data;
      });
  }

  return readableBatchedSourceStream(
    read,
    {highWaterMark: Math.min(qs.chunk_size || 100, 1000)}
  );
}

function importIntoDb (versions) {
  return apiRequest(`api/v0/imports`, {
    method: 'POST',
    headers: {'Content-Type': 'application/x-json-stream'},
    body: versions.map(v => JSON.stringify(v)).join('\n'),
    qs: {update: 'merge'}
  })
    .then(body => {
      const importId = body.data.id;
      const waitForComplete = () =>
        apiRequest(`api/v0/imports/${importId}`)
          .then(body => (body.data.status === 'complete' ? body : false));
      return pollEvery(1000, waitForComplete)
        .then(body => {
          body.data.processed_versions = versions.length;
          if (body.data.processing_errors.length > 0) {
            throw new Error(`Processing errors: ${body.data.processing_errors.join(', ')}`);
          }
          return body.data;
        });
    });
}

// ------------------ Rob's Random Promise Helpers ---------------------

function delayPromise (delay) {
  return new Promise(resolve => setTimeout(resolve, delay));
}

function pollEvery (interval, pollingFunction) {
  function poll () {
    return delayPromise(interval)
      .then(pollingFunction)
      .then(result => {
        if (result) {
          return result;
        }
        return poll();
      });
  }
  return poll();
}
// ------------------ Rob's Random Stream Helpers ----------------------

function mapStream (mapper) {
  return new stream.Transform({
    objectMode: true,
    transform (data, encoding, callback) {
      try {
        const result = mapper(data);
        if (result != null) {
          this.push(result);
        }
        callback();
      }
      catch (error) {
        return callback(error);
      }
    }
  });
}

function filterStream (predicate) {
  return new stream.Transform({
    objectMode: true,
    transform (data, encoding, callback) {
      try {
        if (predicate(data)) {
          this.push(data);
        }
        callback();
      }
      catch (error) {
        return callback(error);
      }
    }
  });
}

function batchedStream (chunkSize = Infinity) {
  if (chunkSize < 1 || typeof chunkSize !== 'number') {
    chunkSize = 1;
  }
  return new stream.Transform({
    objectMode: true,
    transform (data, encoding, callback) {
      if (!this._chunkBuffer) {
        this._chunkBuffer = [];
      }
      this._chunkBuffer.push(data);
      if (this._chunkBuffer.length === chunkSize) {
        this.push(this._chunkBuffer);
        this._chunkBuffer = [];
      }
      callback();
    },
    flush (callback) {
      if (this._chunkBuffer && this._chunkBuffer.length) {
        this.push(this._chunkBuffer);
      }
      this._chunkBuffer = null;
      callback();
    }
  });
}

function limitStream (limit = Infinity) {
  if (limit < 1 || typeof limit !== 'number') {
    limit = 1;
  }
  let remaining = limit;
  return new stream.Transform({
    objectMode: true,
    transform (data, encoding, callback) {
      if (remaining > 0) {
        this.push(data);
        remaining--;
      }
      callback();
    }
  });
}
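// A readable stream fed by an async batch loader. The `loading` flag keeps
// only one `readBatch` call in flight at a time, no matter how often the
// stream asks for more data.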
function readableBatchedSourceStream (readBatch, options = {}) {
  options = Object.assign({
    endOnError: true,
    highWaterMark: 16,
    objectMode: true
  }, options);

  let loading = false;
  return new stream.Readable({
    highWaterMark: options.highWaterMark,
    objectMode: options.objectMode,
    read (size) {
      if (loading) return;
      loading = true;
      readBatch(size)
        .then(batch => {
          for (let item of batch) {
            this.push(item);
          }
          loading = false;
        })
        .catch(error => {
          this.emit('error', error);
          if (options.endOnError) {
            this.push(null);
          }
        });
    }
  });
}