Last active
January 14, 2017 03:10
-
-
Save ORESoftware/c953a23c70b9ca5a75d285da6f25bc61 to your computer and use it in GitHub Desktop.
the enqueue / enq method of an OPQ queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// enqueues 1 or more items, accepting a string or an array of strings | |
enq(lines: string | Array<string>, opts: IEnqueueOpts): Observable<any> { | |
opts = opts || {}; | |
if (opts.controlled) { | |
return this._enqControlled(lines, opts); | |
} | |
//default priority is the lowest priority, which is 1 | |
const priority = opts.priority || 1; | |
const isShare = opts.isShare === true; | |
lines = _.flattenDeep([lines]).map(function (l) { | |
// remove unicode characters because they will mess up | |
// our replace-line algorithm | |
return String(l).replace(/[^\x00-\x7F]/g, ''); | |
}); | |
let $add = this.init() | |
.flatMap(() => { | |
return acquireLock(this, '<enqueue>') | |
.flatMap(obj => { | |
return acquireLockRetry(this, obj) | |
}); | |
}) | |
.flatMap(obj => { | |
return appendFile(this, lines, priority) | |
.map(() => obj); | |
}) | |
.flatMap(obj => releaseLock(this, obj.id)) | |
.catch(err => { | |
// catch errors here and release the lock if it hasn't been released already | |
}) | |
.take(1); | |
if (isShare) { | |
// share() should be equivalent to publish().refCount() | |
// using isShare will allow this method to run/execute even if the returned | |
// observable has no subscribers | |
$add = $add.share(); | |
$add.subscribe(); | |
} | |
return $add; | |
}; | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment