Skip to content

Instantly share code, notes, and snippets.

@joestein
Created January 18, 2012 22:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joestein/1636208 to your computer and use it in GitHub Desktop.
Save joestein/1636208 to your computer and use it in GitHub Desktop.
kafka-240 read and write
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio._
import kafka.message._
import kafka.network._
import kafka.utils._
class WiredTopic(val topic: String, val partition_data: Array[WiredPartition])
class WiredPartition(val partition: Int,
val messages: kafka.javaapi.message.ByteBufferMessageSet)
object ProducerRequest {
val RandomPartition = -1
def readFrom(buffer: ByteBuffer): ProducerRequest = {
val size: Int = buffer.getInt
val request_type_id: Short = buffer.getShort
val version_id: Short = buffer.getShort
val correlation_id: Int = buffer.getInt
val client_id: String = Utils.readShortString(buffer, "UTF-8")
val required_acks: Short = buffer.getShort
val ack_timeout: Int = buffer.getInt
//build the topic structure
val topicCount = buffer.getInt
val data: Array[WiredTopic](topicCount)
for(i <- 0 until topicCount) {
val topic = Utils.readShortString(buffer, "UTF-8")
data.topic = topic
val partitionCount = buffer.getInt
//build the partition structure within this topic
data.partition_data = new Array[WiredPartition](partitionCount)
for (j <- 0 until partitionCount) {
data(i).partition_data(j).partition = buffer.getInt
val messageSetSize = buffer.getInt
val messageSetBuffer = new Array[Byte](messageSetSize)
buffer.get(messageSetBuffer,0,buffer.position)
messageSetBuffer.limit(messageSetSize)
data(i).partition_data(j).messages = new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))
}
}
new ProducerRequest(size,request_type_id,version_id,correlation_id,client_id,required_acks,ack_timeout,data)
}
}
class ProducerRequest(val size: Int,
val request_type_id: Short,
val version_id: Short,
val correlation_id: Int,
val client_id: String,
val required_acks: Short,
val ack_timeout: Int,
val data: Array[WiredTopic]) extends Request(RequestKeys.Produce) {
def writeTo(buffer: ByteBuffer) {
buffer.putInt(size)
buffer.putShort(request_type_id)
buffer.putShort(version_id)
buffer.putInt(correlation_id)
Utils.writeShortString(buffer, client_id, "UTF-8")
buffer.putShort(required_acks)
buffer.putInt(ack_timeout)
//save the topic structure
buffer.putInt(data.size) //the number of topics
data.foreach(d =>{
Utils.writeShortString(buffer, d.topic, "UTF-8") //write the topic
buffer.putInt(d.partition_data.size) //the number of partitions
d.partition_data.foreach(p => {
buffer.putInt(p.partition)
buffer.putInt(messages.serialized.limit)
buffer.put(messages.serialized)
messages.serialized.rewind
})
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment