Skip to content

Instantly share code, notes, and snippets.

@joestein
Created January 11, 2012 03:41
Show Gist options
  • Save joestein/1592895 to your computer and use it in GitHub Desktop.
Save joestein/1592895 to your computer and use it in GitHub Desktop.
thoughts on kafka-240
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.javaapi
import kafka.network.Request
import kafka.api.RequestKeys
import java.nio.ByteBuffer
/**
 * Holder that associates a topic name with a list of [[WiredPartition]] entries.
 *
 * @param topic          the topic name
 * @param partition_data the partition entries carried for this topic
 */
class WiredTopic(
  val topic: String,
  val partition_data: List[WiredPartition]
)
/**
 * Holder that associates a partition number with a message set.
 *
 * @param partition the partition number
 * @param messages  the message set carried for this partition
 */
class WiredPartition(
  val partition: Int,
  val messages: kafka.javaapi.message.ByteBufferMessageSet
)
/**
 * Companion factory for the legacy (version 0) style of [[ProducerRequest]].
 */
object ProducerRequest {

  /**
   * Builds a request for a single topic/partition/message-set triple.
   *
   * Every header-style constructor argument is filled with a zero or empty
   * placeholder; the triple is stored both as a one-element `data` list and
   * through `setLegacyWire`.
   *
   * @param topic     the topic name
   * @param partition the partition number
   * @param messages  the message set to send
   * @return the newly built request
   */
  def apply(topic: String,
            partition: Int,
            messages: kafka.javaapi.message.ByteBufferMessageSet): ProducerRequest = {
    val wiredData = List(new WiredTopic(topic, List(new WiredPartition(partition, messages))))
    // The reference is never reassigned, so a val is sufficient.
    val producerRequest = new ProducerRequest(
      size = 0,
      request_type_id = 0,
      version_id = 0,
      correlation_id = 0,
      client_id = "",
      required_acks = 0,
      ack_timeout = 0,
      data = wiredData)
    producerRequest.setLegacyWire(topic, partition, messages)
    producerRequest
  }
}
/**
 * Java-API produce request that delegates its wire behaviour to
 * `kafka.api.ProducerRequest`.
 *
 * The constructor arguments are stored as-is; none of the methods in this
 * class read them yet (see the TODOs about non-legacy versions). The legacy
 * (version 0) state lives in the mutable `topic`, `partition` and `messages`
 * fields, which are populated through `setLegacyWire`.
 *
 * @param size            stored, not read by this class
 * @param request_type_id stored, not read by this class
 * @param version_id      stored, not read by this class
 * @param correlation_id  stored, not read by this class
 * @param client_id       stored, not read by this class
 * @param required_acks   stored, not read by this class
 * @param ack_timeout     stored, not read by this class
 * @param data            stored, not read by this class
 */
class ProducerRequest(val size: Int,
                      val request_type_id: Short,
                      val version_id: Short,
                      val correlation_id: Int,
                      val client_id: String,
                      val required_acks: Short,
                      val ack_timeout: Int,
                      val data: List[WiredTopic]) extends Request(RequestKeys.Produce) {

  //legacy version = 0
  var topic: String = ""
  var partition: Int = 0
  var messages: kafka.javaapi.message.ByteBufferMessageSet = null

  /**
   * Stores the legacy (version 0) topic, partition and message set on this request.
   *
   * @param topic     the topic name
   * @param partition the partition number
   * @param messages  the message set to send
   */
  def setLegacyWire(topic: String,
                    partition: Int,
                    messages: kafka.javaapi.message.ByteBufferMessageSet): Unit = {
    this.topic = topic
    this.partition = partition
    this.messages = messages
  }

  import Implicits._

  /**
   * The Scala-API request that backs this wrapper.
   *
   * This is derived on demand rather than stored in an eagerly initialised
   * `val`: a `val` in the class body is evaluated during construction, i.e.
   * before `setLegacyWire` can run, and would therefore always be built from
   * the placeholder values `""`, `0` and `null`.
   *
   * NOTE(review): if `messages` is still null when this is called, the implicit
   * conversion from `Implicits` may fail -- confirm against its definition.
   */
  private def underlying: kafka.api.ProducerRequest = {
    //TODO: version = 0 then we do this
    new kafka.api.ProducerRequest(topic, partition, messages)
    //else WiredProducerRequest implementation of new class here
  }

  /** Writes this request into `buffer` via the underlying Scala-API request. */
  def writeTo(buffer: ByteBuffer): Unit = underlying.writeTo(buffer)

  /** Size in bytes as reported by the underlying Scala-API request. */
  def sizeInBytes(): Int = underlying.sizeInBytes

  /** Forwards to `getTranslatedPartition` of the underlying Scala-API request. */
  def getTranslatedPartition(randomSelector: String => Int): Int =
    underlying.getTranslatedPartition(randomSelector)

  override def toString: String = underlying.toString

  /**
   * Two requests are equal when their legacy topic, partition and messages match.
   * `==` is used for `messages` so that a still-unset (null) message set does not
   * throw; for non-null values it delegates to `equals` exactly as before.
   */
  override def equals(other: Any): Boolean = other match {
    case that: ProducerRequest =>
      //TODO: version = 0 then we do this
      (that canEqual this) &&
        topic == that.topic &&
        partition == that.partition &&
        messages == that.messages
      //otherwise for version we check another way on equality
    case _ => false
  }

  def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]

  /**
   * Same formula as before for non-null fields; a null `topic` or `messages`
   * contributes 0 instead of throwing a NullPointerException.
   */
  override def hashCode: Int = {
    val topicHash = if (topic == null) 0 else topic.hashCode
    val messagesHash = if (messages == null) 0 else messages.hashCode
    31 + (17 * partition) + topicHash + messagesHash
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment