Skip to content

Instantly share code, notes, and snippets.

@speeddragon
Created January 25, 2020 23:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save speeddragon/06102e8f3e1f2154dc3b69f072132cd1 to your computer and use it in GitHub Desktop.
Save speeddragon/06102e8f3e1f2154dc3b69f072132cd1 to your computer and use it in GitHub Desktop.
AugmentedMessageDeserializer
/**
 * Turns raw Kafka records into Avro [[GenericRecord]]s built on `GenericRecordSchema.schema`.
 *
 * The record value must be a JSON document carrying a string timestamp under
 * `event.<TIMESTAMP.name>`. Records that cannot be converted are logged and yield `null`.
 */
class AugmentedMessageDeserializer extends MessageKafkaDeserializer[GenericRecord] with LazyLogging {

  /**
   * Converts one Kafka record into a [[GenericRecord]].
   *
   * @param record the raw Kafka record (key and value as bytes)
   * @return the populated record, or `null` when the conversion fails; the failure is logged
   *         together with the UTF-8 decoded key and value
   */
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): GenericRecord =
    retrieveGenericRecord(record) match {
      case Success(value) => value
      case Failure(exception) =>
        // Kafka keys and values may legitimately be null (e.g. un-keyed messages, tombstones).
        // Decode them null-safely so that the error handler itself can never throw.
        val key = decodeForLog(record.key())
        val value = decodeForLog(record.value())
        logger.error(
          s"operation='deserialize', message='Error in kafka record', key=$key, value=$value",
          exception
        )
        // NOTE(review): presumably the consuming framework treats a null result as
        // "skip this record" -- confirm against MessageKafkaDeserializer's contract.
        null
    }

  /** Type information for the records produced by [[deserialize]]. */
  override def getProducedType: TypeInformation[GenericRecord] =
    TypeExtractor.getForClass(classOf[GenericRecord])

  /**
   * Builds the [[GenericRecord]] for a Kafka record, capturing every failure in the returned `Try`.
   *
   * Fails when the value is not valid JSON, when `event.<TIMESTAMP.name>` is missing, is not a
   * string or equals `INVALID.name`, and when the key or value is null (the resulting
   * `NullPointerException` is captured by the `Try`).
   */
  private def retrieveGenericRecord(record: ConsumerRecord[Array[Byte], Array[Byte]]): Try[GenericRecord] = Try {
    val rawContent = new String(record.value(), UTF_8)
    val rawJson: Json =
      parse(rawContent).getOrElse(throw new IllegalArgumentException(s"Invalid json on $rawContent"))

    // A timestamp equal to INVALID.name has always been rejected; keep that behaviour while
    // modelling "missing" as an Option instead of a sentinel string.
    val timestamp = rawJson.hcursor
      .downField("event")
      .get[String](TIMESTAMP.name)
      .toOption
      .filterNot(_ == INVALID.name)
      .getOrElse(throw new NoSuchElementException(s"Not found timestamp on $rawContent"))

    val genericRecord = new GenericData.Record(GenericRecordSchema.schema)
    genericRecord.put(KEY.name, new String(record.key(), UTF_8))
    // NOTE(review): the value is stored as a raw Array[Byte]; verify that this matches the
    // type the schema declares for the MESSAGE field.
    genericRecord.put(MESSAGE.name, record.value())
    genericRecord.put(HEADERS.name, createJsonStringOfHeaders(record.headers()))
    genericRecord.put(TIMESTAMP.name, timestamp)
    genericRecord
  }

  /**
   * Renders the record headers as a JSON object string keyed by header name.
   *
   * Headers are collected into a `Map`, so when a key occurs more than once only the last
   * occurrence is kept.
   */
  private def createJsonStringOfHeaders(headers: Headers): String =
    headers.toArray
      .map(header => header.key() -> header.value())
      .toMap
      .asJson
      .toString

  /** Decodes bytes as UTF-8 for log output; a null array is rendered as the text "null". */
  private def decodeForLog(bytes: Array[Byte]): String =
    Option(bytes).fold("null")(present => new String(present, UTF_8))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment