Last active
August 29, 2015 14:25
-
-
Save ppearcy/d6aee9be77cd9aa362a3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package blackboard.cloud.telemetry.secor | |
import com.pinterest.secor.common.SecorConfig | |
import com.pinterest.secor.message.Message | |
import com.pinterest.secor.parser.MessageParser | |
import io.confluent.kafka.serializers.KafkaAvroDecoder | |
import org.apache.avro.generic.GenericRecord | |
import org.joda.time.{DateTimeZone, DateTime} | |
import org.joda.time.format.DateTimeFormat | |
import org.slf4j.LoggerFactory | |
/**
 * Secor message parser that derives output partitions from an Avro-encoded payload.
 *
 * Each message payload is decoded with a `KafkaAvroDecoder` backed by the schema registry, and two
 * partition components are produced:
 *  - `dt=yyyy-MM-dd`: the UTC calendar day of the timestamp (milliseconds since the epoch) read from
 *    the field named by `message.timestamp.name`, which may be a dot-separated path into nested records.
 *  - `version=N`: the registry version of the record's schema under the subject `<topic>-value`.
 *
 * If decoding, timestamp extraction or the version lookup fails, the error is logged and the fallback
 * partitions `dt=1970-01-01` / `version=0` are returned.
 *
 * @param config Secor configuration; `message.timestamp.name` must be a non-empty dot-separated path.
 * @throws Exception at construction if `message.timestamp.name` is missing or has an empty path segment.
 */
class AvroDateParser(config: SecorConfig) extends MessageParser(config) {
  val logger = LoggerFactory.getLogger(classOf[AvroDateParser])

  /** Fallback date (the epoch, printed as 1970-01-01 in UTC) used when a payload cannot be parsed. */
  val defaultDate = new DateTime(0)

  /** Fallback schema version used when a payload cannot be parsed. */
  val defaultVersion = 0

  /** Formats the `dt=` partition value as a UTC calendar day. */
  val dayFormat = DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.UTC)

  /** Configured timestamp field path, split on `.` into nested record field names. */
  val timeFieldPath = Option(mConfig.getMessageTimestampName).getOrElse("").split("\\.")

  // `"".split("\\.")` yields Array(""), so a length check alone never rejects a blank setting.
  // Empty segments (e.g. ".ts" or "a..b") can never name a record field either, so reject them here
  // instead of silently routing every message to the fallback partition at runtime.
  if (timeFieldPath.isEmpty || timeFieldPath.exists(_.isEmpty)) {
    throw new Exception(s"Invalid message.timestamp.name provided: ${mConfig.getMessageTimestampName}")
  }

  /**
   * Reads the timestamp stored at `path` inside `record`.
   *
   * @param path   field names to follow through nested records; the last one holds the timestamp
   * @param record the decoded Avro record to read from
   * @return the `long` value found at the end of `path`
   * @throws Exception if the path is empty, an intermediate value is not a record, or the final value
   *                   is missing or not a `java.lang.Long`
   */
  def extractTime(path: Array[String], record: AnyRef): Long =
    extractLongField(path, record, "time")

  /**
   * Reads a schema id stored at `path` inside `record`.
   *
   * @param path   field names to follow through nested records; the last one holds the id
   * @param record the decoded Avro record to read from
   * @return the `long` value found at the end of `path`
   * @throws Exception if the path is empty, an intermediate value is not a record, or the final value
   *                   is missing or not a `java.lang.Long`
   */
  def getSchemaId(path: Array[String], record: AnyRef): Long =
    extractLongField(path, record, "schema id")

  /**
   * Walks `path` through nested [[GenericRecord]]s starting at `record` and returns the `long` value
   * stored in the final field.
   *
   * @param fieldKind human-readable description of the field, used only in error messages
   */
  private def extractLongField(path: Array[String], record: AnyRef, fieldKind: String): Long = {
    val fullPath = path.mkString(".")

    @scala.annotation.tailrec
    def loop(remaining: List[String], current: AnyRef): Long = (remaining, current) match {
      case (Nil, _) =>
        throw new Exception(s"Empty path provided for $fieldKind field")
      case (name :: Nil, r: GenericRecord) =>
        r.get(name) match {
          case value: java.lang.Long => value.longValue
          // `null.asInstanceOf[Long]` would silently yield 0L (the epoch), so report it explicitly.
          case null =>
            throw new Exception(s"Missing $fieldKind field at path: $fullPath")
          case other =>
            throw new Exception(s"Unexpected type (${other.getClass.getName}) for $fieldKind field at path: $fullPath")
        }
      case (name :: rest, r: GenericRecord) =>
        loop(rest, r.get(name))
      case _ =>
        throw new Exception(s"Unexpected type ($current) extracting $fieldKind field from path: $fullPath")
    }

    loop(path.toList, record)
  }

  /**
   * Computes the `dt=` and `version=` partitions for `message`.
   *
   * @throws InstantiationException if no schema registry is available
   */
  override def extractPartitions(message: Message): Array[String] = {
    if (!schemaRegistry.isPresent) {
      throw new InstantiationException("Avro schema registry is required for the AvroDateParser")
    }
    val registry = schemaRegistry.get
    val avroDecoder = new KafkaAvroDecoder(registry)

    val (date, version) = try {
      avroDecoder.fromBytes(message.getPayload) match {
        case record: GenericRecord =>
          val millisSinceEpoch = extractTime(timeFieldPath, record)
          val schemaVersion = registry.getVersion(s"${message.getTopic}-value", record.getSchema)
          (new DateTime(millisSinceEpoch), schemaVersion)
        case other =>
          throw new Exception(s"Decoded payload is not a GenericRecord: $other")
      }
    } catch {
      case e: Exception =>
        // Guard against a null payload so the fallback path itself cannot throw; decode with an
        // explicit charset so the logged text does not depend on the JVM default.
        val data = Option(message.getPayload)
          .map(bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8))
          .getOrElse("null")
        logger.error(s"Topic: ${message.getTopic} - Unable to parse date field ${timeFieldPath.mkString(".")} from $data", e)
        (defaultDate, defaultVersion)
    }

    val datePartition = dayFormat.print(date)
    Array(s"dt=$datePartition", s"version=$version")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment