Last active
March 8, 2016 17:28
-
-
Save kjvalencik/74ff9c7451878d150819 to your computer and use it in GitHub Desktop.
Kafka Partitioner Test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
node_modules/ | |
npm-debug.log | |
.m2/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const assert = require("assert"); | |
const Bluebird = require("bluebird"); | |
const mvn = require("node-java-maven"); | |
const java = require("java"); | |
const murmur = require("murmur-hash-js"); | |
const mvnAsync = Bluebird.promisify(mvn); | |
const TOPIC_NAME = "test"; | |
const NUM_PARTITIONS = 120; | |
const NUM_TESTS = 1000; | |
function getDefaultPartitioner() { | |
const importPath = "org.apache.kafka.clients.producer.internals.DefaultPartitioner"; | |
const DefaultPartitioner = java.import(importPath); | |
return new DefaultPartitioner(); | |
} | |
function getNodes(n) { | |
const ArrayList = java.import("java.util.ArrayList"); | |
const Node = java.import("org.apache.kafka.common.Node"); | |
return Array(n) | |
.fill(0) | |
.map((_, i) => i) | |
.reduce((nodes, i) => { | |
nodes.addSync(new Node(i, "localhost", 8082)); | |
return nodes; | |
}, new ArrayList()); | |
} | |
function getPartitions(topicName, numPartitions, leader) { | |
const nodeImportPath = "org.apache.kafka.common.Node"; | |
const ArrayList = java.import("java.util.ArrayList"); | |
const PartitionInfo = java.import("org.apache.kafka.common.PartitionInfo"); | |
const replicas = java.newArray(nodeImportPath, []); | |
const inSyncReplicas = java.newArray(nodeImportPath, []); | |
return Array(numPartitions) | |
.fill(0) | |
.map((v, i) => i) | |
.reduce((partitions, i) => { | |
partitions.addSync(new PartitionInfo(topicName, i, leader, replicas, inSyncReplicas)); | |
return partitions; | |
}, new ArrayList()); | |
} | |
function getCluster(topicName, numPartitions) { | |
const ArrayList = java.import("java.util.ArrayList"); | |
const HashSet = java.import("java.util.HashSet"); | |
const Cluster = java.import("org.apache.kafka.common.Cluster"); | |
const nodes = getNodes(1); | |
const leader = nodes.getSync(0); | |
const partitions = getPartitions(topicName, numPartitions, leader); | |
const cluster = new Cluster(nodes, partitions, new HashSet()); | |
return cluster; | |
} | |
function toCharArray(s) { | |
const buf = Buffer.isBuffer(s) ? s : Buffer(s || ''); | |
return java.newArray("byte", Array.from(buf)); | |
} | |
function getRandomPartitions(topicName, numPartitions, numTests) { | |
const partitioner = getDefaultPartitioner(); | |
const cluster = getCluster(topicName, numPartitions); | |
function javaPartition(key) { | |
return Bluebird.fromCallback(cb => partitioner.partition( | |
topicName, | |
null, | |
toCharArray(key), | |
null, | |
toCharArray(), | |
cluster, | |
cb | |
)); | |
} | |
return Bluebird.resolve(Array(numTests)) | |
.call('fill', 0) | |
.map(() => { | |
const randomChunk = Math.random() | |
.toString(36) | |
.slice(2) | |
.slice(-(Math.random() * 16)); | |
return `Test Message: \u263a ${randomChunk}` | |
}) | |
.map(key => ({ key, javaPartition : javaPartition(key) })) | |
.map(Bluebird.props); | |
} | |
function nodePartition(key, numPartitions) { | |
return (murmur.murmur2(Buffer(key), 0x9747b28c) & 0x7fffffff) % numPartitions; | |
} | |
// Load maven libs to classpath | |
mvnAsync({ localRepository : "./m2/repository" }) | |
.get("classpath") | |
.each(lib => java.classpath.push(lib)) | |
.then(() => getRandomPartitions(TOPIC_NAME, NUM_PARTITIONS, NUM_TESTS)) | |
.map(o => Object.assign(o, { | |
nodePartition : nodePartition(o.key, NUM_PARTITIONS) | |
})) | |
.map(o => assert.strictEqual(o.javaPartition, o.nodePartition)) | |
.then(() => console.log('\n\033[32m\u2713 All tests passed!\n')) | |
.catch(err => { | |
console.log('\n\033[31m \u2715 Tests failed:\n'); | |
console.error(err.stack || err); | |
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"name": "no-kafka-murmur-test", | |
"version": "1.0.0", | |
"description": "Test murmur2 libraries for no-kafka", | |
"main": "index.js", | |
"scripts": { | |
"test": "node index.js", | |
"postinstall": "HOME=./ ./node_modules/.bin/node-java-maven" | |
}, | |
"author": "K.J. Valencik", | |
"license": "MIT", | |
"dependencies": { | |
"bluebird": "^3.3.4", | |
"java": "^0.6.1", | |
"murmur-hash-js": "^1.0.0", | |
"node-java-maven": "0.0.12" | |
}, | |
"java": { | |
"dependencies": [ | |
{ | |
"groupId": "org.apache.kafka", | |
"artifactId": "kafka-clients", | |
"version": "0.9.0.1" | |
} | |
] | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment