Skip to content

Instantly share code, notes, and snippets.

@barisbll
Last active March 8, 2023 20:36
Show Gist options
  • Save barisbll/3b22161674b25a561f3b4e4904ad3672 to your computer and use it in GitHub Desktop.
Save barisbll/3b22161674b25a561f3b4e4904ad3672 to your computer and use it in GitHub Desktop.
Typescript RPC with Observables Written By ChatGPT
import * as amqp from 'amqplib';
import { Observable, Observer } from 'rxjs';
interface RpcRequest {
correlationId: string;
resolve: (response: any) => void;
reject: (error: Error) => void;
}
class RabbitMQRpcClient {
private connection: amqp.Connection;
private channel: amqp.Channel;
private readonly exchangeName: string;
private readonly requestQueueName: string;
private readonly responseQueueName: string;
private readonly rpcRequests: Record<string, RpcRequest> = {};
constructor(private readonly serverUrls: string[]) {
this.exchangeName = 'rpc_exchange';
this.requestQueueName = 'rpc_request_queue';
this.responseQueueName = 'rpc_response_queue';
this.initialize();
}
private async initialize() {
this.connection = await amqp.connect(this.serverUrls);
this.channel = await this.connection.createChannel();
// Create the exchange and the queues
await this.channel.assertExchange(this.exchangeName, 'direct', { durable: false });
await this.channel.assertQueue(this.requestQueueName, { exclusive: true });
await this.channel.assertQueue(this.responseQueueName, { exclusive: true });
// Bind the queues to the exchange
await this.channel.bindQueue(this.requestQueueName, this.exchangeName, this.requestQueueName);
await this.channel.bindQueue(this.responseQueueName, this.exchangeName, this.responseQueueName);
// Consume messages from the response queue
await this.channel.consume(this.responseQueueName, (message) => {
const correlationId = message.properties.correlationId;
const rpcRequest = this.rpcRequests[correlationId];
if (rpcRequest) {
const response = JSON.parse(message.content.toString());
rpcRequest.resolve(response);
delete this.rpcRequests[correlationId];
}
}, { noAck: true });
}
public callRpcServer<TRequest, TResponse>(serviceName: string, methodName: string, request: TRequest): Observable<TResponse> {
return new Observable<TResponse>((observer: Observer<TResponse>) => {
const correlationId = this.generateUuid();
const responseQueue = this.responseQueueName;
const rpcRequest = { correlationId, resolve: observer.next.bind(observer), reject: observer.error.bind(observer) };
this.rpcRequests[correlationId] = rpcRequest;
// Send the request to the request queue
const requestData = { serviceName, methodName, request };
const requestDataJson = JSON.stringify(requestData);
this.channel.publish(this.exchangeName, this.requestQueueName, Buffer.from(requestDataJson), { correlationId, replyTo: responseQueue });
});
}
private generateUuid(): string {
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
}
}
const rpcClient = new RabbitMQRpcClient(['amqp://localhost']);
rpcClient.callRpcServer<number, number>('calculator', 'add', { x: 2, y: 3 }).subscribe((result) => {
console.log(`Result: ${result}`);
}, (error) => {
console.error(`Error: ${error}`);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment