Skip to content

Instantly share code, notes, and snippets.

@kjvalencik
Last active March 8, 2016 17:28
Show Gist options
  • Save kjvalencik/74ff9c7451878d150819 to your computer and use it in GitHub Desktop.
Save kjvalencik/74ff9c7451878d150819 to your computer and use it in GitHub Desktop.
Kafka Partitioner Test
node_modules/
npm-debug.log
.m2/
const assert = require("assert");
const Bluebird = require("bluebird");
const mvn = require("node-java-maven");
const java = require("java");
const murmur = require("murmur-hash-js");
const mvnAsync = Bluebird.promisify(mvn);
const TOPIC_NAME = "test";
const NUM_PARTITIONS = 120;
const NUM_TESTS = 1000;
function getDefaultPartitioner() {
const importPath = "org.apache.kafka.clients.producer.internals.DefaultPartitioner";
const DefaultPartitioner = java.import(importPath);
return new DefaultPartitioner();
}
function getNodes(n) {
const ArrayList = java.import("java.util.ArrayList");
const Node = java.import("org.apache.kafka.common.Node");
return Array(n)
.fill(0)
.map((_, i) => i)
.reduce((nodes, i) => {
nodes.addSync(new Node(i, "localhost", 8082));
return nodes;
}, new ArrayList());
}
function getPartitions(topicName, numPartitions, leader) {
const nodeImportPath = "org.apache.kafka.common.Node";
const ArrayList = java.import("java.util.ArrayList");
const PartitionInfo = java.import("org.apache.kafka.common.PartitionInfo");
const replicas = java.newArray(nodeImportPath, []);
const inSyncReplicas = java.newArray(nodeImportPath, []);
return Array(numPartitions)
.fill(0)
.map((v, i) => i)
.reduce((partitions, i) => {
partitions.addSync(new PartitionInfo(topicName, i, leader, replicas, inSyncReplicas));
return partitions;
}, new ArrayList());
}
function getCluster(topicName, numPartitions) {
const ArrayList = java.import("java.util.ArrayList");
const HashSet = java.import("java.util.HashSet");
const Cluster = java.import("org.apache.kafka.common.Cluster");
const nodes = getNodes(1);
const leader = nodes.getSync(0);
const partitions = getPartitions(topicName, numPartitions, leader);
const cluster = new Cluster(nodes, partitions, new HashSet());
return cluster;
}
function toCharArray(s) {
const buf = Buffer.isBuffer(s) ? s : Buffer(s || '');
return java.newArray("byte", Array.from(buf));
}
function getRandomPartitions(topicName, numPartitions, numTests) {
const partitioner = getDefaultPartitioner();
const cluster = getCluster(topicName, numPartitions);
function javaPartition(key) {
return Bluebird.fromCallback(cb => partitioner.partition(
topicName,
null,
toCharArray(key),
null,
toCharArray(),
cluster,
cb
));
}
return Bluebird.resolve(Array(numTests))
.call('fill', 0)
.map(() => {
const randomChunk = Math.random()
.toString(36)
.slice(2)
.slice(-(Math.random() * 16));
return `Test Message: \u263a ${randomChunk}`
})
.map(key => ({ key, javaPartition : javaPartition(key) }))
.map(Bluebird.props);
}
function nodePartition(key, numPartitions) {
return (murmur.murmur2(Buffer(key), 0x9747b28c) & 0x7fffffff) % numPartitions;
}
// Load maven libs to classpath
mvnAsync({ localRepository : "./m2/repository" })
.get("classpath")
.each(lib => java.classpath.push(lib))
.then(() => getRandomPartitions(TOPIC_NAME, NUM_PARTITIONS, NUM_TESTS))
.map(o => Object.assign(o, {
nodePartition : nodePartition(o.key, NUM_PARTITIONS)
}))
.map(o => assert.strictEqual(o.javaPartition, o.nodePartition))
.then(() => console.log('\n\033[32m\u2713 All tests passed!\n'))
.catch(err => {
console.log('\n\033[31m \u2715 Tests failed:\n');
console.error(err.stack || err);
});
{
"name": "no-kafka-murmur-test",
"version": "1.0.0",
"description": "Test murmur2 libraries for no-kafka",
"main": "index.js",
"scripts": {
"test": "node index.js",
"postinstall": "HOME=./ ./node_modules/.bin/node-java-maven"
},
"author": "K.J. Valencik",
"license": "MIT",
"dependencies": {
"bluebird": "^3.3.4",
"java": "^0.6.1",
"murmur-hash-js": "^1.0.0",
"node-java-maven": "0.0.12"
},
"java": {
"dependencies": [
{
"groupId": "org.apache.kafka",
"artifactId": "kafka-clients",
"version": "0.9.0.1"
}
]
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment