Created
January 18, 2012 22:24
-
-
Save joestein/1636208 to your computer and use it in GitHub Desktop.
kafka-240 read and write
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.api | |
import java.nio._ | |
import kafka.message._ | |
import kafka.network._ | |
import kafka.utils._ | |
/**
 * One topic entry of a producer request: the topic name together with the
 * per-partition payloads that belong to it.
 *
 * @param topic          name of the topic
 * @param partition_data one [[WiredPartition]] per partition carried for this topic
 */
class WiredTopic(val topic: String,
                 val partition_data: Array[WiredPartition])
/**
 * One partition entry of a producer request: a partition id paired with the
 * message set destined for that partition.
 *
 * @param partition partition id
 * @param messages  message set carried for this partition
 */
class WiredPartition(val partition: Int,
                     val messages: kafka.javaapi.message.ByteBufferMessageSet)
/**
 * Companion of the `ProducerRequest` class; holds the decoder that rebuilds a
 * request from its binary form.
 */
object ProducerRequest {
  // Sentinel partition id. NOTE(review): presumably means "let the producer
  // pick a partition" -- confirm against callers, nothing in this file uses it.
  val RandomPartition = -1

  /**
   * Decodes a `ProducerRequest` from `buffer`, advancing the buffer's position
   * past everything that is consumed.
   *
   * Fields are read in this order: size (int), request type id (short),
   * version id (short), correlation id (int), client id (short string, UTF-8),
   * required acks (short), ack timeout (int), topic count (int). Each topic is
   * a short string name followed by a partition count (int); each partition is
   * a partition id (int), a message set size in bytes (int) and that many
   * bytes of message set payload.
   *
   * @param buffer buffer positioned at the start of an encoded request
   * @return the decoded request
   */
  def readFrom(buffer: ByteBuffer): ProducerRequest = {
    val size: Int = buffer.getInt
    val request_type_id: Short = buffer.getShort
    val version_id: Short = buffer.getShort
    val correlation_id: Int = buffer.getInt
    val client_id: String = Utils.readShortString(buffer, "UTF-8")
    val required_acks: Short = buffer.getShort
    val ack_timeout: Int = buffer.getInt

    // Build the topic structure. Explicit index loops are kept on purpose:
    // every read advances the buffer, so the decoding order must stay obvious.
    val topicCount = buffer.getInt
    val data = new Array[WiredTopic](topicCount)
    for (i <- 0 until topicCount) {
      val topic = Utils.readShortString(buffer, "UTF-8")
      val partitionCount = buffer.getInt

      // Build the partition structure within this topic.
      val partitionData = new Array[WiredPartition](partitionCount)
      for (j <- 0 until partitionCount) {
        val partition = buffer.getInt
        val messageSetSize = buffer.getInt
        // Copy exactly messageSetSize bytes out of the request buffer so the
        // message set owns its payload independently of `buffer`.
        val messageSetBytes = new Array[Byte](messageSetSize)
        buffer.get(messageSetBytes, 0, messageSetSize)
        // Fully qualified: WiredPartition.messages is declared with the
        // javaapi type, while the wildcard import of kafka.message._ would
        // otherwise resolve ByteBufferMessageSet to the Scala-API class.
        val messages =
          new kafka.javaapi.message.ByteBufferMessageSet(ByteBuffer.wrap(messageSetBytes))
        partitionData(j) = new WiredPartition(partition, messages)
      }
      // WiredTopic fields are immutable, so the entry is constructed only
      // once all of its partitions have been decoded.
      data(i) = new WiredTopic(topic, partitionData)
    }

    new ProducerRequest(size, request_type_id, version_id, correlation_id,
                        client_id, required_acks, ack_timeout, data)
  }
}
class ProducerRequest(val size: Int, | |
val request_type_id: Short, | |
val version_id: Short, | |
val correlation_id: Int, | |
val client_id: String, | |
val required_acks: Short, | |
val ack_timeout: Int, | |
val data: Array[WiredTopic]) extends Request(RequestKeys.Produce) { | |
/**
 * Serializes this request into `buffer`, advancing the buffer's position.
 *
 * Fields are written in this order: size (int), request type id (short),
 * version id (short), correlation id (int), client id (short string, UTF-8),
 * required acks (short), ack timeout (int), topic count (int). Each topic is
 * written as its name (short string) and partition count (int); each partition
 * as its id (int), the serialized message set's limit (int) and the message
 * set bytes.
 *
 * @param buffer destination buffer
 */
def writeTo(buffer: ByteBuffer): Unit = {
  buffer.putInt(size)
  buffer.putShort(request_type_id)
  buffer.putShort(version_id)
  buffer.putInt(correlation_id)
  Utils.writeShortString(buffer, client_id, "UTF-8")
  buffer.putShort(required_acks)
  buffer.putInt(ack_timeout)

  // Save the topic structure.
  buffer.putInt(data.length) // the number of topics
  data.foreach { d =>
    Utils.writeShortString(buffer, d.topic, "UTF-8") // write the topic
    buffer.putInt(d.partition_data.length) // the number of partitions
    d.partition_data.foreach { p =>
      buffer.putInt(p.partition)
      // The message set belongs to the partition entry `p`; an unqualified
      // `messages` does not resolve to anything in this scope.
      val serialized = p.messages.serialized
      buffer.putInt(serialized.limit)
      buffer.put(serialized)
      // put() leaves the source buffer's position at its limit; rewind so the
      // message set can be read or written again afterwards.
      serialized.rewind
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment