Skip to content

Instantly share code, notes, and snippets.

@hoangsetup
Created March 26, 2022 13:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hoangsetup/a967db450c9abaa723085928b08f6816 to your computer and use it in GitHub Desktop.
Save hoangsetup/a967db450c9abaa723085928b08f6816 to your computer and use it in GitHub Desktop.
import environments from './environments';
import { IQueueItem } from './interfaces';
import getChannel from './rabbitmq';
/**
 * Publishes a queue item to the scraper queue as a JSON-encoded buffer.
 *
 * The queue name is taken from `environments.rabbitMQScraperQueue`, and the
 * channel is obtained from `getChannel` for that same queue.
 *
 * @param item - The item to serialize with `JSON.stringify` and enqueue.
 * @returns Whatever `channel.sendToQueue` reports for the publish.
 *   NOTE(review): with an amqplib-style channel, `false` presumably signals
 *   that the write buffer is full — confirm against the channel implementation.
 */
export async function sendQueryToQueue(item: IQueueItem): Promise<boolean> {
  const queueName = environments.rabbitMQScraperQueue;
  const scraperChannel = await getChannel(queueName);
  // Serialize only after the channel is available, then wrap for the wire.
  const payload = Buffer.from(JSON.stringify(item));
  return scraperChannel.sendToQueue(queueName, payload);
}
/**
 * Registers a consumer on the scraper queue and forwards every decoded
 * message to `onQuery`.
 *
 * Each message body is parsed as JSON and handed to `onQuery`. The message is
 * acknowledged once handling finishes, whether it succeeded or failed, so a
 * message that cannot be processed is not redelivered.
 *
 * @param onQuery - Async handler invoked with the parsed queue item.
 * @returns A promise that settles once the consumer registration call has
 *   completed; it rejects if registering the consumer fails.
 */
export async function consume(onQuery: (item: IQueueItem) => Promise<void>): Promise<void> {
  const channel = await getChannel(environments.rabbitMQScraperQueue);
  // Await the registration so failures surface to the caller instead of
  // becoming an unhandled promise rejection.
  await channel.consume(environments.rabbitMQScraperQueue, async (message) => {
    // A falsy message carries nothing to process or acknowledge.
    if (!message) {
      return;
    }
    try {
      // The payload is trusted to match IQueueItem; no runtime validation is done here.
      const item = JSON.parse(message.content.toString()) as IQueueItem;
      await onQuery(item);
    } catch (error: unknown) {
      // Nothing awaits this callback's promise, so an error escaping here
      // would be an unhandled rejection. Report it explicitly instead.
      console.error('Failed to process queue message', error);
    } finally {
      // Always acknowledge, matching the original best-effort semantics.
      channel.ack(message);
    }
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment