@Mr0grog
Created March 21, 2018 20:46
Quick-n-dirty script to roll through all the versions stored in `web-monitoring-db`, load their raw content, remove any unintentionally captured Versionista styling data, and validate their hashes. This makes some assumptions about how/where raw content is stored on S3.
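// A hypothetical invocation (the environment variables are the ones the
// script reads below; the filename is assumed, since the gist doesn't name
// the file):
//
//   AWS_S3_KEY=... AWS_S3_SECRET=... \
//   WEB_MONITORING_DB_EMAIL=... WEB_MONITORING_DB_PASSWORD=... \
//   START_CHUNK=1 LIMIT_VERSIONS=100000 DEBUG=1 \
//   node clean-versionista-content.js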
'use strict';
const crypto = require('crypto');
const Iconv = require('iconv').Iconv;
const parallel = require('parallel-transform');
const pump = require('pump');
const request = require('request');
const stream = require('stream');
const S3 = require('aws-sdk/clients/s3');
const DEBUG = process.env['DEBUG'];
const startChunk = Number(process.env['START_CHUNK']) || 1;
const maximumVersionCount = Number(process.env['LIMIT_VERSIONS']) || 100000;
const s3 = new S3({
  accessKeyId: process.env['AWS_S3_KEY'],
  secretAccessKey: process.env['AWS_S3_SECRET']
});
const dbCredentials = {
  user: process.env['WEB_MONITORING_DB_EMAIL'],
  pass: process.env['WEB_MONITORING_DB_PASSWORD']
};
const dbUrl = 'https://api.monitoring.envirodatagov.org/';
// --------------------- Main Program --------------------------------
let totalBadHashes = 0;
let totalBadBodies = 0;
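// `pump` pipes the stages below together and tears the whole pipeline down
// if any single stage errors, calling the final callback either way.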
pump(
  streamQuery('api/v0/versions', {
    chunk_size: 1000,
    source_type: 'versionista',
    sort: 'created_at:asc',
    chunk: startChunk
  }, maximumVersionCount),
  // Load the corresponding raw content from S3.
  parallel(10, (version, callback) => {
    if (!version.uri) {
      return callback();
    }
    const parts = version.uri.match(AWS_URL_EXPRESSION);
    if (!parts) {
      return callback(new Error(`Unparseable URL: "${version.uri}"`));
    }
    s3.getObject({
      Bucket: parts[1],
      Key: parts[2]
    }, (error, response) => {
      if (error) {
        console.error(`Error getting ${version.uri}`);
        return callback(error);
      }
      if (!Buffer.isBuffer(response.Body)) {
        callback(new Error(`Got a non-buffer body! ${typeof response.Body} / ${response.Body.constructor.name}`));
        return;
      }
      callback(null, {
        version,
        s3: {
          Bucket: parts[1],
          ACL: 'public-read',
          ContentType: response.ContentType,
          Key: parts[2],
          Body: response.Body
        }
      });
    });
  }),
  // Check the body for captured Versionista content and strip it.
  mapStream(data => {
    const {version, s3} = data;
    const result = transformEncodedText(
      getEncoding(s3),
      s3.Body,
      text => text.replace(versionistaSourceAdditionsPattern, '')
    );
    if (!result.equals(s3.Body)) {
      data.newText = s3.Body = result;
      totalBadBodies += 1;
    }
    return data;
  }),
  // Validate/recalculate hash.
  mapStream(data => {
    const {version, s3} = data;
    const hash = crypto.createHash('sha256').update(s3.Body).digest('hex');
    if (hash !== version.version_hash) {
      console.log(`Wrong hash: ${version.uri} - ${version.version_hash} vs. ${hash}`);
      data.newHash = hash;
      totalBadHashes += 1;
    }
    return data;
  }),
  // Re-upload cleaned-up raw content.
  parallel(10, (data, callback) => {
    if (data.newText) {
      console.error('REUPLOADING to S3');
      s3.upload(data.s3, (error, result) => callback(error, data));
    }
    else {
      callback(null, data);
    }
  }),
  // Limit to items that need to be modified in the DB (hash changes).
  filterStream(data => data.newHash),
  // Get the URL for each version's page so we can send bulk version updates :(
  parallel(10, (data, callback) => {
    getPageUrl(data.version.page_uuid, (error, url) => {
      data.version.page_url = url;
      callback(error, data);
    });
  }),
  // Format for bulk re-importing.
  mapStream(data => ({
    page_url: data.version.page_url,
    source_type: data.version.source_type,
    capture_time: data.version.capture_time,
    version_hash: data.newHash,
    // created_at will get disregarded by the importer, but keep it for logging
    created_at: data.version.created_at
  })),
  batchedStream(250),
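  // Send each batch to the DB's import API (one batch at a time) and wait
  // for the import to finish processing before moving on.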
  parallel(1, (newVersions, callback) => {
    if (!newVersions.length) {
      return callback();
    }
    importIntoDb(newVersions)
      .then(importData => {
        console.error(`Import ${importData.id} complete`);
        const lastVersion = newVersions[newVersions.length - 1];
        console.error(`  Up to created_at: ${lastVersion.created_at}`);
        callback();
      })
      .catch(callback);
  }),
  error => {
    console.error(`${totalBadBodies} fixed content`);
    console.error(`${totalBadHashes} fixed hashes`);
    if (error) {
      console.error('ERROR:', error);
      return;
    }
    console.error('All complete!');
  }
);
// --------------------- Helpers --------------------------------
const AWS_URL_EXPRESSION = /^https:\/\/([^.]+)\.s3\.amazonaws\.com\/(.+)$/i;
const META_TAG_EXPRESSION = /<meta[^>]+charset\s*=\s*['"]?([^>]*?)[ \/;'">]/i;
// Matches an XML prolog that specifies character encoding:
// <?xml version="1.0" encoding="ISO-8859-1"?>
const XML_PROLOG_EXPRESSION = /<\?xml\s[^>]*encoding=['"]([^'"]+)['"].*\?>/i;
const versionistaSourceAdditionsPattern =
  /\n?<!--\s*Versionista general\s*-->[^]*?<!--\s*End Versionista general\s*-->\n?/i;
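// Determine the text encoding of an S3 response. Checks, in order: a
// `charset` in the Content-Type header, a `<meta>` charset tag in the first
// 2 KB of the body, and an XML prolog `encoding` attribute; falls back to
// UTF-8 if none of those are present.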
function getEncoding (response) {
  if (response.ContentType.includes('charset=')) {
    return response.ContentType.split('charset=')[1];
  }
  const utfBody = response.Body.toString('utf8').slice(0, 2048);
  const metaMatch = utfBody.match(META_TAG_EXPRESSION);
  if (metaMatch) {
    return metaMatch[1];
  }
  const prologMatch = utfBody.match(XML_PROLOG_EXPRESSION);
  if (prologMatch) {
    return prologMatch[1];
  }
  return 'utf8';
}
const UTF8_ENCODING_EXPRESSION = /^utf-?8$/i;
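// Decode `rawText` (a Buffer) from `encoding` into a string (via iconv when
// the encoding isn't UTF-8), apply `transform`, and re-encode. Returns the
// original buffer unchanged if the transform made no difference.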
function transformEncodedText (encoding, rawText, transform) {
  const isUtf8 = UTF8_ENCODING_EXPRESSION.test(encoding);
  let text = '';
  if (isUtf8) {
    text = rawText.toString('utf8');
  }
  else {
    text = (new Iconv(encoding, 'UTF-8')).convert(rawText).toString('utf8');
  }
  const newText = transform(text);
  let encoded = rawText;
  if (newText !== text) {
    if (isUtf8) {
      encoded = Buffer.from(newText);
    }
    else {
      encoded = (new Iconv('UTF-8', encoding)).convert(newText);
    }
  }
  return encoded;
}
// ------------------ API Tools ----------------------------------------
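// Make an authenticated request to the web-monitoring-db API. Resolves with
// the parsed JSON response body; rejects on transport errors, unparseable
// responses, HTTP status >= 400, or API-level `error`/`errors` payloads.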
function apiRequest (apiPath, options = {}) {
  return new Promise((resolve, reject) => {
    let url = /^http(s?):\/\//.test(apiPath) ? apiPath : `${dbUrl}${apiPath}`;
    const requestOptions = Object.assign({}, options, {
      url,
      auth: dbCredentials,
      callback (error, response) {
        if (error) {
          return reject(error);
        }
        let body;
        try {
          body = JSON.parse(response.body);
        }
        catch (error) {
          return reject(new Error(
            `Could not parse response for ${url}\n\n${response.body}`));
        }
        if (response.statusCode >= 400 || body.error || body.errors) {
          if (body.error) {
            reject(new Error(body.error.title || body.error.message || body.error));
          }
          else if (body.errors) {
            reject(new Error(body.errors[0].title));
          }
          else {
            reject(new Error(`Unknown Error: ${response.body}`));
          }
          return;
        }
        resolve(body);
      }
    });
    if (DEBUG) {
      const query = requestOptions.qs ? ` ? ${JSON.stringify(requestOptions.qs)}` : '';
      console.error(`API: ${requestOptions.method || 'GET'} ${url}${query}`);
    }
    request(requestOptions);
  });
}
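// Look up the URL for a page by UUID, memoizing results in `pageUrls` (many
// versions belong to the same page, so this saves a lot of API calls).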
let pageUrls = new Map();
function getPageUrl (pageUuid, callback) {
  const cached = pageUrls.get(pageUuid);
  if (cached) {
    return callback(null, cached);
  }
  apiRequest(`api/v0/pages/${pageUuid}`)
    .then(body => {
      pageUrls.set(pageUuid, body.data.url);
      callback(null, body.data.url);
    })
    .catch(callback);
}
// Stream a DB Query
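// Returns an object-mode readable stream that follows the API's `links.next`
// pagination, requesting one chunk at a time. When `limit` is reached or the
// pages run out, a `null` sentinel is appended to end the stream.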
function streamQuery (apiPath, qs = {}, limit = 0) {
  let remaining = limit > 0 ? limit : Infinity;
  let currentUrl = apiPath;
  let requestOptions = {qs};
  function read () {
    const query = requestOptions.qs ? ` ? ${JSON.stringify(requestOptions.qs)}` : '';
    console.error(`Streaming chunk: ${currentUrl}${query}`);
    return apiRequest(currentUrl, requestOptions)
      .then(body => {
        // Links in the response have the correct query options pre-embedded;
        // clear them out for subsequent requests.
        requestOptions = {};
        remaining -= body.data.length;
        currentUrl = body.links.next;
        if (remaining <= 0 || !currentUrl) {
          body.data = body.data.slice(0, body.data.length + remaining);
          body.data.push(null);
        }
        return body.data;
      });
  }
  return readableBatchedSourceStream(
    read,
    {highWaterMark: Math.min(qs.chunk_size || 100, 1000)}
  );
}
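// POST a batch of versions to the imports endpoint as newline-delimited JSON
// (with `update=merge` so existing records get updated rather than
// duplicated), then poll every second until the import reports `complete`.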
function importIntoDb (versions) {
  return apiRequest(`api/v0/imports`, {
    method: 'POST',
    headers: {'Content-Type': 'application/x-json-stream'},
    body: versions.map(v => JSON.stringify(v)).join('\n'),
    qs: {update: 'merge'}
  })
  .then(body => {
    const importId = body.data.id;
    const waitForComplete = () =>
      apiRequest(`api/v0/imports/${importId}`)
        .then(body => (body.data.status === 'complete' ? body : false));
    return pollEvery(1000, waitForComplete)
      .then(body => {
        body.data.processed_versions = versions.length;
        if (body.data.processing_errors.length > 0) {
          throw new Error(`Processing errors: ${body.data.processing_errors.join(', ')}`);
        }
        return body.data;
      });
  });
}
// ------------------ Rob's Random Promise Helpers ---------------------
function delayPromise (delay) {
  return new Promise(resolve => setTimeout(resolve, delay));
}
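// Call `pollingFunction` every `interval` milliseconds until it resolves
// with a truthy value, then resolve with that value.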
function pollEvery (interval, pollingFunction) {
  function poll () {
    return delayPromise(interval)
      .then(pollingFunction)
      .then(result => {
        if (result) {
          return result;
        }
        return poll();
      });
  }
  return poll();
}
// ------------------ Rob's Random Stream Helpers ----------------------
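// An object-mode transform that applies `mapper` to each item, dropping any
// item for which the mapper returns null or undefined.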
function mapStream (mapper) {
  return new stream.Transform({
    objectMode: true,
    transform (data, encoding, callback) {
      try {
        const result = mapper(data);
        if (result != null) {
          this.push(result);
        }
        callback();
      }
      catch (error) {
        return callback(error);
      }
    }
  });
}
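// An object-mode transform that passes through only items matching `predicate`.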
function filterStream (predicate) {
  return new stream.Transform({
    objectMode: true,
    transform (data, encoding, callback) {
      try {
        if (predicate(data)) {
          this.push(data);
        }
        callback();
      }
      catch (error) {
        return callback(error);
      }
    }
  });
}
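// Collect items into arrays of up to `chunkSize`, emitting each full batch
// downstream and flushing any partial batch when the stream ends.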
function batchedStream (chunkSize = Infinity) {
  if (chunkSize < 1 || typeof chunkSize !== 'number') {
    chunkSize = 1;
  }
  return new stream.Transform({
    objectMode: true,
    transform (data, encoding, callback) {
      if (!this._chunkBuffer) {
        this._chunkBuffer = [];
      }
      this._chunkBuffer.push(data);
      if (this._chunkBuffer.length === chunkSize) {
        this.push(this._chunkBuffer);
        this._chunkBuffer = [];
      }
      callback();
    },
    flush (callback) {
      if (this._chunkBuffer && this._chunkBuffer.length) {
        this.push(this._chunkBuffer);
      }
      this._chunkBuffer = null;
      callback();
    }
  });
}
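// Pass through at most `limit` items, silently dropping the rest. (Defined
// for completeness; the pipeline above doesn't currently use it.)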
function limitStream (limit = Infinity) {
  if (limit < 1 || typeof limit !== 'number') {
    limit = 1;
  }
  let remaining = limit;
  return new stream.Transform({
    objectMode: true,
    transform (data, encoding, callback) {
      if (remaining > 0) {
        this.push(data);
        remaining--;
      }
      callback();
    }
  });
}
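// A readable stream driven by an async `readBatch` function. Each call may
// push many items, and a `null` item from the source ends the stream; the
// `loading` flag prevents overlapping reads while a batch is still in flight.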
function readableBatchedSourceStream (readBatch, options = {}) {
  options = Object.assign({
    endOnError: true,
    highWaterMark: 16,
    objectMode: true
  }, options);
  let loading = false;
  return new stream.Readable({
    highWaterMark: options.highWaterMark,
    objectMode: options.objectMode,
    read (size) {
      if (loading) return;
      loading = true;
      readBatch(size)
        .then(batch => {
          for (let item of batch) {
            this.push(item);
          }
          loading = false;
        })
        .catch(error => {
          this.emit('error', error);
          if (options.endOnError) {
            this.push(null);
          }
        });
    }
  });
}