Skip to content

Instantly share code, notes, and snippets.

@gbersac
Last active August 7, 2018 09:22
Show Gist options
  • Save gbersac/d258c39c957af2293e2e181c94129cfa to your computer and use it in GitHub Desktop.
Save gbersac/d258c39c957af2293e2e181c94129cfa to your computer and use it in GitHub Desktop.
#akka #kafka consumer deserialization error
/* for ammonite users
interp.load.ivy("com.typesafe.akka" %% "akka-stream-kafka" % "0.22")
interp.load.ivy("com.typesafe.play" %% "play-json" % "2.6.7")
@
*/
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{ Failure, Success }
import scala.util.Try
import akka.actor.{ ActorSystem, PoisonPill }
import akka.kafka.scaladsl.{ Consumer, Producer }
import akka.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions }
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.{ ActorMaterializer, Graph }
import play.api.libs.json.{Json, JsValue}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ Deserializer, Serializer, StringDeserializer, StringSerializer }
val kafkaHost = "localhost:9092"
val groupId = "test"
val topic = "test"
val kafkaAutoOffsetReset = "earliest"
implicit val actorSystem = ActorSystem("test")
implicit val materializer = ActorMaterializer()
def deserializer: Deserializer[Either[Throwable, JsValue]] =
new Deserializer[Either[Throwable, JsValue]] {
val stringDeserializer = new StringDeserializer
override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit =
stringDeserializer.configure(configs, isKey)
override def deserialize(topic: String, data: Array[Byte]): Either[Throwable, JsValue] =
Try(Json.parse(stringDeserializer.deserialize(topic, data))).toEither
override def close(): Unit =
stringDeserializer.close()
}
val consumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, deserializer)
.withBootstrapServers(kafkaHost)
.withGroupId(groupId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaAutoOffsetReset)
val terminate = () => actorSystem.terminate()
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.map { message =>
message.record.value() match {
case Right(json) => println(s"@@@@ ${Json.prettyPrint(json)}")
case Left(err) => println(s"@@@@ ${err.getMessage}")
}
}
.recoverWithRetries(
1,
PartialFunction[Throwable, Graph[akka.stream.SourceShape[Either[Throwable, JsValue]], akka.NotUsed]] { err =>
println(s"err : ${err.getMessage}")
Source(List(Left[Throwable, JsValue](err)))
}
)
.runWith(Sink.ignore)
.onComplete {
case Failure(_) => terminate()
case s @ Success(_) => terminate()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment