Skip to content

Instantly share code, notes, and snippets.

@maciekciolek
Created July 26, 2016 13:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maciekciolek/d1f1a031be4fb1c7eeea645669550449 to your computer and use it in GitHub Desktop.
Save maciekciolek/d1f1a031be4fb1c7eeea645669550449 to your computer and use it in GitHub Desktop.
package vectos.kafka.akkaimpl.producer
import akka.Done
import akka.actor._
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import vectos.kafka.akkaimpl.KafkaConnection
import vectos.kafka.types.v0._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
object ConnectionCoordinator {
def props(servers: Seq[BrokerAddress]) = Props(new ConnectionCoordinator(servers))
}
final case class Produce(topic: String, partition: Int, records: Seq[KafkaRecord]) {
def toKafkaRequest(timeout: Int): KafkaRequest.Produce = {
import vectos.kafka.types.v0._
val messages = records.map(r => MessageSetEntry(0, Message(0, 0, r.key.toVector, r.value.toVector))).toVector
val topicRequest = ProduceTopicRequest(Some(topic), Vector(ProduceTopicPartitionRequest(partition, messages)))
KafkaRequest.Produce(1, timeout, Vector(topicRequest))
}
}
class ConnectionCoordinator(bootstrapServers: Seq[BrokerAddress]) extends Actor with ActorLogging with Stash {
@SuppressWarnings(Array("org.wartremover.warts.Var"))
private var brokers = Map.empty[Int, Broker]
@SuppressWarnings(Array("org.wartremover.warts.Var"))
private var topics = Map.empty[String, TopicStatus]
private implicit val ec: ExecutionContext = context.dispatcher
private implicit val timeout: Timeout = Timeout(30.seconds) //TODO: This should be configureable
override def preStart(): Unit = {
val bootstraps = bootstrapServers.map(initializeBrokerConnection)
context.become(bootstraping(bootstraps))
}
private def bootstraping(connections: Seq[ActorRef]): Receive = {
connections.foreach(_ ! KafkaRequest.Metadata(Vector.empty))
{
case _: Produce => stash()
case Success(metadata: KafkaResponse.Metadata) =>
updateMetadata(metadata)
connections.foreach(connection => context.stop(connection)) //TODO: Maybe reuse this connections
unstashAll()
context.become(receive)
}
}
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
private def forwardToBroker(broker: Broker, produce: Produce): Unit =
broker.ref
.ask(produce.toKafkaRequest(30)) //TODO: This should be configureable
.mapTo[Try[KafkaResponse.Produce]]
.flatMap {
case Success(response) =>
val errors = response.topics.flatMap(_.partitions.map(_.errorCode).filter(_ != KafkaError.NoError))
if (errors.isEmpty) {
Future.successful(Done)
} else {
Future.failed(new IllegalStateException(s"Errors occurred $errors."))
}
case Failure(ex) => Future.failed(ex)
}
.pipeTo(sender())
override def receive: Receive = {
case p @ Produce(topic, partition, _) if topics.contains(topic) =>
getBrokerFor(topic, partition) match {
case Some(broker) =>
forwardToBroker(broker, p)
case None =>
sender() ! Status.Failure(new IllegalStateException(s"Broker does not exist for topic $topic, partition $partition."))
}
case Produce(topic, _, _) =>
stash()
context.become(fetchingMetadata(topic))
case Terminated(brokerRef) =>
markBrokerAsUnavailable(brokerRef)
}
private def markBrokerAsUnavailable(brokerRef: ActorRef): Unit = {
log.error(s"Broker unavailable $brokerRef")
//TODO: Handle this error
}
private def fetchingMetadata(topic: String): Receive = {
log.info(s"Fetching metadata for $topic.")
brokers.foreach {
case (_, broker) =>
broker.ref ! KafkaRequest.Metadata(Vector(Some(topic)))
}
{
case Success(metadata: KafkaResponse.Metadata) =>
updateMetadata(metadata)
unstashAll()
context.become(receive)
case Terminated(brokerRef) =>
markBrokerAsUnavailable(brokerRef)
case _ => stash()
}
}
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
private def updateMetadata(metadata: KafkaResponse.Metadata) = {
val currentBrokersIds = brokers.keys.toSet
val newBrokers = metadata.brokers.filterNot(b => currentBrokersIds.contains(b.nodeId))
brokers = brokers ++ newBrokers.map {
case m =>
//TODO: MetadataBrokerResponse`s host should not be an option
val host = m.host.getOrElse(throw new IllegalStateException("Cannot resolve host"))
val address = BrokerAddress(host, m.port)
val connection = initializeBrokerConnection(address)
context.watch(connection)
m.nodeId -> Broker(m.nodeId, address, connection)
}.toMap
topics = topics ++ metadata.topicMetadata
.map(t => {
//TODO: MetadataTopicMetadataResponse`s topic should not be an option
val name = t.name.getOrElse(throw new IllegalStateException("Cannot resolve topic name"))
name -> TopicStatus(t.partitionMetaData.map(p => p.id -> p.leader).toMap)
})
.filter { case (_, status) => status.partitionToBroker.nonEmpty }
.toMap
}
private def initializeBrokerConnection(address: BrokerAddress) = {
val props = KafkaConnection.props(KafkaConnection.Settings(address.host, address.port, 100))
context.actorOf(props)
}
private def getBrokerFor(topic: String, partition: Int): Option[Broker] =
for {
t <- topics.get(topic)
p <- t.partitionToBroker.get(partition)
broker <- brokers.get(p)
} yield broker
override def unhandled(message: Any): Unit = {
super.unhandled(message)
log.error(s"Unhandled message $message.")
}
private case class Broker(id: Int, address: BrokerAddress, ref: ActorRef)
private case class TopicStatus(partitionToBroker: Map[Int, Int])
}
package vectos.kafka.akkaimpl.producer
import akka.Done
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
final case class KafkaRecord(key: Array[Byte], value: Array[Byte])
object KafkaRecord {
@SuppressWarnings(Array("org.wartremover.warts.Overloading"))
def apply(key: String, value: String): KafkaRecord =
KafkaRecord(key.getBytes("UTF-8"), value.getBytes("UTF-8"))
}
final case class ProducerSettings(bootstrapServer: Seq[BrokerAddress], timeout: Duration)
final case class BrokerAddress(host: String, port: Int)
class KafkaProducer(settings: ProducerSettings)(implicit system: ActorSystem) {
private val coordinator = system.actorOf(ConnectionCoordinator.props(settings.bootstrapServer))
//TODO: Decide if timeout of ask should be the same as request timeout
private implicit val timeout: Timeout = Timeout(30.seconds)
private implicit val ec: ExecutionContext = system.dispatcher
def send(topic: String, partition: Int, record: KafkaRecord): Future[Done] = ???
def sendBatch(topic: String, partition: Int, records: Vector[KafkaRecord]): Future[Done] = {
coordinator.ask(Produce(topic, partition, records)).mapTo[Done]
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment