Skip to content

Instantly share code, notes, and snippets.

@ptgamr ptgamr/flow-uploader.js
Last active Sep 17, 2019

Embed
What would you like to do?
Flowjs, ResumableJs NodeJS backend
'use strict';
const fs = Promise.promisifyAll(require('fs'));
const path = require('path');
const crypto = require('crypto');
const CronJob = require('cron').CronJob;
module.exports = class FlowUploader {
constructor(tempDir, uploadDir, maxFileSize, fileParameterName) {
this.tempDir = tempDir || './tmp';
this.uploadDir = uploadDir || './uploads';
this.maxFileSize = maxFileSize;
this.fileParameterName = fileParameterName || 'file';
try {
fs.mkdirSync(this.tempDir);
} catch (e) {}
//run clean up job five minutes after midnight, every day
new CronJob('5 0 * * *', () => { this._cleanUnfinishedChunks(); }, null, true, 'Europe/Zurich');
}
chunkExists(req) {
let chunkNumber = req.query.flowChunkNumber,
chunkSize = req.query.flowChunkSize,
totalSize = req.query.flowTotalSize,
identifier = req.query.flowIdentifier,
fileName = req.query.flowFilename;
let validation = this._isValidRequest(chunkNumber, chunkSize, totalSize, identifier, fileName);
if (validation !== 'VALID') {
return Promise.reject(validation);
}
let chunkFilename = this._getChunkFilename(chunkNumber, identifier);
return fs.statAsync(chunkFilename);
}
saveChunk(req) {
let fields = req.body,
files = req.files;
let chunkNumber = Number(fields.flowChunkNumber),
chunkSize = Number(fields.flowChunkSize),
totalSize = Number(fields.flowTotalSize),
identifier = fields.flowIdentifier,
fileName = fields.flowFilename;
if (!files[this.fileParameterName] || !files[this.fileParameterName].size) {
return Promise.reject('INVALID_FLOW_REQUEST');
}
let validation = this._isValidRequest(chunkNumber, chunkSize, totalSize, identifier, fileName, files[this.fileParameterName].size);
if (validation !== 'VALID') {
return Promise.reject(validation);
}
let chunkFilename = this._getChunkFilename(chunkNumber, identifier);
return fs.renameAsync(files[this.fileParameterName].path, chunkFilename)
.then(() => {
let numberOfChunks = this._getNumberOfChunks(totalSize, chunkSize);
if (chunkNumber !== numberOfChunks) {
return 'PARTLY_DONE';
}
let chunkFileNames = [];
for (let i = 1; i <= numberOfChunks; i++) {
chunkFileNames.push(this._getChunkFilename(i, identifier));
}
return Promise.map(
chunkFileNames,
chunkFileName => fs.statAsync(chunkFileName),
{concurency: 2}
).then(() => this._writeToUploadDir(numberOfChunks, identifier, fileName))
.then(filename => filename, () => 'ERROR_VERIFY_CHUNK');
});
}
download(fileName) {
let downloadPath = this._getDownloadPath(fileName);
return fs.statAsync(downloadPath).then(() => {
return fs.createReadStream(downloadPath);
});
}
_isValidRequest(chunkNumber, chunkSize, totalSize, identifier, fileName, fileSize) {
identifier = this._cleanIdentifier(identifier);
if (!chunkNumber || !chunkSize || !totalSize || !identifier || !fileName) {
return 'INVALID_FLOW_REQUEST';
}
let numberOfChunks = this._getNumberOfChunks(totalSize, chunkSize);
if (chunkNumber > numberOfChunks) {
return 'INVALID_CHUNK_NUMBER';
}
if (this.maxFileSize && totalSize > this.maxFileSize) {
return 'INVALID_FILE_SIZE';
}
if (typeof fileSize !== 'undefined') {
if (chunkNumber < numberOfChunks && fileSize !== chunkSize) {
console.log('>>>>.', typeof fileSize, typeof chunkSize);
return 'INVALID_FILESIZE_CHUNKSIZE_MISMATCH';
}
if (numberOfChunks > 1 && chunkNumber === numberOfChunks && fileSize !== ((totalSize % chunkSize) + parseInt(chunkSize))) {
return 'INVALID_LAST_CHUNK';
}
if (numberOfChunks === 1 && fileSize !== totalSize) {
return 'INVALID_SINGLE_CHUNK';
}
}
return 'VALID';
}
_getNumberOfChunks(totalSize, chunkSize) {
return Math.max(Math.floor(totalSize/chunkSize), 1);
}
_cleanIdentifier(identifier) {
return identifier.replace(/[^0-9A-Za-z_-]/g, '');
}
_getChunkFilename(chunkNumber, identifier) {
identifier = this._cleanIdentifier(identifier);
let hash = crypto.createHash('sha1').update(identifier).digest('hex');
return path.resolve(this.tempDir, `./${identifier}-${hash}.${chunkNumber}`);
}
_getDownloadPath(fileName) {
return path.resolve(this.uploadDir, `./${fileName}`);
}
_writeToUploadDir(numberOfChunks, identifier, fileName) {
let hash = crypto.createHash('sha1').update(identifier).digest('hex');
let writeDir = path.resolve(this.uploadDir, `./${identifier}-${hash}${path.extname(fileName)}`);
let writableStream = fs.createWriteStream(writeDir);
let chunkFileNames = [];
for (let i = 1; i <= numberOfChunks; i++) {
chunkFileNames.push(this._getChunkFilename(i, identifier));
}
return Promise.each(
chunkFileNames,
chunkFileName => {
return new Promise(resolve => {
let sourceStream = fs.createReadStream(chunkFileName);
sourceStream.pipe(writableStream, {
end: false
});
sourceStream.on('end', function() {
fs.unlink(chunkFileName);
resolve();
});
});
}
).then(() => {
writableStream.end();
return path.basename(writeDir);
});
}
_cleanUnfinishedChunks() {
let now = new Date().getTime();
let oneDay = 24 * 60 * 60 * 1000;
fs.readdirAsync(this.tempDir)
.map(fileName => {
let filePath = path.resolve(this.tempDir, `./${fileName}`);
return fs.statAsync(filePath).then(stat => {
return {
filePath: filePath,
stat: stat
};
});
}, {concurency: 2})
.filter(fileStat => {
let modifiedTime = fileStat.stat.ctime.getTime();
return (now - modifiedTime) >= oneDay;
})
.each(fileStat => fs.unlinkAsync(fileStat.filePath));
}
};
// Flow.js chunk status check: reply 200 when uploader.chunkExists(req)
// resolves and 204 when it rejects.
router.get('/flow', (req, res) => {
  const replyWith = statusCode => () => res.status(statusCode).send();

  return uploader
    .chunkExists(req)
    .then(replyWith(200), replyWith(204));
});
// Flow.js chunk upload: the multipart middleware runs first, then the chunk is
// handed to uploader.saveChunk. A resolved status is sent with 200, a
// rejection is sent with 400.
router.post('/flow', multipartMiddleware, (req, res) => {
  const sendStatus = status => res.status(200).send(status);
  const sendError = err => res.status(400).send(err);

  return uploader
    .saveChunk(req)
    .then(sendStatus, sendError);
});
@PanzerKadaver

This comment has been minimized.

Copy link

PanzerKadaver commented Nov 27, 2015

Hey pal,

I'm trying to use your code to upload a file, but every time I try I get a 400 error. I've added some console.log calls, and it appears the "renameAsync" function is never called. Any thoughts?

@tpiros

This comment has been minimized.

Copy link

tpiros commented Dec 12, 2015

@ptgamr, can you please provide a full example?

Thanks

@LaurensRietveld

This comment has been minimized.

Copy link

LaurensRietveld commented Jun 28, 2016

Are you sure the _cleanUnfinishedChunks method works? You're calling 'map' on a 'Promise' it seems. I'm assuming you intended to call fs.readdirSync instead of fs.readdirAsync? Oops, I see this is perfectly valid bluebird functionality ;)

and: concurency (line 78) is spelled wrong: should be concurrency

@BransonGitomeh

This comment has been minimized.

Copy link

BransonGitomeh commented Jul 3, 2016


(index):166 fileProgress FlowFile FlowChunk
(index):166 progress
(index):166 fileProgress FlowFile FlowChunk
(index):166 progress
(index):166 fileSuccess FlowFile_lastProgressCallback: 1467522223840_prevProgress: 1_prevUploadedSize: 11230731averageSpeed: 0bytes: nullchunks: Array[10]currentSpeed: 0error: falsefile: FileflowObj: Flowname: "Jess....: Object ERROR_VERIFY_CHUNK FlowChunk
(index):166 complete

I'm getting a specific ERROR_VERIFY_CHUNK on the FlowFile_lastProgressCallback call. In the upload dir I can see that the file pieces are still present and not combined in any way... Is there anything I'm doing wrong? I used this code on the server with no changes.

@BransonGitomeh

This comment has been minimized.

Copy link

BransonGitomeh commented Jul 3, 2016

@LaurensRietveld is this working for you?

//...
                    let chunkFileNames = [];
                    for (let i = 1; i <= numberOfChunks; i++) {
                        chunkFileNames.push(this._getChunkFilename(i, identifier));
                    }

                    return Promise.map(
                        chunkFileNames,
                        chunkFileName => fs.statAsync(chunkFileName),
                        {concurrency: 2}
                    ).then(() => this._writeToUploadDir(numberOfChunks, identifier, fileName))
                    .then(filename => filename, () => 'ERROR_VERIFY_CHUNK');
//...

mine is throwing that error

@BransonGitomeh

This comment has been minimized.

Copy link

BransonGitomeh commented Jul 3, 2016

Aaaaaaaaaaaaaah... totally OK, I was missing the uploads dir. Fixed, and it's all working like magic.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.