Skip to content

Instantly share code, notes, and snippets.

@nikkanetiya
Last active March 18, 2023 13:27
Show Gist options
  • Save nikkanetiya/ae3fe3ce0d5eeaa050c725e320eec850 to your computer and use it in GitHub Desktop.
Save nikkanetiya/ae3fe3ce0d5eeaa050c725e320eec850 to your computer and use it in GitHub Desktop.
Email sending with a per-account rate limit, using Pub/Sub and Cloud Tasks
/**
 * Enqueues outgoing emails as Cloud Tasks HTTP tasks, gated by a per-account
 * rate limit.
 *
 * Relies on a module-level `tasksClient` (a Cloud Tasks client) being in scope.
 */
class EmailSender {
  /**
   * @param {object} rateLimiter - Object exposing async
   *   `tryAcquire(key, maxMessages, windowMs)` and
   *   `getNextAvailableTime(key, maxMessages, windowMs)`.
   * @param {string} projectId - Project id used to build the queue path.
   * @param {string} queueLocation - Queue location used to build the queue path.
   * @param {string} queueName - Queue name used to build the queue path.
   * @param {string} [targetBaseUrl=''] - Optional origin (for example
   *   `https://worker.example.com`) prepended to the `/sendEmail` handler path.
   *   Cloud Tasks HTTP targets require an absolute http(s) URL; the default
   *   keeps the original relative `/sendEmail` for existing callers.
   */
  constructor(rateLimiter, projectId, queueLocation, queueName, targetBaseUrl = '') {
    this.rateLimiter = rateLimiter;
    this.projectId = projectId;
    this.queueLocation = queueLocation;
    this.queueName = queueName;
    // Drop trailing slashes so the joined URL never contains `//sendEmail`.
    this.targetBaseUrl = String(targetBaseUrl).replace(/\/+$/, '');
  }

  /**
   * Creates a task that POSTs the email to the `/sendEmail` handler if the
   * account is still under its rate limit.
   *
   * @param {string} from - Sender address.
   * @param {string} to - Recipient address.
   * @param {string} subject - Email subject.
   * @param {string} body - Email body.
   * @param {string} accountId - Account whose limit is checked; the limiter key
   *   is `${accountId}_email_rate_limit`.
   * @param {number} maxMessages - Limit passed through to the rate limiter.
   * @param {number} windowMs - Window length in milliseconds, passed through
   *   to the rate limiter.
   * @returns {Promise<boolean>} `true` when a task was created, `false` when
   *   the rate limiter refused the request (no task is created in that case).
   */
  async sendEmailWithRateLimit(from, to, subject, body, accountId, maxMessages, windowMs) {
    const key = `${accountId}_email_rate_limit`;
    const canSend = await this.rateLimiter.tryAcquire(key, maxMessages, windowMs);
    if (!canSend) {
      return false;
    }

    const payload = JSON.stringify({ from, to, subject, body });
    const queuePath = tasksClient.queuePath(this.projectId, this.queueLocation, this.queueName);

    // The limiter reports epoch milliseconds; the task schedule uses whole seconds.
    const nextAvailableMs = await this.rateLimiter.getNextAvailableTime(key, maxMessages, windowMs);

    const task = {
      httpRequest: {
        httpMethod: 'POST',
        url: `${this.targetBaseUrl}/sendEmail`,
        body: Buffer.from(payload).toString('base64'),
        headers: {
          'Content-Type': 'application/json'
        }
      },
      scheduleTime: {
        seconds: Math.ceil(nextAvailableMs / 1000)
      }
    };

    await tasksClient.createTask({ parent: queuePath, task });
    return true;
  }
}
const { CloudTasksClient } = require('@google-cloud/tasks');
const cloudTasksClient = new CloudTasksClient();
/**
 * Handles one Pub/Sub message carrying an email payload: tries to send it
 * through the rate-limited sender and, when refused, re-queues it as a Cloud
 * Task scheduled for the next available time.
 *
 * Relies on a module-level `cloudTasksClient` being in scope.
 */
class PubsubMessageProcessor {
  /**
   * @param {object} emailSender - Object exposing async
   *   `sendEmailWithRateLimit(from, to, subject, body, accountId, maxMessages, windowMs)`.
   * @param {string} project - Project id used to build the queue path.
   * @param {string} location - Queue location used to build the queue path.
   * @param {string} queueName - Queue name used to build the queue path.
   * @param {object} rateLimit - Object exposing `getNextAvailableTime(...)`,
   *   returning (or resolving to) an epoch-millisecond timestamp.
   * @param {string} [targetBaseUrl=''] - Optional origin prepended to the
   *   `/tasks/queueEmail` handler path. Cloud Tasks HTTP targets require an
   *   absolute http(s) URL; the default keeps the original relative path.
   */
  constructor(emailSender, project, location, queueName, rateLimit, targetBaseUrl = '') {
    this.emailSender = emailSender;
    this.project = project;
    this.location = location;
    this.queueName = queueName;
    this.rateLimit = rateLimit;
    // Drop trailing slashes so the joined URL never contains `//tasks`.
    this.targetBaseUrl = String(targetBaseUrl).replace(/\/+$/, '');
  }

  /**
   * Decodes the message, attempts a rate-limited send, and re-queues the email
   * when the sender reports it was not sent.
   *
   * @param {{data?: (string|Buffer)}} message - Message whose `data` is decoded
   *   with `Buffer.from(data, 'base64')` and parsed as JSON; a missing `data`
   *   is treated as `{}`.
   * @returns {Promise<(boolean|undefined)>} `true` when the sender accepted the
   *   email; `undefined` after the email was re-queued.
   * @throws Rethrows (after logging) any error raised by the sender; JSON
   *   parse errors and task-creation errors propagate unchanged.
   */
  async processMessage(message) {
    const data = message.data ? Buffer.from(message.data, 'base64').toString() : '{}';
    const email = JSON.parse(data);

    // The sender takes positional arguments, so unpack the payload rather than
    // handing it the whole object.
    // NOTE(review): assumes the payload carries accountId/maxMessages/windowMs
    // next to the email fields - confirm against the publisher.
    const { from, to, subject, body, accountId, maxMessages, windowMs } = email;

    try {
      const isEmailSent = await this.emailSender.sendEmailWithRateLimit(
        from, to, subject, body, accountId, maxMessages, windowMs
      );
      if (isEmailSent) return true;
    } catch (err) {
      console.error(`Error sending email: ${err}`);
      throw err;
    }

    // Schedule the next email to be sent
    const task = {
      httpRequest: {
        httpMethod: 'POST',
        url: `${this.targetBaseUrl}/tasks/queueEmail`,
        body: Buffer.from(JSON.stringify(email)).toString('base64'),
        headers: {
          'Content-Type': 'application/json',
        },
      },
      scheduleTime: await this._getNextScheduleTimestamp(accountId, maxMessages, windowMs),
    };
    const parent = cloudTasksClient.queuePath(this.project, this.location, this.queueName);
    await cloudTasksClient.createTask({ parent, task });
  }

  /**
   * Synchronous ISO-8601 form of the next schedule time, kept unchanged for
   * existing callers. Only meaningful when `rateLimit.getNextAvailableTime()`
   * returns a number synchronously; `processMessage` does not use it.
   *
   * @returns {string} ISO timestamp no earlier than the current time.
   */
  getNextScheduleTime() {
    const nextAvailableTime = this.rateLimit.getNextAvailableTime();
    const delaySeconds = Math.max(0, nextAvailableTime - Date.now()) / 1000;
    const scheduleTime = new Date(Date.now() + delaySeconds * 1000);
    return scheduleTime.toISOString();
  }

  /**
   * Resolves the limiter's next available time into the `{seconds}` timestamp
   * object used for a task's `scheduleTime`.
   *
   * The limiter call is awaited, so both synchronous and promise-returning
   * limiters work. The key mirrors the `${accountId}_email_rate_limit` format;
   * limiters that take no arguments simply ignore the extras.
   *
   * @param {string} accountId - Account whose limiter key is queried.
   * @param {number} maxMessages - Limit passed through to the limiter.
   * @param {number} windowMs - Window length in ms passed through to the limiter.
   * @returns {Promise<{seconds: number}>} Schedule time, never in the past.
   */
  async _getNextScheduleTimestamp(accountId, maxMessages, windowMs) {
    const key = `${accountId}_email_rate_limit`;
    const nextAvailableMs = Number(
      await this.rateLimit.getNextAvailableTime(key, maxMessages, windowMs)
    );
    const now = Date.now();
    // Fall back to "now" when the limiter yields nothing usable.
    const scheduleMs = Number.isFinite(nextAvailableMs) ? Math.max(now, nextAvailableMs) : now;
    return { seconds: Math.ceil(scheduleMs / 1000) };
  }
}
module.exports = PubsubMessageProcessor;
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const {CloudTasksClient} = require('@google-cloud/tasks');
const tasksClient = new CloudTasksClient();
const redis = require('redis');
/**
 * Sliding-window rate limiter that keeps one sorted-set entry per request in
 * Redis, scored by the request's epoch-millisecond timestamp.
 *
 * Relies on a module-level `redis` import and on the client exposing a
 * promisified `multi().execAsync()`.
 */
class RateLimiter {
  /**
   * @param {string} redisUrl - Connection URL handed to `redis.createClient`.
   */
  constructor(redisUrl) {
    this.client = redis.createClient({url: redisUrl});
  }

  /**
   * Records a request under `key` and reports whether the window is still
   * within its limit.
   *
   * @param {string} key - Sorted-set key identifying the limited resource.
   * @param {number} maxMessages - Maximum entries allowed inside the window.
   * @param {number} windowMs - Window length in milliseconds.
   * @returns {Promise<boolean>} `true` when the entry count, including this
   *   request, does not exceed `maxMessages`.
   */
  async tryAcquire(key, maxMessages, windowMs) {
    const { count } = await this._recordAndCount(key, windowMs);
    return count <= maxMessages;
  }

  /**
   * Reports the epoch-millisecond time at which the window has room again.
   *
   * NOTE(review): like `tryAcquire`, this call itself records an entry under
   * `key`; that behaviour is preserved from the original - confirm it is intended.
   *
   * @param {string} key - Sorted-set key identifying the limited resource.
   * @param {number} maxMessages - Maximum entries allowed inside the window.
   * @param {number} windowMs - Window length in milliseconds.
   * @returns {Promise<number>} The current time when the count is within the
   *   limit; otherwise the oldest entry's score plus `windowMs`.
   */
  async getNextAvailableTime(key, maxMessages, windowMs) {
    const { now, count } = await this._recordAndCount(key, windowMs);
    if (count <= maxMessages) {
      return now;
    }

    // The pipeline already uses the promisified `execAsync`, so prefer the
    // promisified `zrangeAsync` when present; otherwise fall back to the
    // original `zrange` call.
    const oldest = typeof this.client.zrangeAsync === 'function'
      ? await this.client.zrangeAsync(key, 0, 0, 'WITHSCORES')
      : await this.client.zrange(key, 0, 0, 'WITHSCORES');

    // WITHSCORES yields [member, score]; the score is the entry's timestamp.
    const oldestScore = Array.isArray(oldest) ? Number.parseInt(oldest[1], 10) : NaN;
    return Number.isNaN(oldestScore) ? now : oldestScore + windowMs;
  }

  /**
   * Trims entries older than the window, records the current request,
   * refreshes the key's TTL, and returns the resulting entry count.
   *
   * @param {string} key - Sorted-set key.
   * @param {number} windowMs - Window length in milliseconds.
   * @returns {Promise<{now: number, count: number}>} Timestamp used for this
   *   request and the ZCARD reply.
   */
  async _recordAndCount(key, windowMs) {
    const now = Date.now();
    const multi = this.client.multi();
    multi.zremrangebyscore(key, 0, now - windowMs);
    multi.zadd(key, now, now);
    multi.zcard(key);
    multi.expire(key, Math.ceil(windowMs / 1000));
    const replies = await multi.execAsync();
    // Replies follow queue order: [zremrangebyscore, zadd, zcard, expire].
    // The count is the third reply, not the second (which is the ZADD result).
    return { now, count: replies[2] };
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment