Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
/**
 * Serial work queue (concurrency 1). Each task streams the `scenarios`
 * documents matching `task.query`, copies every document into `tempModel`,
 * applies `task.update` to that copy, and merges the updated copy's fields
 * back into the original `scenarios` document.
 *
 * Task shape (as read by this worker):
 *   task.query  - query object; `query._id.$in` must be an array of ids
 *   task.update - update document passed to `tempModel.findOneAndUpdate`
 *   task.defer  - deferred exposing `resolve(value)` / `reject(err)`
 *
 * The deferred is rejected with the first error encountered, or resolved with
 * `{ data: true }` once the stream has finished AND every per-document
 * create/update/merge/save chain has completed. The queue callback is invoked
 * exactly once per task, with no arguments, at that same point.
 */
var queue = async.queue(function(task, callback) {
  var ObjectId = mongoose.Types.ObjectId
  var query = task.query
  var update = task.update
  var defer = task.defer

  var pending = 0        // per-document chains still in flight
  var streamDone = false // set once the stream has closed or errored
  var failed = false     // set once defer.reject has been called
  var finished = false   // set once callback() has been called

  // Reject the deferred with the first error only; later errors are ignored.
  function fail(err) {
    if(!failed) {
      failed = true
      defer.reject(err)
    }
  }

  // Complete the task once the stream is done and no document work remains.
  // This replaces the previous fixed 50ms timeout, which could fire while
  // writes were still outstanding.
  function maybeFinish() {
    if(finished || !streamDone || pending > 0) {
      return
    }
    finished = true
    if(!failed) {
      defer.resolve({
        data: true
      })
    }
    callback()
  }

  // Marks one per-document chain as complete (optionally with an error).
  function docDone(err) {
    if(err) {
      fail(err)
    }
    pending -= 1
    maybeFinish()
  }

  // Note: this converts the ids in place, i.e. it mutates task.query.
  query._id['$in'] = _.map(query._id['$in'], ObjectId)
  var queryStream = scenarios.find(query).stream()
  // NOTE(review): `pipe` returns its destination, so this 'end' listener is
  // attached to streamWorker (defined elsewhere), once per task - confirm
  // that is intended.
  queryStream.pipe(streamWorker).on('end', function(foo) {
    console.log('FOO:', foo)
  })
  //TODO move to data access class
  queryStream.on('data', function(doc) {
    doc = doc.toObject()
    // Keep a link back to the source document and let the temp collection
    // assign its own _id / version key.
    doc.linkId = doc._id
    delete doc._id
    delete doc.__v
    pending += 1
    tempModel.create(doc, function(err, newDoc) {
      if(err) {
        return docDone(err)
      }
      var tempId = newDoc._id
      tempModel.findOneAndUpdate({ _id: tempId, linkId: doc.linkId }, update, function(err, data) {
        if(err) {
          return docDone(err)
        }
        if(!data) {
          return docDone(new Error('Temporary document ' + tempId + ' not found for update'))
        }
        scenarios.findOne({ _id: doc.linkId }, function(err, origDoc) {
          if(err) {
            return docDone(err)
          }
          if(!origDoc) {
            return docDone(new Error('Original document ' + doc.linkId + ' not found'))
          }
          _.forIn(data._doc, function(value, key) {
            if(origDoc[key] instanceof Array) {
              // Array fields are merged: append the updated values, then
              // de-duplicate by string representation.
              origDoc[key].push.apply(origDoc[key], value)
              origDoc[key] = _.uniq(origDoc[key], String)
            } else if(typeof value !== 'function' && key !== '_id' && key !== '__v') {
              origDoc.set(key, value)
            }
          })
          // Wait for the save and surface its error instead of dropping it.
          origDoc.save(function(err) {
            docDone(err)
          })
        })
      })
    })
  })
  queryStream.on('error', function(err) {
    fail(err)
    // Do not rely on a 'close' event following the error; otherwise the
    // queue could stall forever on a failed stream.
    streamDone = true
    maybeFinish()
  })
  queryStream.on('close', function() {
    streamDone = true
    maybeFinish()
  })
}, 1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.