Created
April 6, 2016 14:00
-
-
Save avinoamr/2531e25ee14ab30f8686e9a3ed4e305e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// AWS credentials and target bucket used for uploading the generated
// test data. Fill these in before running the benchmark.
const AWS_KEY = '';
const AWS_SECRET = '';
const S3_BUCKET = '';

// Public NYC Open Data set used as seed data for generating benchmark rows.
const DATA_JSON_URL = 'https://data.cityofnewyork.us/api/views/jb7j-dtam/rows.json?accessType=DOWNLOAD';

// Connection settings (host, user, password, database, port) for each
// Redshift cluster configuration under test, keyed by cluster name.
const REDSHIFT_CLUSTERS = {
    '2x.dc1.large': {}, // FIX: was missing the trailing comma here (SyntaxError)
    '2x.dc1.8xlarge': {},
    '2x.ds2.xlarge': {},
    '2x.ds2.8xlarge': {},
};

// The benchmark matrix: every combination of these values becomes one test
// run (expanded by combine()).
const VARIATIONS = {
    format: [ 'avro', 'csv', 'json', 'fixedwidth' ],
    compression: [ true, false ],
    files: [ 1, 10 ],
    cols: [ 5, 20 ],
    rows: [ 1000000, 10000000 ],
    cluster: [ '2x.dc1.large', '2x.dc1.8xlarge', '2x.ds2.xlarge', '2x.ds2.8xlarge' ],
};
// install build-essential & git | |
var pg = require( 'pg' ) | |
var fs = require( 'fs' ); | |
var path = require( 'path' ); | |
var util = require( 'util' ); | |
var zlib = require( 'zlib' ); | |
var aws = require( 'aws-sdk' ); | |
var bytes = require( 'bytes' ); | |
var Table = require('cli-table'); | |
var extend = require( 'extend' ); | |
var stream = require( 'stream' ); | |
var request = require( 'request' ); | |
var Promise = require( 'bluebird' ); | |
var avro = require( 'node-avro-io' ); // npm install jamesbrucepower/node-avro-io | |
var debug = require( 'debug' )( 'redshift-bench' ); | |
// S3 client used to upload the generated data files; v4 signing is required
// by newer AWS regions.
var s3 = new aws.S3({
    accessKeyId: AWS_KEY,
    secretAccessKey: AWS_SECRET,
    signatureVersion: "v4",
});
// Promisify the callback-style APIs so they can be used in the bluebird
// promise chains below (adds *Async variants, e.g. s3.uploadAsync,
// pg.connectAsync, request.getAsync).
s3 = Promise.promisifyAll( s3 );
pg = Promise.promisifyAll( pg );
request = Promise.promisifyAll( request );
// Runs a single benchmark variation end-to-end: generate sample data, create
// the target table, serialize + (optionally) compress the files, upload them
// to S3 and COPY them into Redshift. The elapsed time of each phase is
// recorded directly on the `test` object, which later becomes one row of the
// final results table.
function runTest ( test ) {
    test.serialize_time = '';
    test.compress_time = '';
    test.upload_time = '';
    test.copy_time = '';
    test.datasize = 0;
    // bluebird's Promise.bind makes this object the shared `this` context
    // for every handler in the chain below.
    return Promise.bind({
        test: test
    })
    .then( function () {
        return createData( test )
    })
    .then( function ( data ) {
        this.data = data;
        // wrap each column name in a String object so the fixed-width column
        // size (max observed value length + 1) can be attached to it
        this.columns = Object.keys( data[ 0 ] ).map( function ( columnName, i ) {
            columnName = new String( columnName );
            columnName.size = data.columnSize[ i ] + 1;
            return columnName;
        })
        return pg.connectAsync( REDSHIFT_CLUSTERS[ test.cluster ] )
    })
    .then( function ( client, done ) {
        // NOTE(review): the promisified connectAsync resolves with a single
        // value, so `done` here is undefined; cleanup relies on client.end
        // instead — confirm against the pg/bluebird versions in use.
        debug( 'Connected to %s', test.cluster );
        this.done = client.end.bind( client );
        this.query = function ( sql ) {
            debug( 'SQL %s', sql );
            return client.queryAsync.apply( client, arguments );
        }
    })
    .then( function () {
        return this.query( 'DROP TABLE IF EXISTS bench' );
    })
    .then( function () {
        // all columns are varchar: the benchmark measures load performance,
        // not type fidelity
        return this.query([
            'CREATE TABLE bench (',
            this.columns.map( function ( name ) {
                return '"' + name + '" varchar';
            }).join( ',' ),
            ')'
        ].join( ' ' ) )
    })
    .then( serialize )
    .then( compress )
    .each( function ( file ) {
        // sum the on-disk size of every produced file
        test.datasize += fs.lstatSync( file ).size
    })
    .tap( function () {
        test.datasize = bytes( test.datasize ); // human-readable, e.g. "12MB"
    })
    .then( upload )
    .then( copy )
    .then( function () {
        console.log( '' );
        debug( 'Test done', test );
        this.done(); // close the pg connection
    })
}
// Writes the generated rows out to `test.files` local files in the requested
// format, recording the elapsed time in test.serialize_time.
// Runs with the runTest() chain context as `this`; resolves with the list of
// file paths written.
function serialize () {
    var test = this.test;
    var columns = this.columns;
    var data = this.data;
    var rowsPerFile = Math.floor( test.rows / test.files );
    var clock = timer();
    return Promise.resolve( createArray( test.files ) )
        .tap( function () {
            clock.reset();
        })
        .map( function ( fileIdx ) {
            var filename = [ 'data', fileIdx, test.format, 'raw' ].join( '.' );
            var target = '/tmp/' + filename;
            debug( 'Serializing %s rows to %s', rowsPerFile, target );
            return new Promise( function ( resolve, reject ) {
                var source = new DataStream( rowsPerFile, columns, data );
                var formatter = DataStream.toFormat( test.format, columns, test );
                var sink = fs.createWriteStream( target );
                source.once( 'error', reject );
                formatter.once( 'error', reject );
                sink.once( 'error', reject )
                    .once( 'finish', function () {
                        // defer a tick so the file is fully flushed before use
                        process.nextTick( resolve.bind( null, target ) )
                    });
                source.pipe( formatter ).pipe( sink );
            })
        })
        .tap( function () {
            test.serialize_time = clock.elapsed();
        })
}
// Gzips each serialized file when test.compression is true, recording the
// elapsed time in test.compress_time. Resolves with the list of .gz paths —
// or the input paths untouched when compression is disabled.
function compress ( files ) {
    var test = this.test;
    if ( test.compression !== true ) {
        test.compress_time = 0;
        return files;
    }
    var clock = timer();
    return Promise.resolve( files )
        .tap( function () {
            clock.reset();
        })
        .map( function ( rawPath ) {
            // swap the trailing extension (".raw") for ".gz"
            var parts = rawPath.split( '.' );
            parts[ parts.length - 1 ] = 'gz';
            var gzPath = parts.join( '.' );
            debug( 'Compressing %s to %s', rawPath, gzPath );
            return new Promise( function ( resolve, reject ) {
                var reader = fs.createReadStream( rawPath );
                var gzip = zlib.createGzip();
                var writer = fs.createWriteStream( gzPath );
                reader.once( 'error', reject );
                gzip.once( 'error', reject );
                writer.once( 'error', reject )
                    .once( 'finish', function () {
                        process.nextTick( resolve.bind( null, gzPath ) );
                    });
                reader.pipe( gzip ).pipe( writer );
            })
        })
        .tap( function () {
            test.compress_time = clock.elapsed();
        })
}
// Uploads every file to S3 and, for multi-file tests, also uploads a
// Redshift manifest enumerating all of them. Records the elapsed time in
// test.upload_time. Resolves with the S3 key that COPY should read from:
// the single file's key, or the manifest key.
function upload ( files ) {
    var test = this.test;
    var clock = timer();
    return Promise.resolve( files )
        .tap( function () {
            clock.reset();
        })
        .map( function ( file ) {
            var key = path.basename( file );
            debug( 'Uploading %s to %s', file, key );
            return s3.uploadAsync({
                Bucket: S3_BUCKET,
                Key: key,
                Body: fs.createReadStream( file )
            })
            .return( key )
        })
        .then( function ( keys ) {
            debug( 'Upload done.' );
            if ( keys.length === 1 ) {
                return keys[ 0 ] // don't create manifest on a single file
            }
            // multiple files: COPY reads a manifest listing each object
            var manifestKey = 'data.manifest';
            var manifest = {
                entries: keys.map( function ( k ) {
                    return {
                        url: "s3://" + S3_BUCKET + "/" + k,
                        mandatory: true
                    }
                })
            };
            debug( 'Uploading manifest %s', manifestKey );
            return s3.uploadAsync({
                Bucket: S3_BUCKET,
                Key: manifestKey,
                Body: JSON.stringify( manifest )
            })
            .return( manifestKey )
        })
        .tap( function () {
            test.upload_time = clock.elapsed();
        })
}
// Issues the Redshift COPY command that loads the uploaded S3 data into the
// `bench` table, recording the elapsed time in test.copy_time.
// `key` is either a single object key or a '.manifest' key (see upload()).
function copy ( key ) {
    debug( 'Copying %s', key );
    var test = this.test;
    // NOTE(review): the credentials are embedded in the SQL text itself, so
    // they will appear in any statement logging.
    var creds = [
        'aws_access_key_id=' + s3.config.accessKeyId,
        'aws_secret_access_key=' + s3.config.secretAccessKey
    ].join( ";" );
    var isManifest = ( path.extname( key ) === '.manifest' );
    var format = test.format.toUpperCase();
    if ( format === 'FIXEDWIDTH' ) {
        // FIXEDWIDTH requires an explicit 'col:width,...' spec; the widths
        // were attached to the column String objects in runTest()
        format += ' \'' + this.columns.map( function ( col ) {
            return col + ':' + col.size
        }).join( ',' ) + '\''
    } else if ( format !== 'CSV' ) {
        // AVRO and JSON use automatic field-to-column mapping
        format += ' AS \'auto\'';
    }
    var t = timer();
    return Promise.bind( this )
    .tap( function () {
        t.reset();
    })
    .then( function () {
        // compression === 'true' (string) means AVRO-internal compression,
        // which deliberately does NOT add the GZIP option — see toAVRO()
        return this.query([
            'COPY bench FROM', '\'s3://' + S3_BUCKET + '/' + key + '\'',
            'WITH CREDENTIALS', "'" + creds + "'",
            'FORMAT AS', format,
            isManifest ? 'MANIFEST' : '',
            test.compression === true ? 'GZIP' : '',
            'STATUPDATE OFF', // disable implicit analyze on temp tables
            'COMPUPDATE OFF', // disable auto compression on temp tables
        ].join( ' ' ) )
    })
    .tap( function () {
        test.copy_time = t.elapsed();
    })
}
util.inherits( DataStream, stream.Readable );
// Object-mode readable stream that emits `rows` row objects, cycling over
// the pre-generated `data` sample when more rows are requested than exist.
function DataStream ( rows, columns, data ) {
    stream.Readable.call( this, { objectMode: true });
    this.rows = rows;
    this.columns = columns;
    this.data = data;
    this.count = 0; // number of rows emitted so far
}
// Emit one row per _read() call; push(null) ends the stream once the
// requested row count has been produced.
DataStream.prototype._read = function () {
    var emitted = this.count;
    if ( emitted >= this.rows ) {
        this.push( null );
        return
    }
    if ( emitted % 10000 === 0 ) {
        debug( 'DataStream emitted %s / %s', emitted, this.rows );
    }
    this.count = emitted + 1;
    this.push( this.data[ emitted % this.data.length ] );
}
// Returns a Transform stream serializing row objects into the requested
// wire format ('csv' | 'json' | 'avro' | 'fixedwidth').
DataStream.toFormat = function ( format, columns, test ) {
    var factories = {
        csv: toCSV,
        json: toJSON,
        avro: toAVRO,
        fixedwidth: toFixedWidth
    };
    return factories[ format ]( columns, test );
}
// Transform stream: row objects in, fixed-width text out. Each value is
// stripped of characters that would confuse parsing, then right-padded with
// spaces to its column's fixed width (`column.size`). Records are
// newline-separated with no trailing newline.
function toFixedWidth ( columns ) {
    var emitted = 0;
    var transform = new stream.Transform({
        writableObjectMode: true
    })
    transform._transform = function ( row, encoding, done ) {
        var fields = [];
        columns.forEach( function ( column ) {
            var value = row[ column ];
            if ( typeof value === 'string' ) {
                // strip delimiters/quotes; an emptied value becomes one space
                value = value.replace( /[\n\'\"\t\r\{\}\,]/g, '' ) || ' ';
            }
            value = value.toString();
            while ( value.length < column.size ) {
                value += ' ';
            }
            fields.push( value );
        });
        var line = fields.join( '' );
        done( null, emitted++ === 0 ? line : '\n' + line )
    }
    return transform;
}
// Transform stream: row objects in, CSV text out. String values are
// sanitized by deleting any character that would require quoting (commas,
// quotes, newlines, braces), so no escaping is ever needed. Records are
// newline-separated with no trailing newline.
function toCSV ( columns ) {
    var emitted = 0;
    var transform = new stream.Transform({
        writableObjectMode: true
    })
    transform._transform = function ( row, encoding, done ) {
        var fields = columns.map( function ( column ) {
            var value = row[ column ];
            if ( typeof value !== 'string' ) {
                return value;
            }
            // an emptied value becomes a single space so the field survives
            return value.replace( /[\n\'\"\t\r\{\}\,]/g, '' ) || ' ';
        });
        var line = fields.join( ',' );
        done( null, emitted++ === 0 ? line : '\n' + line )
    }
    return transform;
}
// Transform stream: row objects in, newline-delimited JSON out — one JSON
// document per record, no trailing newline.
function toJSON () {
    var emitted = 0;
    var transform = new stream.Transform({
        writableObjectMode: true
    })
    transform._transform = function ( row, encoding, done ) {
        var doc = JSON.stringify( row );
        done( null, emitted === 0 ? doc : '\n' + doc );
        emitted += 1;
    }
    return transform
}
// Returns an AVRO DataFile writer stream for the given columns (all typed
// as string). Unlike the other formats, compression is applied inside the
// AVRO container itself rather than by a separate gzip pass.
function toAVRO ( columns, test ) {
    var compression = 'null';
    if ( test.compression ) {
        // ugly little trick - compression is handled by AVRO itself
        // instead of by gzip. Setting the compression to the string 'true'
        // will cause the rest of the code to not handle it as a compressed
        // file - but will keep the string visible in the final output.
        test.compression = 'true';
        compression = 'deflate';
    }
    var writer = new avro.DataFile.Writer( {
        name: 'data',
        type: 'record',
        fields: columns.map( function ( column ) {
            // column is a String object (see runTest); unwrap to a primitive
            return { name: column.toString(), type: 'string' }
        })
    }, compression );
    // very large sync interval -> fewer, bigger blocks
    writer.syncInterval = 16 * 10000000;
    // Back-pressure shim around the writer's internals: write() reports
    // "full" (returns false) after a block is written, and 'drain' is
    // emitted shortly after the block state is reset.
    // NOTE(review): relies on node-avro-io private methods (_writeBlock,
    // _resetBlocks) — verify against the pinned library version.
    var pause = false;
    var writeBlock = writer._writeBlock;
    writer._writeBlock = function () {
        var rv = writeBlock.apply( this, arguments );
        pause = true;
        return rv;
    }
    var resetBlocks = writer._resetBlocks;
    writer._resetBlocks = function () {
        var rv = resetBlocks.apply( this, arguments );
        setTimeout( function () {
            pause = false;
            this.emit( 'drain' );
        }.bind( this ), 1 )
        return rv;
    }
    var write = writer.write;
    writer.write = function () {
        return write.apply( this, arguments ) && !pause;
    }
    return writer;
}
// Builds the cartesian product of a variations spec: every key maps to an
// array of candidate values, and the result is one object per combination,
// e.g. combine({ a: [1,2], b: ['x'] }) -> [ {a:1,b:'x'}, {a:2,b:'x'} ].
// `keys` and `idx` are internal recursion state; callers pass `variations` only.
function combine( variations, keys, idx ) {
    idx = idx || 0;
    keys = keys || Object.keys( variations );
    var name = keys[ idx ];
    var isLast = ( idx === keys.length - 1 );
    var items = [];
    variations[ name ].forEach( function ( value ) {
        if ( isLast ) {
            // deepest key: each value starts a fresh combination object
            var leaf = {};
            leaf[ name ] = value;
            items.push( leaf );
        } else {
            // stamp this key/value onto every combination of the deeper keys
            combine( variations, keys, idx + 1 ).forEach( function ( tail ) {
                tail[ name ] = value;
                items.push( tail );
            });
        }
    });
    return items;
}
// Tiny stopwatch factory: reset() marks a start time, elapsed() returns the
// fractional seconds passed since the last reset().
function timer() {
    var startedAt;
    return {
        reset: function () {
            startedAt = Date.now();
        },
        elapsed: function () {
            return ( Date.now() - startedAt ) / 1000;
        }
    }
}
// generate the data
// Downloads a real NYC Open Data set and reshapes it into `test.cols`
// columns worth of sample rows (all values stringified). The returned array
// carries an extra `columnSize` property: the max string length observed per
// column, used later to size the FIXEDWIDTH columns.
function createData ( test ) {
    return request.getAsync( DATA_JSON_URL )
    .then( function ( res ) {
        var body = res.body;
        body = JSON.parse( body );
        var cols = body.meta.view.columns;
        var rows = body.data; // NOTE(review): unused — body.data is read again below
        // Build test.cols column names by cycling over the source columns.
        // Inside this callback `cols` still refers to the source metadata —
        // the reassignment only takes effect after map() returns. A numeric
        // suffix (cycle count) disambiguates repeated names.
        cols = createArray( test.cols ).map( function ( c, i ) {
            var name = cols[ i % cols.length ].fieldName.replace( ':', '' );
            return name + Math.floor( i / cols.length );
        })
        // per-column max width accumulator, initialized to zero
        var sizes = createArray( test.cols ).map( function () {
            return 0;
        });
        // every source row becomes an object keyed by the generated column
        // names, cycling over the row's values when there are more columns
        // than source fields; track the max width per column as we go
        var data = body.data.map( function ( row ) {
            return cols.reduce( function ( obj, col, i ) {
                obj[ col ] = String( row[ i % row.length ] );
                sizes[ i ] = Math.max( sizes[ i ], obj[ col ].length );
                return obj;
            }, {} )
        });
        data.columnSize = sizes;
        return data;
    })
}
// Returns [0, 1, ..., n-1]; used both as an iteration count and as map()
// fodder throughout the benchmark.
// FIX: removed a redundant duplicate `var i = 0;` declaration that preceded
// the loop's own declaration of `i`.
function createArray ( n ) {
    var arr = [];
    for ( var i = 0 ; i < n ; i += 1 ) {
        arr.push( i );
    }
    return arr;
}
// start the tests
// Run every combination sequentially (bluebird .each), then render all the
// recorded timings as an ASCII table.
var tests = combine( VARIATIONS );
Promise.resolve( tests )
.each( function ( test, i, total ) {
    test.test = ( i + 1 ) + '/' + total; // progress label, e.g. "3/128"
    debug( 'Starting', test );
    return runTest( test );
})
.then( function ( tests ) {
    // table columns: the progress label first, then the first test's keys
    // (indexOf-dedup keeps 'test' from appearing twice)
    var columns = Object.keys( tests[ 0 ] ).reduce( function ( cols, key ) {
        if ( cols.indexOf( key ) === -1 ) {
            cols.push( key )
        }
        return cols;
    }, [ 'test' ] );
    var table = new Table({ head: columns });
    tests.forEach( function ( test ) {
        var row = columns.map( function ( col ) {
            return test[ col ];
        })
        table.push( row );
    })
    console.log( table.toString() );
    console.log( 'All tests done.' );
})
// FIX: the chain previously ended without a rejection handler, so any test
// failure was silently swallowed as an unhandled rejection
.catch( function ( err ) {
    console.error( err.stack || err );
    process.exit( 1 );
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment