Skip to content

Instantly share code, notes, and snippets.

@gfosco
Last active April 5, 2022 22:10
Show Gist options
  • Star 19 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gfosco/131974d200c5e9fc6c94 to your computer and use it in GitHub Desktop.
Save gfosco/131974d200c5e9fc6c94 to your computer and use it in GitHub Desktop.
parse-jobqueue

README.md:

parse-jobqueue

The Parse Cloud Code job queue. Define, queue, and process work on a constant basis.

USAGE

First, you'll want to define some jobs:

jobqueue.job('hello', function() {
  console.log('Hello!');
  return Parse.Promise.as('Hi!');
});

jobqueue.job('add', function(params) {
  var sum = params.a + params.b;
  return Parse.Promise.as(sum);
});

jobqueue.job('updateCount', function(params, parseObjects) {
  var object = parseObjects[0];
  var field = params.field;
  var amount = params.count;
  object.increment(field, amount);
  return object.save();
});

jobqueue.job('createObject', function() {
  var obj = new Parse.Object('TestObject');
  return obj.save();
});

Then, queue up a few jobs, providing a queue name, job name, an array of scalar arguments, and an array of Parse Objects. These arguments will be available to the job, and the passed in Parse Objects will be included.

var jobqueue = require('cloud/parse-jobqueue');

jobqueue.enqueue('test', 'hello');
jobqueue.enqueue('test', 'add', { a: 1, b: 2 });
jobqueue.enqueue('test', 'updateCount', { field: 'countField', count: 1 }, [object]);
jobqueue.enqueue('test', 'createObject');

Next, define a Background Job that will run your worker:

Parse.Cloud.job('testworker', function(request, status) {
  jobqueue.worker(['test']);
});

You can get the result of a job (or a failed promise if it's not completed, ) with the status method.

jobqueue.status('jobIdHere').then(function(job) {
  // Job completed, check value of results, parseObjectResults.
}, function() {
  // Job invalid or not completed.
});

By default, this will process jobs for 4 minutes and 30 seconds and pause for 15 seconds after clearing the job queue. You can alter the defaults with values in milliseconds:

jobqueue.setDelayOnEmptyQueue(15000);
jobqueue.setRunLimit(4.5 * 60 * 1000);

With this module you can attain a finer granularity than simpler job systems running at every-minute intervals.



jobs.js:

var jobqueue = require('cloud/jobqueue');

jobqueue.job('hello', function(scalarArgs, objectArgs) {
  console.log('Hello!');
  return Parse.Promise.as('Hi!');
});

jobqueue.job('add', function(scalarArgs, objectArgs) {
  var sum = scalarArgs.a + scalarArgs.b;
  return Parse.Promise.as(sum);
});

jobqueue.job('updateCount', function(scalarArgs, objectArgs) {
  var object = objectArgs[0];
  object.increment(scalarArgs.field, scalarArgs.count);
  return object.save();
});

jobqueue.job('createObject', function(scalarArgs, objectArgs) {
  var obj = new Parse.Object('TestObject');
  return obj.save();
});

Parse.Cloud.job('workertest', function(request, status) {
  jobqueue.worker(status, ["test"]);
});

main.js:

var jobqueue = require('cloud/parse-jobqueue');
require('cloud/jobs');

Parse.Cloud.define('makeTestData', function(req, res) {

  var obj = new Parse.Object('TestObject');
  obj.set('count', 1);
  obj.save().then(function(obj) {
    var p1 = jobqueue.enqueue('test', 'hello');
    var p2 = jobqueue.enqueue('test', 'add', [1,2]);
    var p3 = jobqueue.enqueue('test', 'updateCount', ['count', 1], [obj]);
    var p4 = jobqueue.enqueue('test', 'createObject');
    return Parse.Promise.when(p1, p2, p3, p4);
  }).then(function(p1, p2, p3, p4) {
    console.log(p1);
    console.log(p2);
    console.log(p3);
    console.log(p4);
    res.success();
  }, function(err) {
    res.error();
  });

});

Parse.Cloud.define('getJobResult', function(req, res) {

  if (!req.params.jobId) {
    return res.error('Invalid job.');
  }

  jobqueue.status(req.params.jobId).then(function(obj) {
    res.success(obj);
  }, function(err) {
    res.error('Invalid job or job not yet completed.');
  });

});

Following is the actual cloud module for job management, queueing, and execution. It uses 2 Parse classes, ParseJobQueue and ParseJobLog.. These should be secured through the data browser to prevent public access for all access types.

parse-jobqueue.js

/**
 * parse-jobqueue
 *
 * An implementation of Resque for Parse Cloud Code
 *
 * Uses background jobs as workers, and Parse Data classes for storage
 *
 * Author: Fosco Marotto <fjm@fb.com>
 */

var JobQueue = Parse.Object.extend("ParseJobQueue");
var JobLog = Parse.Object.extend("ParseJobLog");

var jobs = [];
var queues = [];
var startTime;
var shouldLog = false;
var delayTime = 15000; // default 15s
var runTime = 0.5 * 60 * 1000; // default 4m30s
var statusObject; // the jobs status object, set on execution

var setDelayOnEmptyQueue = function(timeInMs) {
  delayTime = timeInMs;
}

var enableLogging = function() {
  shouldLog = true;
}

var disableLogging = function() {
  shouldLog = false;
}

var setRunLimit = function(timeInMs) {
  runTime = timeInMs;
}

var job = function(jobName, func) {
  jobs[jobName] = func;
}

var enqueue = function(queue, jobName, paramsObject, parseObjectArray) {
  paramsObject = (paramsObject ? paramsObject : {});
  parseObjectArray = (parseObjectArray ? parseObjectArray : []);
  if (!queue || !jobName) {
    return log(
      'Failed to queue invalid job, missing jobName or queue',
      {
        'queue' : queue,
        'jobName' : jobName,
        'paramsObject' : paramsObject,
        'parseObjectArray' : parseObjectArray
      }
    );
  }
  var promise = new Parse.Promise();
  var job = new JobQueue();
  job.set('queue', queue);
  job.set('jobName', jobName);
  job.set('paramsObject', paramsObject);
  job.set('parseObjectArray', parseObjectArray);
  job.set('results', []);
  job.set('parseObjectResults', []);
  job.set('processed', 0);
  job.set('status', 'new');
  job.set('result', '');
  job.set('ACL', {});
  job.save(null, { useMasterKey : true }).then(function(job) {
    promise.resolve(job.id);
  }, function(job, err) {
    promise.reject(err);
  });
  return promise;
};

var worker = function(status, queuesParam) {
  statusObject = status;
  queues = queuesParam;
  status.message('Worker starting.');
  startTime = Date.now();
  if (!queues || !queues.length) {
    return log('Failed to start worker, queues not provided', []);
  }
  return log(
    'Worker started at ' + startTime,
    {
      'queues' : queues
    }
  ).then(poll);
};

var poll = function() {
  return Parse.Promise.as().then(timeLimitCheck).then(function() {
    var query = new Parse.Query(JobQueue);
    query.containedIn('queue', queues);
    query.equalTo('processed', 0);
    query.include('objectArgs');
    var jobCount = 0;
    return query.each(function(job) {
      var promise = new Parse.Promise();
      perform(job).then(function() {
        jobCount++;
        promise.resolve();
      }, function(err) {
        promise.resolve();
      });
      return promise;
    }, { useMasterKey: true }).then(function() {
      if (jobCount) {
        return log(
          'Worker cycle completed after ' + jobCount + ' jobs processed',
          { jobCount : jobCount }
        );
      } else {
        return Parse.Promise.as();
      }
    });
  }).then(delay).then(poll);
};

var perform = function(job) {
  job.increment('processed');
  return job.save(null, { useMasterKey : true }).then(function(job) {
    var promise = new Parse.Promise();
    if (job.get('processed') != 1) {
      return promise.reject();
    }
    statusObject.message('Running job ' + job.id);
    var startTime = Date.now();
    var jobName = job.get('jobName');
    if (!jobs[jobName]) {
      return log(
        'Undefined jobName, ' + jobName,
        { job : job }
      ).then(function() {
        return job.save({
          status : 'error',
          result : 'Undefined jobName',
          elapsedTime : (Date.now() - startTime)
        }, { useMasterKey : true });
      }).then(function() {
        promise.resolve();
      });
    }
    var promise = new Parse.Promise();
    var paramsObject = job.get('paramsObject');
    var parseObjectArray = job.get('parseObjectArray');
    jobs[jobName](paramsObject, parseObjectArray).then(function(result) {
      var res = '';
      var results = [];
      var parseObjectResults = [];
      if (!(result instanceof Array)) {
        result = [result];
      }
      for (var i = 0; i < result.length; i++) {
        if (result[i].className) {
          parseObjectResults.push(result[i]);
        } else {
          results.push(result[i]);
        }
      }
      return job.save({
        'status' : 'completed',
        'result' : JSON.stringify(result),
        'results' : results,
        'parseObjectResults' : parseObjectResults,
        'elapsedTime' : (Date.now() - startTime)
      }, { useMasterKey : true });
    }).then(timeLimitCheck).then(function() {
      promise.resolve();
    }, function(err) {
      promise.resolve();
    });
    return promise;
  });
};

var status = function(id) {
  var promise = new Parse.Promise();
  var query = new Parse.Query(JobQueue);
  query.include('parseObjectResults');
  query.include('parseObjectArray');
  query.equalTo('status', 'completed');
  query.get(id, { useMasterKey: true }).then(function(obj) {
    promise.resolve(obj);
  }, function(err) {
    promise.reject();
  });
  return promise;
}

var log = function(message, data) {
  if (!shouldLog) return Parse.Promise.as();
  var entry = new JobLog();
  entry.set('message', message);
  entry.set('data', data);
  entry.set('ACL', {});
  return entry.save();
}

var delayUntil;
var delayPromise;

var delay = function() {
  statusObject.message('Worker sleeping for ' + delayTime + 'ms.');
  delayUntil = Date.now() + delayTime;
  delayPromise = new Parse.Promise();
  _delay();
  return delayPromise;
};

function _delay() {
  if (Date.now() > delayUntil) {
    delayPromise.resolve();
    return;
  }
  process.nextTick(_delay);
}

function timeLimitCheck() {
  if (Date.now() > (startTime + runTime)) {
    statusObject.success('Worker exiting after run limit.');
    return log(
      'Worker closing at end of time limit.', [startTime, runTime, Date.now()]
    );
  }
  return Parse.Promise.as();
}

module.exports = {
  job : job,
  enqueue : enqueue,
  worker : worker,
  status : status,
  setDelayOnEmptyQueue : setDelayOnEmptyQueue,
  setRunLimit : setRunLimit,
  enableLogging : enableLogging,
  disableLogging : disableLogging
};
@satyadeepk
Copy link

Great implementation Fosco for queuing up worker jobs! One question,
How is the actual background job that runs the worker triggered? Or should it be scheduled every minute?

@gfosco
Copy link
Author

gfosco commented Aug 31, 2014

I would schedule it for every 5 minutes, as the default will stay running for ~4m30s. Probably going to simplify this to remove the concept of queues (just have 1,) and write up a blog post about it in more detail.

@khshah3
Copy link

khshah3 commented Nov 24, 2014

Note: I think there are a few mistakes.

  1. var jobqueue = require('cloud/jobqueue');
  2. var p2 = jobqueue.enqueue('test', 'add', [1,2]);
    
    var p3 = jobqueue.enqueue('test', 'updateCount', ['count', 1], [obj]);

I think arg2 is object not array.

@lukelan
Copy link

lukelan commented Apr 24, 2015

Great implement, do you have the simplify version 1 background job and no queue as you mention above. it would be great to hear it

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment