// redshift-bench (gist by @avinoamr, April 6, 2016)
// Benchmarks Amazon Redshift COPY performance across data formats, compression,
// file counts, table shapes and cluster types.
const AWS_KEY = '';
const AWS_SECRET = '';
const S3_BUCKET = '';
const DATA_JSON_URL = 'https://data.cityofnewyork.us/api/views/jb7j-dtam/rows.json?accessType=DOWNLOAD';
const REDSHIFT_CLUSTERS = {
    '2x.dc1.large': {}, // each entry: { host, user, password, database, port }
    '2x.dc1.8xlarge': {},
    '2x.ds2.xlarge': {},
    '2x.ds2.8xlarge': {},
};
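// For illustration, a filled-in cluster entry might look like the sketch below.
// The host, user, password and database values are placeholders, not real
// endpoints; each entry is passed as-is to pg.connectAsync() as its connection
// config.
//
// '2x.dc1.large': {
//     host: 'bench.abc123xyz.us-east-1.redshift.amazonaws.com', // hypothetical endpoint
//     user: 'master',
//     password: '<password>',
//     database: 'bench',
//     port: 5439 // Redshift's default port
// },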
const VARIATIONS = {
    format: [ 'avro', 'csv', 'json', 'fixedwidth' ],
    compression: [ true, false ],
    files: [ 1, 10 ],
    cols: [ 5, 20 ],
    rows: [ 1000000, 10000000 ],
    cluster: [ '2x.dc1.large', '2x.dc1.8xlarge', '2x.ds2.xlarge', '2x.ds2.8xlarge' ],
};
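// combine() below expands VARIATIONS into its full cartesian product:
// 4 formats x 2 compression modes x 2 file counts x 2 column counts x
// 2 row counts x 4 clusters = 256 test permutations, run sequentially.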
// prerequisites: build-essential & git (required to npm install node-avro-io from GitHub)
var pg = require( 'pg' );
var fs = require( 'fs' );
var path = require( 'path' );
var util = require( 'util' );
var zlib = require( 'zlib' );
var aws = require( 'aws-sdk' );
var bytes = require( 'bytes' );
var Table = require('cli-table');
var extend = require( 'extend' );
var stream = require( 'stream' );
var request = require( 'request' );
var Promise = require( 'bluebird' );
var avro = require( 'node-avro-io' ); // npm install jamesbrucepower/node-avro-io
var debug = require( 'debug' )( 'redshift-bench' );
var s3 = new aws.S3({
    accessKeyId: AWS_KEY,
    secretAccessKey: AWS_SECRET,
    signatureVersion: 'v4',
});
s3 = Promise.promisifyAll( s3 );
pg = Promise.promisifyAll( pg );
request = Promise.promisifyAll( request );
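// Note: Bluebird's promisifyAll() attaches a promise-returning copy of each
// callback-style method under an "Async" suffix, which is what the calls to
// s3.uploadAsync, pg.connectAsync and request.getAsync below rely on.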
// Runs a single test permutation end to end: generate sample data, connect to
// the target cluster, recreate the staging table, then serialize, compress,
// upload and COPY the data, recording the elapsed time of each stage.
function runTest ( test ) {
    test.serialize_time = '';
    test.compress_time = '';
    test.upload_time = '';
    test.copy_time = '';
    test.datasize = 0;
    return Promise.bind({
        test: test
    })
    .then( function () {
        return createData( test );
    })
    .then( function ( data ) {
        this.data = data;
        this.columns = Object.keys( data[ 0 ] ).map( function ( columnName, i ) {
            // use a String object so the column name can carry a .size property
            columnName = new String( columnName );
            columnName.size = data.columnSize[ i ] + 1;
            return columnName;
        });
        return pg.connectAsync( REDSHIFT_CLUSTERS[ test.cluster ] );
    })
    .then( function ( client ) {
        debug( 'Connected to %s', test.cluster );
        this.done = client.end.bind( client );
        this.query = function ( sql ) {
            debug( 'SQL %s', sql );
            return client.queryAsync.apply( client, arguments );
        }
    })
    .then( function () {
        return this.query( 'DROP TABLE IF EXISTS bench' );
    })
    .then( function () {
        return this.query([
            'CREATE TABLE bench (',
            this.columns.map( function ( name ) {
                return '"' + name + '" varchar';
            }).join( ',' ),
            ')'
        ].join( ' ' ) );
    })
    .then( serialize )
    .then( compress )
    .each( function ( file ) {
        test.datasize += fs.lstatSync( file ).size;
    })
    .tap( function () {
        test.datasize = bytes( test.datasize );
    })
    .then( upload )
    .then( copy )
    .then( function () {
        console.log( '' );
        debug( 'Test done', test );
        this.done();
    })
}
function serialize () {
    var test = this.test;
    var columns = this.columns;
    var data = this.data;
    var rowsPerFile = Math.floor( test.rows / test.files );
    var t = timer();
    return Promise.resolve( createArray( test.files ) )
    .tap( function () {
        t.reset();
    })
    .map( function ( i ) {
        var name = [ 'data', i, test.format, 'raw' ].join( '.' );
        var filepath = '/tmp/' + name;
        debug( 'Serializing %s rows to %s', rowsPerFile, filepath );
        return new Promise( function ( resolve, reject ) {
            new DataStream( rowsPerFile, columns, data )
                .once( 'error', reject )
                .pipe( DataStream.toFormat( test.format, columns, test ) )
                .once( 'error', reject )
                .pipe( fs.createWriteStream( filepath ) )
                .once( 'error', reject )
                .once( 'finish', function () {
                    process.nextTick( resolve.bind( null, filepath ) );
                })
        })
    })
    .tap( function () {
        test.serialize_time = t.elapsed();
    })
}
function compress ( files ) {
    var test = this.test;
    if ( test.compression !== true ) {
        test.compress_time = 0;
        return files;
    }
    var t = timer();
    return Promise.resolve( files )
    .tap( function () {
        t.reset();
    })
    .map( function ( source ) {
        var target = source.split( '.' )
            .slice( 0, -1 )
            .concat( 'gz' )
            .join( '.' );
        debug( 'Compressing %s to %s', source, target );
        return new Promise( function ( resolve, reject ) {
            fs.createReadStream( source )
                .once( 'error', reject )
                .pipe( zlib.createGzip() )
                .once( 'error', reject )
                .pipe( fs.createWriteStream( target ) )
                .once( 'error', reject )
                .once( 'finish', function () {
                    process.nextTick( resolve.bind( null, target ) );
                })
        })
    })
    .tap( function () {
        test.compress_time = t.elapsed();
    })
}
function upload ( files ) {
    var test = this.test;
    var t = timer();
    return Promise.resolve( files )
    .tap( function () {
        t.reset();
    })
    .map( function ( file ) {
        var key = path.basename( file );
        debug( 'Uploading %s to %s', file, key );
        return s3.uploadAsync({
            Bucket: S3_BUCKET,
            Key: key,
            Body: fs.createReadStream( file )
        })
        .return( key );
    })
    .then( function ( keys ) {
        debug( 'Upload done.' );
        if ( keys.length === 1 ) {
            return keys[ 0 ]; // don't create a manifest for a single file
        }
        var key = 'data.manifest';
        var entries = keys.map( function ( key ) {
            return {
                url: 's3://' + S3_BUCKET + '/' + key,
                mandatory: true
            }
        });
        debug( 'Uploading manifest %s', key );
        return s3.uploadAsync({
            Bucket: S3_BUCKET,
            Key: key,
            Body: JSON.stringify({
                entries: entries
            })
        })
        .return( key );
    })
    .tap( function () {
        test.upload_time = t.elapsed();
    })
}
function copy ( key ) {
    debug( 'Copying %s', key );
    var test = this.test;
    var creds = [
        'aws_access_key_id=' + s3.config.accessKeyId,
        'aws_secret_access_key=' + s3.config.secretAccessKey
    ].join( ';' );
    var isManifest = ( path.extname( key ) === '.manifest' );
    var format = test.format.toUpperCase();
    if ( format === 'FIXEDWIDTH' ) {
        // FIXEDWIDTH requires an explicit 'name:width' spec per column
        format += ' \'' + this.columns.map( function ( col ) {
            return col + ':' + col.size;
        }).join( ',' ) + '\'';
    } else if ( format !== 'CSV' ) {
        format += ' AS \'auto\''; // JSON and AVRO use automatic field mapping
    }
    var t = timer();
    return Promise.bind( this )
    .tap( function () {
        t.reset();
    })
    .then( function () {
        return this.query([
            'COPY bench FROM', '\'s3://' + S3_BUCKET + '/' + key + '\'',
            'WITH CREDENTIALS', "'" + creds + "'",
            'FORMAT AS', format,
            isManifest ? 'MANIFEST' : '',
            test.compression === true ? 'GZIP' : '',
            'STATUPDATE OFF', // disable automatic statistics computation
            'COMPUPDATE OFF', // disable automatic compression analysis
        ].join( ' ' ) );
    })
    .tap( function () {
        test.copy_time = t.elapsed();
    })
}
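// For reference, a generated statement for a compressed multi-file JSON test
// would look roughly like this (bucket name and key are illustrative):
//
//   COPY bench FROM 's3://my-bucket/data.manifest'
//   WITH CREDENTIALS 'aws_access_key_id=...;aws_secret_access_key=...'
//   FORMAT AS JSON AS 'auto' MANIFEST GZIP STATUPDATE OFF COMPUPDATE OFF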
// Object-mode Readable stream that emits `rows` rows, cycling through the
// downloaded sample data as many times as needed.
util.inherits( DataStream, stream.Readable );
function DataStream ( rows, columns, data ) {
    stream.Readable.call( this, { objectMode: true });
    this.rows = rows;
    this.columns = columns;
    this.data = data;
    this.count = 0;
}
DataStream.prototype._read = function () {
    if ( this.count >= this.rows ) {
        this.push( null ); // end of stream
        return;
    }
    if ( this.count % 10000 === 0 ) {
        debug( 'DataStream emitted %s / %s', this.count, this.rows );
    }
    var row = this.data[ this.count % this.data.length ];
    this.count += 1;
    this.push( row );
}
DataStream.toFormat = function ( format, columns, test ) {
    return ({
        csv: toCSV,
        json: toJSON,
        avro: toAVRO,
        fixedwidth: toFixedWidth
    })[ format ]( columns, test );
}
function toFixedWidth ( columns ) {
    var count = 0;
    var transform = new stream.Transform({
        writableObjectMode: true
    })
    transform._transform = function ( row, encoding, done ) {
        row = columns.map( function ( column ) {
            var v = row[ column ];
            if ( typeof v === 'string' ) {
                // strip characters that would otherwise require escaping
                v = v.replace( /[\n\'\"\t\r\{\}\,]/g, '' ) || ' ';
            }
            v = v.toString();
            while ( v.length < column.size ) {
                v += ' '; // right-pad each value to its fixed column width
            }
            return v;
        }).join( '' );
        done( null, count++ === 0 ? row : '\n' + row );
    }
    return transform;
}
function toCSV ( columns ) {
    var count = 0;
    var transform = new stream.Transform({
        writableObjectMode: true
    })
    transform._transform = function ( row, encoding, done ) {
        row = columns.map( function ( column ) {
            var v = row[ column ];
            if ( typeof v === 'string' ) {
                v = v.replace( /[\n\'\"\t\r\{\}\,]/g, '' ) || ' ';
            }
            return v;
        }).join( ',' );
        done( null, count++ === 0 ? row : '\n' + row );
    }
    return transform;
}
function toJSON () {
    var count = 0;
    var transform = new stream.Transform({
        writableObjectMode: true
    })
    transform._transform = function ( row, encoding, done ) {
        row = JSON.stringify( row );
        done( null, count++ === 0 ? row : '\n' + row );
    }
    return transform;
}
function toAVRO ( columns, test ) {
    var compression = 'null';
    if ( test.compression ) {
        // ugly little trick: compression is handled by AVRO itself instead of
        // by gzip. Setting compression to the string 'true' causes the rest of
        // the code not to treat the file as gzip-compressed, while keeping the
        // value visible in the final output table.
        test.compression = 'true';
        compression = 'deflate';
    }
    var writer = new avro.DataFile.Writer( {
        name: 'data',
        type: 'record',
        fields: columns.map( function ( column ) {
            return { name: column.toString(), type: 'string' };
        })
    }, compression );
    writer.syncInterval = 16 * 10000000;
    // monkey-patch the writer to apply backpressure: signal a pause after each
    // block is written, and resume (emit 'drain') shortly after a block reset
    var pause = false;
    var writeBlock = writer._writeBlock;
    writer._writeBlock = function () {
        var rv = writeBlock.apply( this, arguments );
        pause = true;
        return rv;
    }
    var resetBlocks = writer._resetBlocks;
    writer._resetBlocks = function () {
        var rv = resetBlocks.apply( this, arguments );
        setTimeout( function () {
            pause = false;
            this.emit( 'drain' );
        }.bind( this ), 1 );
        return rv;
    }
    var write = writer.write;
    writer.write = function () {
        return write.apply( this, arguments ) && !pause;
    }
    return writer;
}
// Recursively builds the cartesian product of all variation values, returning
// one flat test object per permutation.
function combine ( variations, keys, idx ) {
    idx = idx || 0;
    keys = keys || Object.keys( variations );
    var name = keys[ idx ];
    var values = variations[ name ];
    var items = [];
    values.forEach( function ( value ) {
        if ( idx === keys.length - 1 ) {
            var item = {};
            item[ name ] = value;
            items.push( item );
        } else {
            combine( variations, keys, idx + 1 ).forEach( function ( item ) {
                item[ name ] = value;
                items.push( item );
            })
        }
    })
    return items;
}
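// A minimal worked example:
//   combine({ a: [ 1, 2 ], b: [ 'x' ] })
//   // => [ { a: 1, b: 'x' }, { a: 2, b: 'x' } ]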
function timer () {
    var t;
    return {
        reset: function () {
            t = new Date().getTime();
        },
        elapsed: function () {
            // seconds elapsed since the last reset()
            return ( new Date().getTime() - t ) / 1000;
        }
    }
}
// generate the sample data by downloading a public NYC Open Data dataset and
// reshaping it into `test.cols` string columns
function createData ( test ) {
    return request.getAsync( DATA_JSON_URL )
    .then( function ( res ) {
        var body = JSON.parse( res.body );
        var cols = body.meta.view.columns;
        // build `test.cols` column names, cycling over the source columns and
        // suffixing a counter so repeated names stay unique
        cols = createArray( test.cols ).map( function ( c, i ) {
            var name = cols[ i % cols.length ].fieldName.replace( ':', '' );
            return name + Math.floor( i / cols.length );
        });
        var sizes = createArray( test.cols ).map( function () {
            return 0;
        });
        var data = body.data.map( function ( row ) {
            return cols.reduce( function ( obj, col, i ) {
                obj[ col ] = String( row[ i % row.length ] );
                sizes[ i ] = Math.max( sizes[ i ], obj[ col ].length );
                return obj;
            }, {} );
        });
        data.columnSize = sizes; // widest value seen per column
        return data;
    })
}
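// Illustrative shape of the result for a 2-column test (the actual field names
// depend on the downloaded dataset):
//   [ { school_name0: 'P.S. 001', borough0: 'K' }, ... ]
//   data.columnSize // => e.g. [ 34, 1 ]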
function createArray ( n ) {
    var arr = [];
    for ( var i = 0 ; i < n ; i += 1 ) {
        arr.push( i );
    }
    return arr;
}
// start the tests
var tests = combine( VARIATIONS );
Promise.resolve( tests )
.each( function ( test, i, total ) {
    test.test = ( i + 1 ) + '/' + total;
    debug( 'Starting', test );
    return runTest( test );
})
.then( function ( tests ) {
    // collect the union of all result keys, with 'test' as the first column
    var columns = Object.keys( tests[ 0 ] ).reduce( function ( cols, key ) {
        if ( cols.indexOf( key ) === -1 ) {
            cols.push( key );
        }
        return cols;
    }, [ 'test' ] );
    var table = new Table({ head: columns });
    tests.forEach( function ( test ) {
        var row = columns.map( function ( col ) {
            return test[ col ];
        });
        table.push( row );
    })
    console.log( table.toString() );
    console.log( 'All tests done.' );
})