Server-sent events (SSE) throughput client test for RIG.
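The gist bundles four standalone Node.js scripts: a Kafka producer that floods the "test-topic" topic with ~100 kB CloudEvents, a plain Kafka consumer that reports raw consume throughput as a baseline, an SSE client that receives the same events through RIG's /_rig/v1/connection/sse endpoint, and a WebSocket client that does the same over /_rig/v1/connection/ws. Each client prints a running throughput figure in MB/s. A minimal way to run them might look like the following (the file names are placeholders; the package names are taken from the require() calls below):

npm install uuid kafka-node eventsource node-fetch websocket
KAFKA_HOST=localhost:9092 node kafka-producer.js
KAFKA_HOST=localhost:9092 node kafka-consumer.js
RIG_HOST=http://localhost:4000 node sse-client.js
RIG_HOST=https://localhost:4000 WS_RIG_HOST=wss://localhost:4000 node ws-client.js

// ---- Kafka producer: publishes ~100 kB CloudEvents to "test-topic" in a tight send loop ----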
const uuid = require('uuid');
const kafka = require('kafka-node');

const kafkaHost = process.env.KAFKA_HOST || "localhost:9092";
const HighLevelProducer = kafka.HighLevelProducer;
const client = new kafka.KafkaClient({
  kafkaHost: kafkaHost
});

console.log(`Publishing to kafka host "${kafkaHost}"`);

// ~100 kB of 'z' characters used as the event payload.
const payload = Buffer.alloc(1e5, 'z');
const producer = new HighLevelProducer(client);

// CloudEvents 0.2 envelope; RIG routes it to subscribers by its "type".
const cloudEvent = {
  "specversion": "0.2",
  "type": "test-topic",
  "source": "rig",
  "id": uuid.v4(),
  "data": payload.toString()
};

const payloads = [
  { topic: 'test-topic', messages: [JSON.stringify(cloudEvent)] }
];

producer.on('ready', function () {
  console.log("READY");
  // Re-send the same message as soon as the previous send is acknowledged.
  function send() {
    producer.send(payloads, function (err, data) {
      if (err) {
        console.log('Received an error sending data to Kafka: ', err);
        return;
      }
      send();
    });
  }
  send();
});

producer.on('error', function (err) {
  console.log("We got an error: ", err);
});
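// ---- Kafka consumer: consumes "test-topic" directly and reports raw throughput in MB/s ----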
const kafka = require('kafka-node');
const Consumer = kafka.Consumer;
const Offset = kafka.Offset;

const kafkaHost = process.env.KAFKA_HOST || "localhost:9092";
const client = new kafka.KafkaClient({
  kafkaHost: kafkaHost
});
const offset = new Offset(client);

console.log(`Subscribing to kafka at "${kafkaHost}"`);

const kafkaTopic = 'test-topic';
const consumer = new Consumer(
  client,
  [
    { topic: kafkaTopic }
  ],
  { autoCommit: true }
);

let start = null;
let totalData = 0;

consumer.on('message', function (message) {
  // Start the clock on the first message.
  if (start === null) {
    start = Date.now();
  }
  totalData += message.value.length;
  const now = Date.now();
  const totalMB = totalData / 1e6;
  const diffSeconds = (now - start) / 1e3;
  // Skip the first sample, where the elapsed time is still zero.
  if (diffSeconds > 0) {
    console.log(`Throughput = ${totalMB / diffSeconds} MB/s`);
  }
});

consumer.on('error', function (err) {
  console.log('got an error: ', err);
});

// When the committed offset is out of range, reset to the smallest available one.
consumer.on('offsetOutOfRange', function (topic) {
  topic.maxNum = 2;
  offset.fetch([topic], function (err, offsets) {
    if (err) {
      return console.error(err);
    }
    const min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
    consumer.setOffset(topic.topic, topic.partition, min);
  });
});
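// ---- SSE client: opens an SSE connection to RIG, subscribes to "test-topic", and reports throughput ----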
const EventSource = require('eventsource');
const fetch = require('node-fetch');

const host = process.env.RIG_HOST || "http://localhost:4000";
const endpoint = `${host}/_rig/v1/connection/sse`;
const kafkaTopic = 'test-topic';

const source = new EventSource(endpoint);
source.onopen = (e) => console.log("open", e);
source.onmessage = (e) => console.log("message", e);
source.onerror = (e) => console.log("error", e);

let count = 0;
let totalBytes = 0;
let timeNow = 0;

// Forwarded events arrive under their CloudEvents type, which the producer sets to the topic name.
source.addEventListener(kafkaTopic, e => {
  console.log('Received data');
  if (timeNow === 0) {
    timeNow = Date.now();
  }
  const diff = Date.now() - timeNow;
  console.log(`last message length = ${e.data.length}`);
  totalBytes += e.data.length;
  const sizeMB = totalBytes / 1e6;
  // Skip the first sample, where the elapsed time is still zero.
  if (diff > 0) {
    console.log(`Throughput = ${sizeMB / (diff / 1e3)} MB/s`);
  }
  count++;
});

source.addEventListener("error", function (e) {
  if (e.readyState === EventSource.CLOSED) {
    console.log("Connection was closed.");
  } else {
    console.log("Connection error:", e);
  }
}, false);

// RIG announces the connection token in a rig.connection.create event;
// use it to subscribe to the Kafka topic.
source.addEventListener("rig.connection.create", async function (e) {
  const cloudEvent = JSON.parse(e.data);
  const payload = cloudEvent.data;
  const connectionToken = payload["connection_token"];
  console.log('cloudEvent ', cloudEvent);
  console.log('connection Token = ', connectionToken);
  const res = await createSubscription(connectionToken);
  const text = await res.text();
  console.log('DONE===> ', text);
}, false);

function createSubscription(connectionToken) {
  const route = `${host}/_rig/v1/connection/sse/${connectionToken}/subscriptions`;
  console.log('route = ', route);
  return fetch(route, {
    method: "PUT",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      "subscriptions": [{ "eventType": kafkaTopic }]
    })
  });
}
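// ---- WebSocket client: same measurement as the SSE client, but over RIG's WebSocket endpoint ----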
const WebSocket = require('websocket').w3cwebsocket;
const fetch = require('node-fetch');

const host = process.env.RIG_HOST || "https://localhost:4000";
const wsHost = process.env.WS_RIG_HOST || "wss://localhost:4000";
const kafkaTopic = 'test-topic';

let count = 0;
let totalBytes = 0;
let timeNow = 0;

function addEvent(e) {
  if (timeNow === 0) {
    timeNow = Date.now();
  }
  const diff = Date.now() - timeNow;
  console.log(`last message length = ${e.data.length}`);
  totalBytes += e.data.length;
  const sizeMB = totalBytes / 1e6;
  // Skip the first sample, where the elapsed time is still zero.
  if (diff > 0) {
    console.log(`Throughput = ${sizeMB / (diff / 1e3)} MB/s`);
  }
  count++;
}

const baseUrl = `${host}/_rig/v1`;
const wsUrl = `${wsHost}/_rig/v1`;

function createSubscription(connectionToken) {
  return fetch(`${baseUrl}/connection/ws/${connectionToken}/subscriptions`, {
    method: "PUT",
    mode: "cors",
    headers: {
      "Content-Type": "application/json; charset=utf-8"
    },
    body: JSON.stringify({ "subscriptions": [{ "eventType": kafkaTopic }] })
  })
    .then(json => {
      console.log(`Subscription created to kafkaTopic "${kafkaTopic}"`);
      return json;
    })
    .catch(err => {
      console.log("Failed to create subscription:", err);
    });
}

const source = new WebSocket(`${wsUrl}/connection/ws`);
source.onopen = (e) => console.log("Connection opened");
source.onmessage = (e) => {
  const cloudEvent = JSON.parse(e.data);
  if (cloudEvent.type === 'rig.connection.create') {
    // The first event carries the connection token; use it to subscribe.
    const payload = cloudEvent.data;
    const connectionToken = payload["connection_token"];
    createSubscription(connectionToken);
  } else if (cloudEvent.type === kafkaTopic) {
    addEvent(cloudEvent);
  } else if (cloudEvent.type === 'rig.subscriptions_set') {
    // Subscription acknowledgement; nothing to measure here.
  }
};
source.onerror = (e) => console.log("error", e);