Skip to content

Instantly share code, notes, and snippets.

@maruf89
Created June 26, 2014 18:24
Show Gist options
  • Save maruf89/4865fd0e78ce34e28e13 to your computer and use it in GitHub Desktop.
Save maruf89/4865fd0e78ce34e28e13 to your computer and use it in GitHub Desktop.
Assembles upload chunks into a single file. Originally built for Flow.js using Nodejs
# Usage: Create a new combiner instance for each new object that needs assembling
# And call #add to add each new chunk
#
# combiner = combiners[identifier] or new Combiner(
# numberOfChunks: numberOfChunks
# filename: filename
# identifier: identifier
# directory: @temporaryFolder
# callback: ->
# # destroy reference as it's no longer needed
# combiners[identifier] = null
# callback.apply(null, arguments)
# )
# combiners[identifier] = combiner
#
# if combiner.add(files['file'].path, chunkNumber)
# res.send(200)
###*
* Assembles chunks into a final file
* @type {[type]}
###
exports.Combiner = class Combiner extends EventEmitter
chunks = null
self = null
requiredParams = [
'numberOfChunks'
'identifier'
'directory'
]
downloadExtension = '.cfdownload'
###*
* Accepts options
*
* @param {Object} data has 3 required properties listed below
* @options data {Integer} numberOfChunks total number of chunks
* @options data {String} identifier file identifier
# @options data {String} directory path of the temp files
# @options data {Function} callback optional callback when file is combined
###
constructor: (data = {}, _undefined) ->
# Iterate over the required params and throw error if any are missing
for param in requiredParams
throw new VError('Missing `numberOfChunks` parameter') if data[param] is _undefined
self = @
_.each data, (val, key) ->
self[key] = val
return
chunks = new Array(data.numberOfChunks)
# Determine the file extension from the passed in filename
@fileExtension = path.extname(@filename)
@idle = true
@downloadName = "#{@directory}/#{@identifier}#{downloadExtension}"
@filename = "#{@directory}/#{@identifier}#{@fileExtension}"
@currentIndex = 0
# If file exists from previously, remove it
fs.exists @filename, (exists) =>
@idle = not exists
if exists
fs.unlink @filename, =>
@emit('resume')
# Bind the internal callbacks
self.on('idle', @onIdle)
self.on('written', @onWritten)
self.on('resume', @onResume)
self.on('error', @onError)
# setup callback if one is passed in
if @callback
self.on('done', @callback)
###*
* File has been uploaded and ready to append
*
* @param {String} file path of the temp file
* @param {Integer} index chunk number
* @return {Boolean} whether the chunk is NOT the last chunk
###
add: (filePath, index) ->
# index is not 0 based, let's make it so
arrIndex = index - 1
# Only write if we're idle, and this chunk is ready
if @idle and arrIndex is @currentIndex
@write(filePath)
else
chunks[arrIndex] = filePath
return index isnt @numberOfChunks
chunkReady: (index = @currentIndex) ->
return !!chunks[index]
###*
* Proceeds to write/append the next file chunk to the final file
*
* @param {String} file
* @return {[type]} [description]
###
write: (filePath, index = @currentIndex) ->
# exit early if no file sent
if !filePath
@emit('resume')
return false
writeStream = @getStream()
@idle = false
self = this
isEnd = index is @numberOfChunks
readStream = fs.createReadStream filePath,
flags: "r"
encoding: null
fd: null
mode: '0666'
# When done remove the file and emit it's done writing
readStream.on 'end', ->
self.emit('written', filePath)
readStream.on 'error', (err) ->
self.emit('error', err)
# append the guy
readStream.pipe(writeStream, {end: isEnd})
###*
* Returns a new write stream if one doesn't already exist
*
* @param {String} file the final file path
* @return {Stream} write stream for the final file
###
getStream: ->
return @writeStream if @writeStream
return @writeStream = fs.createWriteStream @downloadName,
flags: 'w'
encoding: null
mode: '0666'
###*
* Called after onWritten, and waiting for chunks to finish
###
onIdle: ->
@idle = true
###*
* Called after file has been appended
* Remove the temp file and continue
*
* @callback
* @param {String} filePath the path of the file that was just read from
###
onWritten: (filePath) ->
fs.unlink filePath, =>
chunks[@currentIndex++] = null
if @currentIndex is @numberOfChunks
return @onDone()
@emit('resume')
###*
* Called when done with one action, and ready to move on
* @return {[type]} [description]
###
onResume: ->
if @chunkReady()
@write(chunks[@currentIndex])
else
@emit('idle')
###*
* If any errors are thrown, clean up and throw error
*
* @param {Error} err
###
onError: (err) ->
message = "error uploading #{@filename}"
error = new VError(err, message)
console.log(error)
emitErr = ->
self.emit 'done',
filename: null
response: 500
success: false
message: message
error: error
# Remove temp download file if exists
fs.exists @downloadName, (exists) ->
if exists
fs.unlink(self.downloadName, emitErr)
else emitErr()
###*
* Called after the last chunk has been appended
###
onDone: ->
fs.rename @downloadName, @filename, =>
return @emit('done',
filename: @filename
response: 200
success: true
message: 'successfully uploaded!'
error: null
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment