@ORESoftware
Last active January 14, 2017 03:10
the enqueue / enq method of an OPQ queue
// Enqueues one or more items; accepts a string or an array of strings.
enq(lines: string | Array<string>, opts: IEnqueueOpts): Observable<any> {

  opts = opts || {};

  if (opts.controlled) {
    return this._enqControlled(lines, opts);
  }

  // The default priority is 1, which is the lowest priority.
  const priority = opts.priority || 1;
  const isShare = opts.isShare === true;

  lines = _.flattenDeep([lines]).map(function (l) {
    // Strip non-ASCII characters because they would break
    // our replace-line algorithm.
    return String(l).replace(/[^\x00-\x7F]/g, '');
  });

  let $add = this.init()
    .flatMap(() => {
      return acquireLock(this, '<enqueue>')
        .flatMap(obj => {
          return acquireLockRetry(this, obj);
        });
    })
    .flatMap(obj => {
      // Append the lines, then pass the lock object along so it can be released.
      return appendFile(this, lines, priority)
        .map(() => obj);
    })
    .flatMap(obj => releaseLock(this, obj.id))
    .catch(err => {
      // TODO: release the lock here if it has not been released already.
      // Until that is implemented, rethrow so the original error reaches the
      // subscriber; a handler that returns nothing would replace it with a TypeError.
      throw err;
    })
    .take(1);

  if (isShare) {
    // share() should be equivalent to publish().refCount().
    // With isShare set, we subscribe internally so the enqueue runs even if the
    // caller never subscribes to the returned observable; subscribers that arrive
    // while it is in flight share that one execution.
    $add = $add.share();
    $add.subscribe();
  }

  return $add;
};
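
The steps in enq() (normalize the lines, acquire the lock, append, release the lock) and the intent stated in the catch comment can be sketched without RxJS. This is a minimal synchronous, in-memory illustration, not the library's implementation: InMemoryQueue, its methods, and the failAppend flag are hypothetical names introduced here, and try/finally stands in for the Observable error path. Unlike the original, it accepts only a string or a flat array rather than deeply nested arrays.

```typescript
// Hypothetical in-memory stand-ins; none of these names come from the gist's library.
interface Lock { id: number; }

class InMemoryQueue {
  private held: Lock | null = null;
  private nextId = 1;
  readonly items: Array<{ line: string; priority: number }> = [];

  // Strip non-ASCII characters, using the same regex as enq().
  static normalize(lines: string | string[]): string[] {
    const list = Array.isArray(lines) ? lines : [lines];
    return list.map(l => String(l).replace(/[^\x00-\x7F]/g, ''));
  }

  acquireLock(): Lock {
    if (this.held) {
      throw new Error('lock already held');
    }
    const lock: Lock = { id: this.nextId++ };
    this.held = lock;
    return lock;
  }

  // Idempotent: an id that no longer holds the lock is ignored.
  releaseLock(id: number): void {
    if (this.held && this.held.id === id) {
      this.held = null;
    }
  }

  isLocked(): boolean {
    return this.held !== null;
  }

  // failAppend is an assumption added only to simulate a write error.
  enq(lines: string | string[], priority = 1, failAppend = false): void {
    const clean = InMemoryQueue.normalize(lines);
    const lock = this.acquireLock();
    try {
      if (failAppend) {
        throw new Error('simulated append failure');
      }
      for (const line of clean) {
        this.items.push({ line, priority });
      }
    } finally {
      // Runs on success and on failure, so the lock is never left held.
      this.releaseLock(lock.id);
    }
  }
}
```

Because the release happens in finally, a failed append still propagates its error to the caller while leaving the queue unlocked for the next enq() call.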