Created
July 4, 2016 10:19
-
-
Save Siphion/669e23920bd6217bb1098043f17fa656 to your computer and use it in GitHub Desktop.
not so sure about this
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//// TODO: Use power queue to handle throttling etc.
//// Use observe to monitor changes and have it create tasks for the power queue
//// to perform.
/**
 * Namespace for the file worker: the code in this file that watches an
 * FS.Collection and copies uploaded data from the temp store into each store.
 *
 * @public
 * @type Object
 */
FS.FileWorker = {};

/**
 * Registry of observe handles created by `FS.FileWorker.observe`.
 * `observe` writes the handles it creates here (keys `added` and `removed`),
 * presumably so that they can be stopped later.
 *
 * @public
 * @type Object
 */
FS.FileWorker.observers = {};
/**
 * @method FS.FileWorker.observe
 * @public
 * @param {FS.Collection} fsCollection
 * @returns {undefined}
 *
 * Sets up observes on the fsCollection to store file copies and delete
 * temp files at the appropriate times:
 *
 * - one observe per store on the "ready" query, which calls `saveCopy` for
 *   every file that is added to or changed within that result set;
 * - one observe on the "done" query, which removes the file from the temp
 *   store once every store has either saved it or given up;
 * - one observe on the whole collection, which on removal deletes the file
 *   from the temp store and from every store's adapter.
 *
 * Handles are recorded on `FS.FileWorker.observers`:
 *
 * - `observers.added` / `observers.removed` hold the "done" and "removed"
 *   handles of the MOST RECENT call (unchanged legacy behaviour);
 * - `observers.handles` is an array that accumulates every handle created by
 *   every call, including the per-store "ready" handles, so that none of them
 *   becomes unreachable when several collections are observed.
 */
FS.FileWorker.observe = function(fsCollection) {
  var observers = FS.FileWorker.observers;
  // Created lazily so this function does not depend on any other
  // initialisation than the `observers` object itself.
  var handles = observers.handles || (observers.handles = []);
  var stores = fsCollection.options.stores;

  // Initiate observe for finding newly uploaded/added files that need to be
  // stored per store.
  FS.Utility.each(stores, function(store) {
    var storeName = store.name;
    var readyCursor = fsCollection.files.find(getReadyQuery(storeName), {
      fields: {
        copies: 0
      }
    });

    handles.push(readyCursor.observe({
      added: function(fsFile) {
        // added will catch fresh files
        FS.debug && console.log("FileWorker ADDED - calling saveCopy", storeName, "for", fsFile._id);
        saveCopy(fsFile, storeName);
      },
      changed: function(fsFile) {
        // changed will catch failures and retry them
        FS.debug && console.log("FileWorker CHANGED - calling saveCopy", storeName, "for", fsFile._id);
        saveCopy(fsFile, storeName);
      }
    }));
  });

  // Initiate observe for finding files that have been stored so we can delete
  // any temp files
  observers.added = fsCollection.files.find(getDoneQuery(stores)).observe({
    added: function(fsFile) {
      FS.debug && console.log("FileWorker ADDED - calling deleteChunks for", fsFile._id);
      FS.TempStore.removeFile(fsFile);
    }
  });
  handles.push(observers.added);

  // Initiate observe for catching files that have been removed and
  // removing the data from all stores as well
  observers.removed = fsCollection.files.find().observe({
    removed: function(fsFile) {
      FS.debug && console.log('FileWorker REMOVED - removing all stored data for', fsFile._id);
      // remove from temp store
      FS.TempStore.removeFile(fsFile);
      // delete from all stores
      FS.Utility.each(stores, function(storage) {
        storage.adapter.remove(fsFile);
      });
    }
  });
  handles.push(observers.removed);
};
/**
 * @method getReadyQuery
 * @private
 * @param {string} storeName - The name of the store to observe
 * @returns {Object} A new selector object (see below)
 *
 * Returns a selector that will be used to identify files that
 * have been uploaded but have not yet been stored to the
 * specified store, and for which retrying has not been given up.
 *
 * {
 *   uploadedAt: {$exists: true},
 *   'copies.storeName': null,
 *   'failures.copies.storeName.doneTrying': {$ne: true}
 * }
 */
function getReadyQuery(storeName) {
  var selector = {uploadedAt: {$exists: true}};
  // Keys embed the store name, so they have to be assigned dynamically.
  selector['copies.' + storeName] = null;
  selector['failures.copies.' + storeName + '.doneTrying'] = {$ne: true};
  return selector;
}
/**
 * @method getDoneQuery
 * @private
 * @param {Array} stores - The stores array from the FS.Collection options
 * @returns {Object} A new selector object (see below)
 *
 * Returns a selector that will be used to identify files where every
 * store has either successfully saved a copy or has failed the
 * max number of times. One `$or` clause is added per store, so the
 * resulting selector looks like this:
 *
 * {
 *   $and: [
 *     {
 *       $or: [
 *         {
 *           $and: [
 *             {
 *               'copies.storeName': {$ne: null}
 *             },
 *             {
 *               'copies.storeName': {$ne: false}
 *             }
 *           ]
 *         },
 *         {
 *           'failures.copies.storeName.doneTrying': true
 *         }
 *       ]
 *     },
 *     REPEATED FOR EACH STORE
 *   ]
 * }
 *
 * Note that no condition on chunks is included, and that an empty `stores`
 * list yields `{$and: []}`.
 */
function getDoneQuery(stores) {
  var selector = {
    $and: []
  };

  // Add conditions for all defined stores
  FS.Utility.each(stores, function(store) {
    var storeName = store.name;
    var copyKey = 'copies.' + storeName;

    // The copy entry for this store is neither null nor false ...
    var notNull = {};
    notNull[copyKey] = {$ne: null};
    var notFalse = {};
    notFalse[copyKey] = {$ne: false};

    // ... or saving to this store has been given up.
    var gaveUp = {};
    gaveUp['failures.copies.' + storeName + '.doneTrying'] = true;

    selector.$and.push({$or: [{$and: [notNull, notFalse]}, gaveUp]});
  });

  return selector;
}
/**
 * @method saveCopy
 * @private
 * @param {FS.File} fsFile
 * @param {string} storeName
 * @param {Object} [options] - Accepted for call compatibility; not read by
 *   this implementation (no `overwrite` handling is performed here).
 * @returns {undefined}
 * @throws {Error} If no storage adapter is registered under `storeName`.
 *
 * Saves to the specified store by piping the file's temp-store read stream
 * into a write stream obtained from the store's adapter. The function returns
 * as soon as the pipe has been set up: it does not wait for the copy to
 * finish and does not attach any completion or error listeners itself.
 */
function saveCopy(fsFile, storeName, options) {
  var storage = FS.StorageAdapter(storeName);
  if (!storage) {
    throw new Error('No store named "' + storeName + '" exists');
  }

  FS.debug && console.log('saving to store ' + storeName);

  // The write stream is created before the read stream, as in the original
  // implementation.
  var writeStream = storage.adapter.createWriteStream(fsFile);
  var readStream = FS.TempStore.createReadStream(fsFile);

  // Pipe the temp data into the storage adapter
  readStream.pipe(writeStream);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment