Skip to content

Instantly share code, notes, and snippets.

@mozzius
Created February 13, 2024 14:49
Show Gist options
  • Save mozzius/183203c6130d58a828c8bb4522321fba to your computer and use it in GitHub Desktop.
Save mozzius/183203c6130d58a828c8bb4522321fba to your computer and use it in GitHub Desktop.
celery-node concurrency patch file
diff --git a/dist/app/base.d.ts b/dist/app/base.d.ts
index 02cb4ae9bd16b2ca553a601065dc8cecc2dffc7a..65d3777106d5d0dd14d764fe4847addb45336dce 100644
--- a/dist/app/base.d.ts
+++ b/dist/app/base.d.ts
@@ -15,7 +15,7 @@ export default class Base {
*
* @constructor Base
*/
- constructor(broker: string, backend: string, queue?: string);
+ constructor(broker: string, backend: string, queue?: string, concurrency?: number);
get broker(): CeleryBroker;
get backend(): CeleryBackend;
/**
diff --git a/dist/app/base.js b/dist/app/base.js
index a3a7563dd5937b2f72541a4d2d7faf463f215f2e..be65c4ae727eac2c34beaf9423e92713bceee16a 100644
--- a/dist/app/base.js
+++ b/dist/app/base.js
@@ -14,11 +14,12 @@ class Base {
*
* @constructor Base
*/
- constructor(broker, backend, queue = "celery") {
+ constructor(broker, backend, queue = "celery", concurrency) {
this.conf = conf_1.defaultConf();
this.conf.CELERY_BROKER = broker;
this.conf.CELERY_BACKEND = backend;
this.conf.CELERY_QUEUE = queue;
+ this.conf.CONCURRENCY = concurrency;
}
get broker() {
if (!this._broker) {
diff --git a/dist/app/conf.d.ts b/dist/app/conf.d.ts
index e173e7bfee4b986ecb387a9a4a1fa6841bf27e83..7f19236663667387e7f3d117334ed794ed548c9d 100644
--- a/dist/app/conf.d.ts
+++ b/dist/app/conf.d.ts
@@ -5,5 +5,6 @@ export interface CeleryConf {
CELERY_BACKEND_OPTIONS: object;
CELERY_QUEUE: string;
TASK_PROTOCOL: number;
+ CONCURRENCY: number;
}
export declare function defaultConf(): CeleryConf;
diff --git a/dist/app/worker.js b/dist/app/worker.js
index a8265fc98ce81335db6f4b11b59a85b366e4fcbb..5628efa3abb8f23576f4b9f1ff0a05d33a47db1d 100644
--- a/dist/app/worker.js
+++ b/dist/app/worker.js
@@ -81,7 +81,20 @@ class Worker extends base_1.default {
*/
getConsumer(queue) {
const onMessage = this.createTaskHandler();
- return () => this.broker.subscribe(queue, onMessage);
+ const concurrencyLimiter = this.createConcurrencyLimiter();
+ return () => this.broker.subscribe(queue, onMessage, concurrencyLimiter);
+ }
+ createConcurrencyLimiter() { // Builds the gate that getConsumer() passes to broker.subscribe() as its third argument.
+ const concurrencyLimiter = async (callback) => { // Never calls `callback` synchronously; it is always deferred via process.nextTick.
+ if (!this.conf.CONCURRENCY) { // Falsy CONCURRENCY (undefined when the argument is omitted, or 0) means no limit.
+ return process.nextTick(callback); // Unlimited: schedule the next receive straight away.
+ }
+ if (this.activeTasks.size >= this.conf.CONCURRENCY) { // Checked once, not in a loop. Presumably activeTasks holds in-flight task promises (maintained outside this patch) - TODO confirm.
+ await Promise.any(Array.from(this.activeTasks)); // Resumes on the first task that fulfils. NOTE(review): if every task rejects this throws AggregateError, and RedisBroker.receive does not catch the limiter's promise.
+ }
+ return process.nextTick(callback); // A slot is assumed free at this point; schedule the next receive.
+ }
+ return concurrencyLimiter; // The caller (RedisBroker.receive) invokes this before each receive.
+ }
createTaskHandler() {
const onTaskReceived = (message) => {
diff --git a/dist/index.d.ts b/dist/index.d.ts
index 1c9053222bb6523f342764dd0399d2f518683875..6ebef9b2da061f8ba48034c361ff428e7d7e533c 100644
--- a/dist/index.d.ts
+++ b/dist/index.d.ts
@@ -13,4 +13,4 @@ export declare function createClient(broker?: string, backend?: string, queue?:
* @function
* @returns {Worker}
*/
-export declare function createWorker(broker?: string, backend?: string, queue?: string): Worker;
+export declare function createWorker(broker?: string, backend?: string, queue?: string, concurrency?: number): Worker;
diff --git a/dist/index.js b/dist/index.js
index a6bfad25349a3b617940935eb11d82de07680e0c..97a7b9320e475291cdaf32c9c189a19ce17865a0 100644
--- a/dist/index.js
+++ b/dist/index.js
@@ -19,7 +19,7 @@ exports.createClient = createClient;
* @function
* @returns {Worker}
*/
-function createWorker(broker = "amqp://", backend = "amqp://", queue = "celery") {
- return new worker_1.default(broker, backend, queue);
+function createWorker(broker = "amqp://", backend = "amqp://", queue = "celery", concurrency) {
+ return new worker_1.default(broker, backend, queue, concurrency);
}
exports.createWorker = createWorker;
diff --git a/dist/kombu/brokers/redis.js b/dist/kombu/brokers/redis.js
index c0b561bddae1236a946781531a4d1bbd8762ca20..3c1940078ff32110c166a935aeb9c9c50d13e69a 100644
--- a/dist/kombu/brokers/redis.js
+++ b/dist/kombu/brokers/redis.js
@@ -79,13 +79,14 @@ class RedisBroker {
* @method RedisBroker#subscribe
* @param {string} queue
* @param {Function} callback
+ * @param {Function} concurrencyLimiter
* @returns {Promise}
*/
- subscribe(queue, callback) {
+ subscribe(queue, callback, concurrencyLimiter) {
const promiseCount = 1;
return this.isReady().then(() => {
for (let index = 0; index < promiseCount; index += 1) {
- this.channels.push(new Promise(resolve => this.receive(index, resolve, queue, callback)));
+ this.channels.push(new Promise(resolve => this.receive(index, resolve, queue, callback, concurrencyLimiter)));
}
return Promise.all(this.channels);
});
@@ -96,9 +97,10 @@ class RedisBroker {
* @param {Fucntion} resolve
* @param {string} queue
* @param {Function} callback
+ * @param {Function} concurrencyLimiter
*/
- receive(index, resolve, queue, callback) {
- process.nextTick(() => this.recieveOneOnNextTick(index, resolve, queue, callback));
+ receive(index, resolve, queue, callback, concurrencyLimiter) {
+ concurrencyLimiter(() => this.recieveOneOnNextTick(index, resolve, queue, callback, concurrencyLimiter));
}
/**
* @private
@@ -106,9 +108,10 @@ class RedisBroker {
* @param {Function} resolve
* @param {String} queue
* @param {Function} callback
+ * @param {Function} concurrencyLimiter
* @returns {Promise}
*/
- recieveOneOnNextTick(index, resolve, queue, callback) {
+ recieveOneOnNextTick(index, resolve, queue, callback, concurrencyLimiter) {
if (this.closing) {
resolve();
return;
@@ -120,7 +123,7 @@ class RedisBroker {
}
Promise.resolve();
})
- .then(() => this.receive(index, resolve, queue, callback))
+ .then(() => this.receive(index, resolve, queue, callback, concurrencyLimiter))
.catch(err => console.log(err));
}
/**
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment