Created
December 17, 2023 22:57
-
-
Save Ugbot/93406f2de640531e8fdee15a453a9170 to your computer and use it in GitHub Desktop.
Transactions in Kafka JS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { Kafka } = require('kafkajs'); | |
const axios = require('axios'); | |
const fs = require('fs'); | |
// Shared KafkaJS client used to create the consumer and producer below.
// 'BROKERS', 'avnadmin' and 'PASSWORD' are placeholders to replace with real
// connection details before running.
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['BROKERS'], // Broker list
  ssl: {
    // Reject brokers whose certificate does not chain to the CA loaded below.
    rejectUnauthorized: true,
    ca: [fs.readFileSync('ca.pem', 'utf-8')], // Load your CA certificate
  },
  sasl: {
    mechanism: 'scram-sha-256', // Replace with your SASL mechanism
    username: 'avnadmin',
    password: 'PASSWORD',
  },
});
const HTTP_ENDPOINT = 'https://echo.free.beeceptor.com'; // Replace with your HTTP endpoint
const MAX_CONCURRENT_REQUESTS = 5; // Define the maximum number of concurrent requests
const INPUT_TOPIC = 'js-input';
const OUTPUT_TOPIC = 'block-data';
const MAX_BYTES_PER_PARTITION = 1024 * 256; // 256KB per partition
const MAX_BYTES = 1024 * 1024; // 1MB per request

// Consumer for INPUT_TOPIC.
// The fetch-size limits are passed here because KafkaJS documents
// `maxBytesPerPartition` / `maxBytes` as consumer-creation options; they have
// no effect when given to `consumer.subscribe()`. The size constants are
// declared above this statement so they are initialised before use.
// The groupId literal reads like a naming template (topic name + worker type);
// replace it with the real group id.
// NOTE(review): `allowAutoCommit` is not among the consumer options listed in
// the KafkaJS docs (auto-commit is configured with `autoCommit` on
// `consumer.run()`); it is kept as-is here — confirm and relocate if intended.
const consumer = kafka.consumer({
  groupId: 'my-topic-name + workerTYPE',
  allowAutoCommit: false,
  maxBytesPerPartition: MAX_BYTES_PER_PARTITION,
  maxBytes: MAX_BYTES,
});

// Transactional producer. The transactionalId literal reads like a naming
// template (topic + worker type + worker id); replace it with an id that is
// unique per producer instance.
const producer = kafka.producer({
  transactionalId: 'my-topic + workerTYPE + workerID',
  maxInFlightRequests: 1,
  idempotent: true,
  transactionTimeout: 60000, // milliseconds
});
/**
 * Connects the producer and consumer, subscribes to INPUT_TOPIC and processes
 * it batch by batch.
 *
 * For every batch a producer transaction is opened. Each message is POSTed to
 * HTTP_ENDPOINT, with at most MAX_CONCURRENT_REQUESTS requests started before
 * the loop waits for them to settle. For each successful request one message
 * is sent inside the transaction and the message offset is resolved. A message
 * whose processing fails is logged and skipped. The transaction is committed
 * once every started request has settled, and aborted if anything escapes
 * (for example a failing commit).
 *
 * @returns {Promise<void>} Settles when `consumer.run()` settles.
 * @throws Rethrows connection/subscription errors and any error that caused the
 *   batch transaction to be aborted.
 */
const run = async () => {
  await producer.connect();
  await consumer.connect();

  // NOTE(review): the KafkaJS docs list `maxBytesPerPartition` / `maxBytes` as
  // options of `kafka.consumer()`, not of `subscribe()` — confirm they take
  // effect here. They are kept so this call stays as the author wrote it.
  await consumer.subscribe({
    topic: INPUT_TOPIC,
    fromBeginning: true,
    maxBytesPerPartition: MAX_BYTES_PER_PARTITION,
    maxBytes: MAX_BYTES,
  });

  await consumer.run({
    eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
      // One transaction per batch: everything produced for this batch is
      // committed or aborted together.
      const transaction = await producer.transaction();

      // Requests started but not yet awaited; bounded by MAX_CONCURRENT_REQUESTS.
      const inFlight = [];

      // Processes one message and never rejects: any failure (HTTP error,
      // produce error, or a null/tombstone value) is logged and the message is
      // skipped, so a single bad message does not fail the whole batch.
      const processMessage = async (message) => {
        try {
          const payload = message.value.toString();
          const response = await axios.post(HTTP_ENDPOINT, { message: payload });
          console.log('Request successful:', response.data);
          // Produce a message for each successful HTTP request.
          // NOTE(review): OUTPUT_TOPIC is declared in this file but this
          // literal is what the code sends to — confirm the intended topic.
          await transaction.send({
            topic: 'target-topic',
            messages: [{ value: `Processed: ${payload}` }],
          });
          resolveOffset(message.offset);
          await heartbeat();
        } catch (error) {
          console.error('Request failed:', error.message);
          // This is where the message could be dead-lettered or retried.
          // This is a simple example, so the error is only logged.
        }
      };

      try {
        for (const message of batch.messages) {
          if (!isRunning() || isStale()) break;
          if (inFlight.length >= MAX_CONCURRENT_REQUESTS) {
            // Wait for the current window to settle before starting more.
            await Promise.all(inFlight);
            inFlight.length = 0;
          }
          inFlight.push(processMessage(message));
        }
        await Promise.all(inFlight);
        await transaction.commit();
        console.log('Transaction committed');
      } catch (error) {
        // Do not leave the transaction open; report (but do not mask the
        // original error with) a failure of the abort itself.
        try {
          await transaction.abort();
        } catch (abortError) {
          console.error('Transaction abort failed:', abortError.message);
        }
        throw error;
      }
    },
  });
};
// Start the worker; any error that escapes run() is printed to stderr.
run().catch(console.error);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { Kafka } = require('kafkajs'); | |
const axios = require('axios'); | |
const rateLimit = require('axios-rate-limit'); | |
const fs = require('fs'); | |
// Shared KafkaJS client used to create the consumer and producer below.
// 'BROKERS', 'avnadmin' and 'PASSWORD' are placeholders to replace with real
// connection details before running.
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['BROKERS'], // Broker list
  ssl: {
    // Reject brokers whose certificate does not chain to the CA loaded below.
    rejectUnauthorized: true,
    ca: [fs.readFileSync('ca.pem', 'utf-8')], // Load your CA certificate
  },
  sasl: {
    mechanism: 'scram-sha-256', // Replace with your SASL mechanism
    username: 'avnadmin',
    password: 'PASSWORD',
  },
});
const HTTP_ENDPOINT = 'https://echo.free.beeceptor.com'; // Replace with your HTTP endpoint
const INPUT_TOPIC = 'js-input';
const OUTPUT_TOPIC = 'block-data';
const MAX_BYTES_PER_PARTITION = 1024 * 256; // 256KB per partition
const MAX_BYTES = 1024 * 1024; // 1MB per request

// Consumer for INPUT_TOPIC.
// The fetch-size limits are passed here because KafkaJS documents
// `maxBytesPerPartition` / `maxBytes` as consumer-creation options; they have
// no effect when given to `consumer.subscribe()`. The size constants are
// declared above this statement so they are initialised before use.
// NOTE(review): `allowAutoCommit` is not among the consumer options listed in
// the KafkaJS docs (auto-commit is configured with `autoCommit` on
// `consumer.run()`); it is kept as-is here — confirm and relocate if intended.
const consumer = kafka.consumer({
  groupId: 'my-2',
  allowAutoCommit: false,
  maxBytesPerPartition: MAX_BYTES_PER_PARTITION,
  maxBytes: MAX_BYTES,
});

// Transactional producer used to publish the result of each processed message.
const producer = kafka.producer({
  transactionalId: 'my-transactional-producer',
  maxInFlightRequests: 1,
  idempotent: true,
  transactionTimeout: 60000, // milliseconds
});

// Axios instance wrapped by axios-rate-limit: at most 10 requests per 1000 ms.
const http = rateLimit(axios.create(), { maxRequests: 10, perMilliseconds: 1000 });
/**
 * Connects the producer and consumer, subscribes to INPUT_TOPIC and processes
 * it batch by batch.
 *
 * For every batch a producer transaction is opened and each message is POSTed
 * to HTTP_ENDPOINT through the rate-limited `http` client. For each successful
 * request one message is sent inside the transaction and the message offset is
 * resolved. A message whose processing fails is logged and skipped. The
 * transaction is committed only after every started request has settled, and
 * aborted if anything escapes (for example a failing commit).
 *
 * @returns {Promise<void>} Settles when `consumer.run()` settles.
 * @throws Rethrows connection/subscription errors and any error that caused the
 *   batch transaction to be aborted.
 */
const run = async () => {
  await producer.connect();
  await consumer.connect();

  // NOTE(review): the KafkaJS docs list `maxBytesPerPartition` / `maxBytes` as
  // options of `kafka.consumer()`, not of `subscribe()` — confirm they take
  // effect here. They are kept so this call stays as the author wrote it.
  await consumer.subscribe({
    topic: INPUT_TOPIC,
    fromBeginning: true,
    maxBytesPerPartition: MAX_BYTES_PER_PARTITION,
    maxBytes: MAX_BYTES,
  });

  await consumer.run({
    eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
      // One transaction per batch: everything produced for this batch is
      // committed or aborted together.
      const transaction = await producer.transaction();

      // Every request started for this batch. They must all settle before the
      // commit, otherwise transaction.send() would run on a transaction that
      // has already been committed.
      const pending = [];

      // Processes one message and never rejects: any failure (HTTP error,
      // produce error, or a null/tombstone value) is logged and the message is
      // skipped, so a single bad message does not fail the whole batch.
      const processMessage = async (message) => {
        try {
          const payload = message.value.toString();
          console.log('Processing message:', payload);
          const response = await http.post(HTTP_ENDPOINT, { message: payload });
          console.log('Request successful:', response.data);
          // Produce a message for each successful HTTP request.
          // NOTE(review): OUTPUT_TOPIC is declared in this file but this
          // literal is what the code sends to — confirm the intended topic.
          await transaction.send({
            topic: 'target-topic',
            messages: [{ value: `Processed: ${payload}` }],
          });
          resolveOffset(message.offset);
          await heartbeat();
        } catch (error) {
          console.error('Request failed:', error.message);
          // This is where the message could be dead-lettered or retried.
          // This is a simple example, so the error is only logged.
        }
      };

      try {
        for (const message of batch.messages) {
          if (!isRunning() || isStale()) break;
          // Request pacing is left to the rate-limited `http` client.
          pending.push(processMessage(message));
        }
        await Promise.all(pending);
        await transaction.commit();
        console.log('Transaction committed');
      } catch (error) {
        // Do not leave the transaction open; report (but do not mask the
        // original error with) a failure of the abort itself.
        try {
          await transaction.abort();
        } catch (abortError) {
          console.error('Transaction abort failed:', abortError.message);
        }
        throw error;
      }
    },
  });
};
// Start the worker; any error that escapes run() is printed to stderr.
run().catch(console.error);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment