
@jhermsmeier
Last active May 10, 2017 20:33
Copy a source to target in a very fast & memory efficient way

FastPipe

After many hours of trial and error, self-optimizing streams and such, I have finally found the fastest (?) and most efficient (?) way to copy an image from a given source to a given target.

There were several constraints that had to be balanced as well as possible:

  • Speed
  • Memory consumption
  • CPU utilization

Of course, these come with the classic tradeoff problem built in.

Previous Approaches

Across these previous approaches to the problem, figures in the following ranges have been observed:

Memory: up to ~600 MB
CPU: 20-70%

Node Core Streams

Depending on the highWaterMark, these are either very CPU- or very memory-bound. High highWaterMarks result in low CPU usage, but an extreme number of Buffer allocations and larger stream-internal buffer pools, which in turn sends the GC on a rampage. Low highWaterMarks result in very high CPU usage and lower memory consumption (also depending on how many buffers can be sliced off the internal pre-allocated buffer pool), though memory load stays very volatile due to the GC.

Hand-rolled (core based) Streams

Several different hand-rolled streams were written and tested:

Self-optimizing block size & highWaterMark:

Performed quite badly, both because it is based on the core streams (the same issues apply here) and because the time spent finding a good block size is never made up.

Custom write coupled to readable stream:

The idea was to call .read(blockSize) on a readable stream, which turned out very inefficient, as it resulted in a high number of smaller reads and internal buffer growth. Also, surprisingly slow.

Custom readable & writable with fixed block sizes:

Made things a lot faster, but suffered from immense memory consumption and GC issues. A pre-allocated SlowBuffer approach was also tried with this one; it failed horribly because core streams buffer internally even when hitting the highWaterMark prematurely, so re-using the buffer caused garbage data to be written.

Current Approach

Simple functions all the way down; no this. The reason behind that is to avoid having to bind to contexts etc. in callbacks, which would just make things unnecessarily complicated here.

The basic setup is to open & stat the source & target, determine a good block size (multiplied by a factor that experimentally turned out to be the sweet spot), then sequentially read and write buffers of that block size. This allows for a one-off allocation of a SlowBuffer and GC-free re-use of it.

All of that brings the memory consumption down to the lowest level possible, and also keeps the CPU utilization to a minimum, as most time is spent in the lower levels (reading & writing).

With the following V8 options also passed to the process, very low resource consumption has been observed:

node --max-executable-size=8 --max-old-space-size=16 --max-semi-space-size=1

While flashing a 4.3 GB image to an SD card, the following value ranges have been observed:

Memory: 16-22 MB
CPU: 2-4%
Throughput: 23.5 MB/s (it's a slow one)

Note that the fluctuations in memory consumption are likely mostly due to peripheral code, i.e. logging, speed measurements and other modules.

While copying said image to an external HDD (magnetic) connected via USB 2.0:

Memory: 16-22 MB
CPU: 12-18%
Throughput: 108 MB/s

To Do:

  • Better benchmarks with more devices
  • Emit each block to support alter/read-before-write (could also be easily done with support for streaming sources)
  • Support streams as sources
  • Improve API
  • Get rid of async

Usage:

var fastPipe = require( './fast-pipe' )
var source = {
  path: '/path/to/source.img',
  flags: 'r',
}
// NOTE: Make sure you use the raw disk descriptor,
// otherwise it's going to be a lot slower
var target = {
  path: '/dev/rdisk4',
  flags: 'w',
}
var emitter = fastPipe( source, target, {
  // Optional, defaults to block size determined by `fs.stat()`
  blockSize: 512 * 1024,
})
emitter.once( 'error', () => {})
emitter.once( 'end', () => {})
fast-pipe.js

var fs = require( 'fs' )
var async = require( 'async' )
var debug = require( 'debug' )( 'simple' )
var Emitter = require( 'events' ).EventEmitter

function open( file, callback ) {
  fs.open( file.path, file.flags, function( error, fd ) {
    if( error ) return callback( error )
    file.fd = fd
    stat( file, callback )
  })
}

function stat( file, callback ) {
  fs.stat( file.path, function( error, stats ) {
    if( error ) return callback( error )
    file.stats = stats
    file.bytesLeft = stats.size || file.size
    callback( error, file )
  })
}

function read( file, blockSize, buffer, callback ) {
  // Make sure to not read OOB at the end
  var size = Math.min( file.bytesLeft, blockSize )
  // Plain functions have no stream context to close;
  // signal a zero-byte read instead (the copy loop stops on it)
  if( size <= 0 ) return callback( null, 0, buffer );
  // If we're at the end and our slowBuffer is too large,
  // allocate an unsafe buffer of determined size
  buffer = size === blockSize ?
    buffer : Buffer.allocUnsafe( size )
  fs.read( file.fd, buffer, 0, size, null, callback )
}

function write( file, buffer, callback ) {
  fs.write( file.fd, buffer, 0, buffer.length, null, callback )
}

/**
 * Copy a source to a target in a very fast & memory efficient way
 * @param {Object} source
 * @param {String} source.path
 * @param {String} source.flags
 * @param {Number} [source.size]
 * @param {Object} target
 * @param {String} target.path
 * @param {String} target.flags
 * @param {Number} [target.size]
 * @param {Object} [options]
 * @param {Number} [options.blockSize]
 */
function pipe( source, target, options={} ) {

  var emitter = new Emitter()

  emitter.bytesRead = 0
  emitter.bytesWritten = 0

  // Open and stat source & target
  async.series([
    function( next ) { open( source, next ) },
    function( next ) { open( target, next ) },
  ], function( error ) {

    if( error != null )
      return emitter.emit( 'error', error );

    debug( 'source', source )
    debug( 'target', target )

    // Determine a good block size for writes;
    // for some reason `4` seems to be a good factor for the determined block size
    var blockSize = target.stats.blksize ?
      target.stats.blksize * 4 : ( 256 * 1024 )

    // Use the specified block size, if set
    blockSize = options.blockSize || blockSize

    // Allocate our slowBuffer once with the block size,
    // then re-use it for all reads & writes to avoid any allocation & GC cost
    // (i.e. with node core streams, memory consumption skyrockets to several 100 MB
    // due to many buffer allocations larger than the buffer pool size)
    var slowBuffer = Buffer.allocUnsafeSlow( blockSize )

    debug( 'blocksize', blockSize )

    async.whilst(
      // Continue until we've hit the end of the source
      function test() { return source.bytesLeft > 0 },
      // Read & then immediately write each block in sequence,
      // keeping track of bytes read & written as well as how many are left
      // (multiple write calls are unsafe anyway, and buffering also makes no sense,
      // as we're trying to evade the allocation / GC problem)
      function iteratee( next ) {
        read( source, blockSize, slowBuffer, function( error, bytesRead, buffer ) {
          if( error ) return next( error );
          emitter.bytesRead = emitter.bytesRead + bytesRead
          write( target, buffer, function( error, bytesWritten ) {
            if( error ) return next( error );
            source.bytesLeft = source.bytesLeft - bytesWritten
            emitter.bytesWritten = emitter.bytesWritten + bytesWritten
            next()
          })
        })
      },
      // Either we hit an error along the way,
      // or we're done here...
      function done( error ) {
        if( error != null )
          return emitter.emit( 'error', error );
        emitter.emit( 'end' )
      }
    )

  })

  return emitter

}

module.exports = pipe
Host script

var childProcess = require( 'child_process' )
var path = require( 'path' )
var debug = require( 'debug' )( 'host' )

var workerPath = path.join( __dirname, 'worker.js' )
var forkOptions = {}

var tasks = [{
  source: '/Users/Jonas/Downloads/2017-01-11-raspbian-jessie.img',
  target: '/dev/rdisk4',
},{
  source: '/Users/Jonas/Downloads/2017-01-11-raspbian-jessie.img',
  target: '/dev/rdisk3',
}]

tasks.forEach( function( task, i ) {
  task.process = childProcess.fork( './worker', [ task.source, task.target ], {
    execArgv: [ '--max-executable-size=8', '--max-old-space-size=16', '--max-semi-space-size=1' ],
  })
  task.process.disconnect()
  task.process.once( 'exit', function( code, signal ) {
    debug( 'task', i, 'exited with code', code, 'and signal', signal )
  })
})

process.on( 'SIGINT', function() {
  tasks.forEach( function( task ) {
    if( task.process ) { task.process.kill( 'SIGINT' ) }
  })
})
package.json

{
  "name": "fast-pipe",
  "description": "Copy a source to target in a very fast & memory efficient way",
  "license": "MIT",
  "dependencies": {
    "async": "~2.1.4",
    "debug": "~2.6.0",
    "speedometer": "~1.0.0"
  }
}
size.js

const KB = 1024
const MB = 1024 * KB
const GB = 1024 * MB
const TB = 1024 * GB

function size( bytes ) {
  var unit = 'B'
  var factor = 1
  if( bytes > TB ) { unit = 'TB'; factor = TB }
  else if( bytes > GB ) { unit = 'GB'; factor = GB }
  else if( bytes > MB ) { unit = 'MB'; factor = MB }
  else if( bytes > KB ) { unit = 'KB'; factor = KB }
  return ( bytes / factor ).toFixed(1) + ' ' + unit
}

module.exports = size
worker.js

var fs = require( 'fs' )
var path = require( 'path' )
var speedometer = require( 'speedometer' )
var size = require( './size' )
var fastPipe = require( './fast-pipe' )

var argv = process.argv.slice( 2 )
var sourcePath = argv.shift()
var targetPath = argv.shift()

// Validate args before deriving the debug namespace from the target path,
// as path.dirname() would throw on undefined
if( !sourcePath || !targetPath ) {
  throw new Error( 'Missing source or destination' )
}

var debug = require( 'debug' )( path.basename( path.dirname( targetPath ) ) )

debug( 'source:', sourcePath )
debug( 'target:', targetPath )

var source = {
  path: sourcePath,
  flags: 'r',
}

var target = {
  path: targetPath,
  flags: 'w',
}

var options = {
  // blockSize: 256 * 1024 * 1024,
}

var bytesWritten = 0
var time = process.hrtime()
var speed = speedometer(2)

var emitter = fastPipe( source, target, options )

emitter.once( 'end', () => {
  time = process.hrtime( time )
  clearInterval( clock )
  console.log( 'write:', time[0], 'seconds' )
})

var clock = setInterval( function() {
  debug( size( speed( emitter.bytesWritten - bytesWritten ) ) + '/s' )
  bytesWritten = emitter.bytesWritten
}, 500 )