Skip to content

Instantly share code, notes, and snippets.

@kleva-j
Last active July 16, 2022 21:00
Show Gist options
  • Save kleva-j/df954825035c0c92a6d1c93c3dae153d to your computer and use it in GitHub Desktop.
Save kleva-j/df954825035c0c92a6d1c93c3dae153d to your computer and use it in GitHub Desktop.
An implementation of a message queue system in Nodejs
import * as http from 'http';
import * as url from 'url';
import { Incoming } from './incoming.interface';
import { QueuePersistence } from './persist-queues';
let queues: QueuePersistence = new QueuePersistence();
(async () => {
const server = await http.createServer(endpoint);
server.listen(3000);
})();
function endpoint(req: http.IncomingMessage, res: http.ServerResponse) {
if (req.method === 'GET' && req.url) {
const queryData = (url.parse(req.url, true).query) as unknown as Incoming;
if (queryData.name && queryData.message) {
queues.pushMessageToQueue(queryData.name, queryData.message);
res.end('OK');
} else if (queryData.name) {
const message = queues.getMessageFromQueue(queryData.name);
res.end(message);
} else {
res.end('query parameters are not correct');
}
}
}
export interface Incoming {
name: string;
message: string;
}
import * as fs from 'fs';
import { QueueStructure } from './queue-structure.interface';
export class QueuePersistence {
private _queueDefinitions: string[] = [];
private _queuePersistance: QueueStructure[] = [];
constructor() {
this.createQueueDefinitionArray();
this.createQueuePersistance();
}
private getQueueByName(name: string): QueueStructure | undefined {
let queue = this._queuePersistance.find(x => x.name === name);
if (!queue) {
const body = this.readFile(name);
if (body) {
queue = {
name: name,
messages: []
};
this._queuePersistance.push(queue);
this.addToTop('queues', name);
}
}
return queue;
}
public pushMessageToQueue(name: string, message: string) {
const queue = this.getQueueByName(name);
if (queue) {
this.addToTop(name, message);
queue.messages.push(message);
console.log(queue.messages);
}
}
public getMessageFromQueue(name: string) {
const queue = this.getQueueByName(name);
if (queue) {
const message = queue.messages[0];
const stat = fs.statSync(name);
fs.truncateSync(name, stat.size - message.length - 2);
const response = queue.messages.shift();
console.log(`${response} was requested and removed`);
return response;
}
}
private createQueueDefinitionArray() {
console.log('...loading queue definition');
const body = this.readFile('queues');
if (body) {
this._queueDefinitions = body.toString('utf8').split('\r\n');
console.log('...loading queue definition complete');
} else {
console.log('...loading queue definition failed');
process.exit(2);
}
}
private createQueuePersistance() {
console.log('...loading queue persistance');
if (this._queueDefinitions.length > 0) {
this._queueDefinitions.forEach((def) => {
const body = this.readFile(def);
if (body) {
this._queuePersistance.push({
name: def,
messages: body.toString('utf8').split('\r\n').reverse()
});
} else {
console.log('...loading queue persistance failed');
process.exit(2);
}
});
}
console.log('...loading queue persistance complete');
}
private readFile(filename: string): Buffer | undefined {
if (!fs.existsSync(filename)) {
fs.writeFile(filename, '', (error) => {
if (error) {
console.log(error);
}
});
return Buffer.from('');
}
try {
return fs.readFileSync(filename);
} catch (error) {
console.log(error);
}
}
private addToTop(filename: string, message: string) {
const fd = fs.openSync(filename, 'r+');
const data = fs.readFileSync(filename);
const buffer: Buffer = Buffer.from(`${message}\r\n`);
fs.writeSync(fd, buffer, 0, buffer.length, 0);
fs.writeSync(fd, data, 0, data.length, buffer.length);
fs.closeSync(fd);
}
}
export interface QueueStructure {
name: string;
messages: string[];
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment