-
-
Save mozzius/183203c6130d58a828c8bb4522321fba to your computer and use it in GitHub Desktop.
celery-node concurrency patch file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/dist/app/base.d.ts b/dist/app/base.d.ts | |
index 02cb4ae9bd16b2ca553a601065dc8cecc2dffc7a..65d3777106d5d0dd14d764fe4847addb45336dce 100644 | |
--- a/dist/app/base.d.ts | |
+++ b/dist/app/base.d.ts | |
@@ -15,7 +15,7 @@ export default class Base { | |
* | |
* @constructor Base | |
*/ | |
- constructor(broker: string, backend: string, queue?: string); | |
+ constructor(broker: string, backend: string, queue?: string, concurrency?: number); | |
get broker(): CeleryBroker; | |
get backend(): CeleryBackend; | |
/** | |
diff --git a/dist/app/base.js b/dist/app/base.js | |
index a3a7563dd5937b2f72541a4d2d7faf463f215f2e..be65c4ae727eac2c34beaf9423e92713bceee16a 100644 | |
--- a/dist/app/base.js | |
+++ b/dist/app/base.js | |
@@ -14,11 +14,12 @@ class Base { | |
* | |
* @constructor Base | |
*/ | |
- constructor(broker, backend, queue = "celery") { | |
+ constructor(broker, backend, queue = "celery", concurrency) { | |
this.conf = conf_1.defaultConf(); | |
this.conf.CELERY_BROKER = broker; | |
this.conf.CELERY_BACKEND = backend; | |
this.conf.CELERY_QUEUE = queue; | |
+ this.conf.CONCURRENCY = concurrency; | |
} | |
get broker() { | |
if (!this._broker) { | |
diff --git a/dist/app/conf.d.ts b/dist/app/conf.d.ts | |
index e173e7bfee4b986ecb387a9a4a1fa6841bf27e83..7f19236663667387e7f3d117334ed794ed548c9d 100644 | |
--- a/dist/app/conf.d.ts | |
+++ b/dist/app/conf.d.ts | |
@@ -5,5 +5,6 @@ export interface CeleryConf { | |
CELERY_BACKEND_OPTIONS: object; | |
CELERY_QUEUE: string; | |
TASK_PROTOCOL: number; | |
+ CONCURRENCY?: number; | |
} | |
export declare function defaultConf(): CeleryConf; | |
diff --git a/dist/app/worker.js b/dist/app/worker.js | |
index a8265fc98ce81335db6f4b11b59a85b366e4fcbb..5628efa3abb8f23576f4b9f1ff0a05d33a47db1d 100644 | |
--- a/dist/app/worker.js | |
+++ b/dist/app/worker.js | |
@@ -81,7 +81,20 @@ class Worker extends base_1.default { | |
*/ | |
getConsumer(queue) { | |
const onMessage = this.createTaskHandler(); | |
- return () => this.broker.subscribe(queue, onMessage); | |
+ const concurrencyLimiter = this.createConcurrencyLimiter(); | |
+ return () => this.broker.subscribe(queue, onMessage, concurrencyLimiter); | |
+ } | |
+ createConcurrencyLimiter() { | |
+ // Returns a limiter that defers `callback` while conf.CONCURRENCY (when a positive number) tasks are active. | |
+ const concurrencyLimiter = async (callback) => { | |
+ const limit = this.conf.CONCURRENCY; | |
+ if (limit > 0 && this.activeTasks.size >= limit) { | |
+ // Resume as soon as any task settles; a failed task frees a slot too, so rejections are swallowed here. | |
+ await Promise.race(Array.from(this.activeTasks, (task) => Promise.resolve(task).catch(() => undefined))); | |
+ } | |
+ return process.nextTick(callback); | |
+ } | |
+ return concurrencyLimiter; | |
} | |
createTaskHandler() { | |
const onTaskReceived = (message) => { | |
diff --git a/dist/index.d.ts b/dist/index.d.ts | |
index 1c9053222bb6523f342764dd0399d2f518683875..6ebef9b2da061f8ba48034c361ff428e7d7e533c 100644 | |
--- a/dist/index.d.ts | |
+++ b/dist/index.d.ts | |
@@ -13,4 +13,4 @@ export declare function createClient(broker?: string, backend?: string, queue?: | |
* @function | |
* @returns {Worker} | |
*/ | |
-export declare function createWorker(broker?: string, backend?: string, queue?: string): Worker; | |
+export declare function createWorker(broker?: string, backend?: string, queue?: string, concurrency?: number): Worker; | |
diff --git a/dist/index.js b/dist/index.js | |
index a6bfad25349a3b617940935eb11d82de07680e0c..97a7b9320e475291cdaf32c9c189a19ce17865a0 100644 | |
--- a/dist/index.js | |
+++ b/dist/index.js | |
@@ -19,7 +19,7 @@ exports.createClient = createClient; | |
* @function | |
* @returns {Worker} | |
*/ | |
-function createWorker(broker = "amqp://", backend = "amqp://", queue = "celery") { | |
- return new worker_1.default(broker, backend, queue); | |
+function createWorker(broker = "amqp://", backend = "amqp://", queue = "celery", concurrency) { | |
+ return new worker_1.default(broker, backend, queue, concurrency); | |
} | |
exports.createWorker = createWorker; | |
diff --git a/dist/kombu/brokers/redis.js b/dist/kombu/brokers/redis.js | |
index c0b561bddae1236a946781531a4d1bbd8762ca20..3c1940078ff32110c166a935aeb9c9c50d13e69a 100644 | |
--- a/dist/kombu/brokers/redis.js | |
+++ b/dist/kombu/brokers/redis.js | |
@@ -79,13 +79,14 @@ class RedisBroker { | |
* @method RedisBroker#subscribe | |
* @param {string} queue | |
* @param {Function} callback | |
+ * @param {Function} concurrencyLimiter | |
* @returns {Promise} | |
*/ | |
- subscribe(queue, callback) { | |
+ subscribe(queue, callback, concurrencyLimiter) { | |
const promiseCount = 1; | |
return this.isReady().then(() => { | |
for (let index = 0; index < promiseCount; index += 1) { | |
- this.channels.push(new Promise(resolve => this.receive(index, resolve, queue, callback))); | |
+ this.channels.push(new Promise(resolve => this.receive(index, resolve, queue, callback, concurrencyLimiter))); | |
} | |
return Promise.all(this.channels); | |
}); | |
@@ -96,9 +97,10 @@ class RedisBroker { | |
* @param {Fucntion} resolve | |
* @param {string} queue | |
* @param {Function} callback | |
+ * @param {Function} concurrencyLimiter | |
*/ | |
- receive(index, resolve, queue, callback) { | |
- process.nextTick(() => this.recieveOneOnNextTick(index, resolve, queue, callback)); | |
+ receive(index, resolve, queue, callback, concurrencyLimiter = (next) => process.nextTick(next)) { | |
+ Promise.resolve(concurrencyLimiter(() => this.recieveOneOnNextTick(index, resolve, queue, callback, concurrencyLimiter))).catch(err => console.log(err)); | |
} | |
/** | |
* @private | |
@@ -106,9 +108,10 @@ class RedisBroker { | |
* @param {Function} resolve | |
* @param {String} queue | |
* @param {Function} callback | |
+ * @param {Function} concurrencyLimiter | |
* @returns {Promise} | |
*/ | |
- recieveOneOnNextTick(index, resolve, queue, callback) { | |
+ recieveOneOnNextTick(index, resolve, queue, callback, concurrencyLimiter) { | |
if (this.closing) { | |
resolve(); | |
return; | |
@@ -120,7 +123,7 @@ class RedisBroker { | |
} | |
Promise.resolve(); | |
}) | |
- .then(() => this.receive(index, resolve, queue, callback)) | |
+ .then(() => this.receive(index, resolve, queue, callback, concurrencyLimiter)) | |
.catch(err => console.log(err)); | |
} | |
/** |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment