@apendua
Created March 4, 2017 12:08
A simple pattern for concurrent workers using observers from Meteor collections
import { Meteor } from 'meteor/meteor';
import { Tasks } from '/imports/common/collections';

// The workerId must be unique per worker and must persist across server restarts.
// Otherwise, tasks claimed before a restart would stay assigned to an id that
// no worker uses anymore, and they would never be picked up again.
const workerId = process.env.WORKER_ID || 'theOnlyWorker';

// Ids of the tasks currently being executed by this process.
const tasksInProgress = {};

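// Wraps `execute` in a handler suitable for the observer's added/changed callbacks.
const makeTaskProcessor = execute => (task) => {
  if (!task.workerId) {
    // The task is unclaimed: try to claim it. The "workerId: null" selector makes
    // the update a no-op if another worker has claimed the task in the meantime.
    Meteor.defer(() => {
      Tasks.update({
        _id: task._id,
        workerId: null,
      }, {
        $set: {
          workerId,
        },
      });
    });
  } else if (!tasksInProgress[task._id]) {
    // The task is assigned to this worker and is not running yet.
    tasksInProgress[task._id] = true;
    Meteor.defer(() => {
      try {
        execute(task);
      } finally {
        // Clear the flag even if execute() throws, so a later "changed" event can retry the task.
        delete tasksInProgress[task._id];
      }
    });
  }
};
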
const processTask = makeTaskProcessor((task) => {
  // Here is where the task execution happens.
  // ...
  // When the work is done, the task has to leave the queue somehow.
  // Removing it from the collection is the simplest option, but not the only one.
  Tasks.remove({ _id: task._id });
});

Meteor.startup(() => {
  // On each restart, release the tasks this worker had claimed but did not finish.
  Tasks.update({ workerId }, { $unset: { workerId: 1 } }, { multi: true });
  // With this setup, the observer tracks at most 10 tasks at a time:
  // those assigned to this worker plus unclaimed ones.
  Tasks.find({
    $or: [
      { workerId: null },
      { workerId },
    ],
  }, {
    limit: 10,
    // With "workerId: -1" nulls come last, which is exactly what we want:
    // tasks already claimed by this worker take priority over unclaimed ones.
    sort: { workerId: -1 },
  }).observe({
    added: processTask,
    changed: processTask,
  });
});
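
The claim step in `makeTaskProcessor` depends on the update being conditional: because the selector includes `workerId: null`, only the first worker to write actually modifies the document. The sketch below illustrates that idea in plain JavaScript, without Meteor or MongoDB. The `tasks` map and the `claimTask` helper are hypothetical stand-ins for the `Tasks` collection and the conditional `Tasks.update` call; they are not part of the gist or of any Meteor API.

```javascript
// Hypothetical in-memory stand-in for the Tasks collection (illustration only).
const tasks = new Map([
  ['t1', { _id: 't1', workerId: null }],
  ['t2', { _id: 't2', workerId: null }],
]);

// Mimics Tasks.update({ _id, workerId: null }, { $set: { workerId } }):
// the write happens only if the task is still unclaimed.
// Returns the number of documents modified (0 or 1).
function claimTask(taskId, workerId) {
  const task = tasks.get(taskId);
  if (!task || task.workerId != null) {
    return 0; // Missing, or already claimed by some worker.
  }
  task.workerId = workerId;
  return 1;
}

const firstClaim = claimTask('t1', 'workerA');  // wins the task
const secondClaim = claimTask('t1', 'workerB'); // loses: t1 is already claimed
const otherClaim = claimTask('t2', 'workerB');  // wins a different task

console.log(firstClaim, secondClaim, otherClaim); // 1 0 1
console.log(tasks.get('t1').workerId); // workerA
```

In the real pattern the losing worker needs no special handling: once the task has another worker's id, it no longer matches that worker's observer query, so the next unclaimed task takes its place.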