Skip to content

Instantly share code, notes, and snippets.

@ppearcy
Last active August 29, 2015 14:25
Show Gist options
  • Save ppearcy/d6aee9be77cd9aa362a3 to your computer and use it in GitHub Desktop.
Save ppearcy/d6aee9be77cd9aa362a3 to your computer and use it in GitHub Desktop.
package blackboard.cloud.telemetry.secor
import com.pinterest.secor.common.SecorConfig
import com.pinterest.secor.message.Message
import com.pinterest.secor.parser.MessageParser
import io.confluent.kafka.serializers.KafkaAvroDecoder
import org.apache.avro.generic.GenericRecord
import org.joda.time.{DateTimeZone, DateTime}
import org.joda.time.format.DateTimeFormat
import org.slf4j.LoggerFactory
/**
 * Returns partitions based on the date and schema version of an Avro-encoded payload. A schema registry is used to
 * decode the payload.
 *
 * Each message yields two partition components:
 *  - `dt=yyyy-MM-dd`, from the (possibly nested) field named by `message.timestamp.name`, interpreted as milliseconds
 *    since the epoch and formatted in UTC;
 *  - `version=N`, the registry version of the record's schema under the subject `<topic>-value`.
 *
 * If a message cannot be decoded, or its timestamp/version cannot be determined, the error is logged and the
 * message is routed to `dt=1970-01-01` / `version=0` instead.
 */
class AvroDateParser(config: SecorConfig) extends MessageParser(config) {
  val logger = LoggerFactory.getLogger(classOf[AvroDateParser])
  val defaultDate: DateTime = new DateTime(0)
  val defaultVersion: Int = 0
  val dayFormat = DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.UTC)

  // Dotted path (e.g. "header.timestamp") to the timestamp field, possibly nested inside sub-records.
  val timeFieldPath: Array[String] = mConfig.getMessageTimestampName.split("\\.")
  // String.split drops trailing empty strings, so "." yields an empty array while "" yields Array("").
  // Reject both, plus paths with empty segments such as "a..b", since they can never resolve to a field.
  if (timeFieldPath.isEmpty || timeFieldPath.exists(_.isEmpty)) {
    throw new Exception(s"Invalid message.timestamp.name provided: ${mConfig.getMessageTimestampName}")
  }

  /**
   * Extracts the timestamp (milliseconds since the epoch) found by following `path` through nested records.
   *
   * @param path   field names to follow, outermost first; the last one names the timestamp field
   * @param record the decoded record to start from
   * @return the field's value widened to a Long
   * @throws Exception if `path` is empty, an intermediate value is not a GenericRecord, or the final value is
   *                   null or not an Avro int/long
   */
  def extractTime(path: Array[String], record: AnyRef): Long =
    extractLongField(path, record, "time")

  /**
   * Extracts a schema id found by following `path` through nested records.
   *
   * @param path   field names to follow, outermost first; the last one names the schema id field
   * @param record the decoded record to start from
   * @return the field's value widened to a Long
   * @throws Exception if `path` is empty, an intermediate value is not a GenericRecord, or the final value is
   *                   null or not an Avro int/long
   */
  def getSchemaId(path: Array[String], record: AnyRef): Long =
    extractLongField(path, record, "schema id")

  /** Rejects an empty path up front, then walks it from the first segment. */
  private def extractLongField(path: Array[String], record: AnyRef, fieldKind: String): Long = {
    if (path.isEmpty) {
      throw new Exception(s"Empty path provided for $fieldKind field")
    }
    walkToLong(path, 0, record, fieldKind)
  }

  /** Follows `path(index)` onwards from `node`, returning the integral value stored at the final segment. */
  @scala.annotation.tailrec
  private def walkToLong(path: Array[String], index: Int, node: AnyRef, fieldKind: String): Long = {
    val dotted = path.mkString(".")
    node match {
      case r: GenericRecord =>
        val value = r.get(path(index))
        if (index == path.length - 1) {
          value match {
            case l: java.lang.Long => l.longValue
            // Avro `int` fields decode to java.lang.Integer; widen instead of failing with a ClassCastException.
            case i: java.lang.Integer => i.longValue
            // A bare asInstanceOf[Long] would silently unbox null to 0 (the epoch) and skip the error path.
            case null => throw new Exception(s"Null value for $fieldKind field at path: $dotted")
            case other =>
              throw new Exception(s"Unexpected type (${other.getClass.getName}) for $fieldKind field at path: $dotted")
          }
        } else {
          walkToLong(path, index + 1, value, fieldKind)
        }
      case _ =>
        throw new Exception(s"Unexpected type ($node) extracting $fieldKind field from path: $dotted")
    }
  }

  /**
   * Computes the `dt=` and `version=` partitions for a message.
   *
   * Any Exception raised while decoding the payload or resolving the date/version is logged, and the defaults
   * (`defaultDate`, `defaultVersion`) are used instead.
   *
   * @throws InstantiationException if no schema registry is available
   */
  override def extractPartitions(message: Message): Array[String] = {
    if (!schemaRegistry.isPresent) {
      throw new InstantiationException("Avro schema registry is required for the AvroDateParser")
    }
    val registry = schemaRegistry.get
    val avroDecoder = new KafkaAvroDecoder(registry)
    val (date, version) = try {
      val record = avroDecoder.fromBytes(message.getPayload) match {
        case r: GenericRecord => r
        case other => throw new Exception(s"Decoded payload is not a GenericRecord: $other")
      }
      val millisSinceEpoch = extractTime(timeFieldPath, record)
      val schemaVersion = registry.getVersion(message.getTopic + "-value", record.getSchema)
      (new DateTime(millisSinceEpoch), schemaVersion)
    } catch {
      case e: Exception =>
        // Best effort: route the message to the default partition rather than failing the consumer.
        // The payload may be null (e.g. a tombstone), which must not raise a new exception from inside this handler.
        val data = Option(message.getPayload)
          .map(bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8))
          .getOrElse("null")
        logger.error(
          s"Topic: ${message.getTopic} - Unable to parse date field ${timeFieldPath.mkString(".")} from $data", e)
        (defaultDate, defaultVersion)
    }
    Array(s"dt=${dayFormat.print(date)}", s"version=$version")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment