Skip to content

Instantly share code, notes, and snippets.

@Ugbot
Created December 17, 2023 22:57
Show Gist options
  • Save Ugbot/93406f2de640531e8fdee15a453a9170 to your computer and use it in GitHub Desktop.
Transactions in Kafka JS
const { Kafka } = require('kafkajs');
const axios = require('axios');
const fs = require('fs');
// Kafka client for a TLS + SASL/SCRAM managed cluster (Aiven-style):
// broker certs are verified against a CA bundle on disk, and the client
// authenticates with SCRAM credentials.
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['BROKERS'], // Replace with your host:port broker list
  ssl: {
    // Verify the broker certificate chain against our pinned CA.
    rejectUnauthorized: true,
    ca: [fs.readFileSync('ca.pem', 'utf-8')],
  },
  sasl: {
    mechanism: 'scram-sha-256', // Replace with your SASL mechanism
    username: 'avnadmin',
    password: 'PASSWORD', // Replace with your real password
  },
});
// Consumer for the consume-transform-produce loop.
// NOTE(review): the groupId is the *literal* string 'my-topic-name + workerTYPE';
// it looks like template interpolation was intended — confirm and switch to a
// template literal with real variables if so.
// NOTE(review): `allowAutoCommit` is not a documented kafkajs consumer
// constructor option; auto-commit is controlled via the `autoCommit` option of
// consumer.run() — verify this actually disables auto-commit.
const consumer = kafka.consumer({ groupId: 'my-topic-name + workerTYPE', allowAutoCommit: false });
// Idempotent transactional producer — required for producer.transaction().
// NOTE(review): transactionalId is likewise an uninterpolated literal; each
// worker instance needs a unique transactional id so Kafka can fence zombies —
// confirm intent.
const producer = kafka.producer({
transactionalId: 'my-topic + workerTYPE + workerID',
maxInFlightRequests: 1,
idempotent: true,
transactionTimeout: 60000,
});
/** Target URL for the per-message HTTP call. Replace with your endpoint. */
const HTTP_ENDPOINT = 'https://echo.free.beeceptor.com';
/** Upper bound on HTTP requests allowed in flight at once. */
const MAX_CONCURRENT_REQUESTS = 5;
/** Topic the consumer reads from. */
const INPUT_TOPIC = 'js-input';
/** Topic the transactional producer writes results to. */
const OUTPUT_TOPIC = 'block-data';
/** Fetch limit per partition: 256 KiB. */
const MAX_BYTES_PER_PARTITION = 256 * 1024;
/** Fetch limit per request: 1 MiB. */
const MAX_BYTES = 1024 * 1024;
/**
 * Consume from INPUT_TOPIC, POST each message to HTTP_ENDPOINT, and — inside a
 * Kafka transaction — produce one result record to OUTPUT_TOPIC per successful
 * request. Output only becomes visible to read-committed consumers once the
 * whole batch commits.
 *
 * Fixes vs. the original:
 *  - produces to OUTPUT_TOPIC (was a hard-coded 'target-topic', inconsistent
 *    with the constant declared above);
 *  - HTTP failures are still only logged (dead-letter/retry left as an
 *    exercise), but transaction.send() failures now propagate and ABORT the
 *    transaction instead of being swallowed before an unconditional commit.
 *
 * NOTE(review): for end-to-end exactly-once, consumer offsets should be
 * committed inside the transaction via transaction.sendOffsets() rather than
 * relying on resolveOffset() alone — confirm against the kafkajs docs.
 */
const run = async () => {
  await producer.connect();
  await consumer.connect();
  await consumer.subscribe({
    topic: INPUT_TOPIC,
    fromBeginning: true,
    maxBytesPerPartition: MAX_BYTES_PER_PARTITION,
    maxBytes: MAX_BYTES,
  });
  await consumer.run({
    eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
      // One transaction per batch; commit only after every in-flight HTTP
      // request and its follow-up produce have settled.
      const transaction = await producer.transaction();
      try {
        // Pending request promises; flushed whenever the window fills so at
        // most MAX_CONCURRENT_REQUESTS are in flight at a time.
        const promises = [];
        for (const message of batch.messages) {
          if (!isRunning() || isStale()) break;
          if (promises.length >= MAX_CONCURRENT_REQUESTS) {
            await Promise.all(promises);
            promises.length = 0; // Clear the window
          }
          const promise = axios
            .post(HTTP_ENDPOINT, { message: message.value.toString() })
            .catch((error) => {
              // HTTP failure: log and skip this message. This is where a real
              // pipeline would dead-letter or retry.
              console.error('Request failed:', error.message);
              return null; // sentinel: no response to process
            })
            .then(async (response) => {
              if (response === null) return;
              console.log('Request successful:', response.data);
              // Produce the result inside the transaction; a failure here
              // rejects the promise and aborts the transaction below.
              await transaction.send({
                topic: OUTPUT_TOPIC,
                messages: [{ value: `Processed: ${message.value.toString()}` }],
              });
              resolveOffset(message.offset);
              await heartbeat();
            });
          promises.push(promise);
        }
        if (promises.length > 0) {
          await Promise.all(promises);
        }
        await transaction.commit();
        console.log('Transaction committed');
      } catch (err) {
        // Kafka produce/commit failed: abort so partial output never becomes
        // visible, then rethrow so kafkajs can retry the batch.
        await transaction.abort();
        throw err;
      }
    },
  });
};
run().catch(console.error);
const { Kafka } = require('kafkajs');
const axios = require('axios');
const rateLimit = require('axios-rate-limit');
const fs = require('fs');
// Kafka client: TLS with a pinned CA bundle plus SASL/SCRAM authentication.
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['BROKERS'], // Replace with your host:port broker list
ssl: {
rejectUnauthorized: true, // verify broker certs against the CA below
ca: [fs.readFileSync('ca.pem', 'utf-8')], // Load your CA certificate
},
sasl: {
mechanism: 'scram-sha-256', // Replace with your SASL mechanism
username: 'avnadmin',
password: 'PASSWORD' // Replace with your real password
}
});
// Consumer group for this pipeline.
// NOTE(review): `allowAutoCommit` is not a documented kafkajs consumer
// constructor option; auto-commit is configured via consumer.run()'s
// `autoCommit` option — verify this actually disables auto-commit.
const consumer = kafka.consumer({ groupId: 'my-2', allowAutoCommit: false });
// Idempotent transactional producer — required for producer.transaction().
// The transactionalId must be unique per producer instance so Kafka can
// fence zombie producers.
const producer = kafka.producer({
transactionalId: 'my-transactional-producer',
maxInFlightRequests: 1,
idempotent: true,
transactionTimeout: 60000,
});
// Target endpoint for the per-message HTTP call. Replace with your own.
const HTTP_ENDPOINT = 'https://echo.free.beeceptor.com';
// Axios instance throttled to at most 10 requests per second.
const http = rateLimit(axios.create(), { maxRequests: 10, perMilliseconds: 1000 });
// Topic names and fetch size limits.
const INPUT_TOPIC = 'js-input';
const OUTPUT_TOPIC = 'block-data';
const MAX_BYTES_PER_PARTITION = 256 * 1024; // 256 KiB per partition
const MAX_BYTES = 1024 * 1024; // 1 MiB per fetch request
/**
 * Rate-limited variant: uses the throttled `http` axios instance (10 req/s)
 * instead of a manual concurrency window. Consumes from INPUT_TOPIC, POSTs
 * each message, and produces a result to OUTPUT_TOPIC inside a transaction.
 *
 * Fixes vs. the original:
 *  - the per-message request promises were fired and FORGOTTEN, so
 *    transaction.commit() raced ahead of every transaction.send(); sends that
 *    resolved after commit would fail and results were silently lost. The
 *    promises are now collected and awaited before committing.
 *  - produces to OUTPUT_TOPIC (was hard-coded 'target-topic');
 *  - transaction.send()/commit failures now abort the transaction instead of
 *    being swallowed.
 *
 * NOTE(review): for end-to-end exactly-once, consumer offsets should be
 * committed inside the transaction via transaction.sendOffsets() — confirm.
 */
const run = async () => {
  await producer.connect();
  await consumer.connect();
  await consumer.subscribe({
    topic: INPUT_TOPIC,
    fromBeginning: true,
    maxBytesPerPartition: MAX_BYTES_PER_PARTITION,
    maxBytes: MAX_BYTES,
  });
  await consumer.run({
    eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
      // One transaction per batch; the rate limiter inside `http` spaces the
      // requests out, we just have to wait for all of them before committing.
      const transaction = await producer.transaction();
      try {
        const pending = [];
        for (const message of batch.messages) {
          if (!isRunning() || isStale()) break;
          console.log('Processing message:', message.value.toString());
          const task = http
            .post(HTTP_ENDPOINT, { message: message.value.toString() })
            .catch((error) => {
              // HTTP failure: log and skip — dead-letter or retry here in a
              // real pipeline.
              console.error('Request failed:', error.message);
              return null; // sentinel: nothing to produce
            })
            .then(async (response) => {
              if (response === null) return;
              console.log('Request successful:', response.data);
              // A send() failure rejects this promise and aborts below.
              await transaction.send({
                topic: OUTPUT_TOPIC,
                messages: [{ value: `Processed: ${message.value.toString()}` }],
              });
              resolveOffset(message.offset);
              await heartbeat();
            });
          pending.push(task);
        }
        // Wait for every request + produce before making the batch visible.
        await Promise.all(pending);
        await transaction.commit();
        console.log('Transaction committed');
      } catch (err) {
        // Abort so partial output never becomes visible; rethrow so kafkajs
        // can retry the batch.
        await transaction.abort();
        throw err;
      }
    },
  });
};
run().catch(console.error);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment