Last active
September 10, 2018 06:24
-
-
Save andybryant/a90140772cae586a98e3d7975b379b26 to your computer and use it in GitHub Desktop.
KSQL UDF for reading and writing Kafka Connect formatted Dates (no time component)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.confluent.ksql.function.udf.datetime; | |
import io.confluent.common.utils.CopyOnWriteMap; | |
import io.confluent.ksql.function.udf.Udf; | |
import io.confluent.ksql.function.udf.UdfDescription; | |
import java.time.LocalDate; | |
import java.time.format.DateTimeFormatter; | |
import java.util.Map; | |
@UdfDescription(name = "datetostring", author = "Confluent", | |
description = "Converts a Kafka Connect encoded Date to a string using the given format pattern." | |
+ " The date must be stored as an integer and represents days since epoch." | |
+ " The format pattern should be in the format expected by java.time.format.DateTimeFormatter") | |
public class DateToString { | |
private Map<String, DateTimeFormatter> formatters = new CopyOnWriteMap<>(); | |
@Udf(description = "Converts a Kafka Connect encoded Date to a string using the given format pattern." | |
+ " The format pattern should be in the format expected by java.time.format.DateTimeFormatter") | |
public String dateToString(final Integer daysSinceEpoch, final String formatPattern) { | |
DateTimeFormatter formatter = formatters.computeIfAbsent(formatPattern, DateTimeFormatter::ofPattern); | |
return LocalDate.ofEpochDay(daysSinceEpoch).format(formatter); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.confluent.ksql.function.udf.datetime; | |
import io.confluent.common.utils.CopyOnWriteMap; | |
import io.confluent.ksql.function.udf.Udf; | |
import io.confluent.ksql.function.udf.UdfDescription; | |
import java.time.LocalDate; | |
import java.time.format.DateTimeFormatter; | |
import java.util.Map; | |
@UdfDescription(name = "stringtodate", author = "Confluent", | |
description = "Converts a string representation of a date into a Kafka Connect encoded Date using the given format pattern." | |
+ " The date will be returned as integer and represents days since epoch." | |
+ " The format pattern should be in the format expected by java.time.format.DateTimeFormatter") | |
public class StringToDate { | |
private Map<String, DateTimeFormatter> formatters = new CopyOnWriteMap<>(); | |
@Udf(description = "Converts a string representation of a date into a Kafka Connect encoded Date using the given format pattern." | |
+ " The format pattern should be in the format expected by java.time.format.DateTimeFormatter") | |
public int stringToDate(final String formattedDate, final String formatPattern) { | |
DateTimeFormatter formatter = formatters.computeIfAbsent(formatPattern, DateTimeFormatter::ofPattern); | |
return ((int)LocalDate.parse(formattedDate, formatter).toEpochDay()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment