Sometimes ksqlDB queries run against Kafka topics return no results, or fewer messages than expected. This can happen because ksqlDB is unable to deserialize messages successfully.
ksqlDB writes deserialization errors to a dedicated topic and exposes the stream KSQL_PROCESSING_LOG to make them queryable. However, the messages in this topic are not easy to interpret, partly because the original record that failed deserialization is base64-encoded.
SELECT * FROM KSQL_PROCESSING_LOG LIMIT 1;
{
  "LOGGER": "processing.transient_WIDGETS_8903487627771102614.KsqlTopic.Source.deserializer",
  "LEVEL": "ERROR",
  "TIME": 1714137911168,
  "MESSAGE": {
    "TYPE": 0,
    "DESERIALIZATIONERROR": {
      "TARGET": "value",
      "ERRORMESSAGE": "Error deserializing message from topic: widgets",
      "RECORDB64": "eyJpZCI6IkIwMDEiLCJuYW1lIjoiU2t5c2NyYXBlciBPbmUiLCJ1bml0cyI6W3siaWQiOiJVMTAxIiwibmFtZSI6IlVuaXQgMTAxIiwiaXNPY2N1cGllZCI6dHJ1ZX0seyJpZCI6IlUxMDIiLCJuYW1lIjoiVW5pdCAxMDIiLCJpc09jY3VwaWVkIjp0cnVlfV19",
      "CAUSE": [
        "Failed to deserialize data for topic widgets to Protobuf: ",
        "Error deserializing Protobuf message for id -1",
        "Unknown magic byte!"
      ],
      "topic": "buildings"
    },
    "RECORDPROCESSINGERROR": null,
    "PRODUCTIONERROR": null,
    "SERIALIZATIONERROR": null,
    "KAFKASTREAMSTHREADERROR": null
  }
}
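To see what the RECORDB64 field hides, the value can be decoded by hand outside ksqlDB. The following is a minimal Python sketch, not part of the original setup. It assumes the failed record was UTF-8 JSON text, which holds for the sample above but would not hold for binary formats such as Avro or Protobuf. The helper name decode_record is hypothetical.

```python
import base64
import json

# RECORDB64 value copied verbatim from the sample log message above.
RECORD_B64 = "eyJpZCI6IkIwMDEiLCJuYW1lIjoiU2t5c2NyYXBlciBPbmUiLCJ1bml0cyI6W3siaWQiOiJVMTAxIiwibmFtZSI6IlVuaXQgMTAxIiwiaXNPY2N1cGllZCI6dHJ1ZX0seyJpZCI6IlUxMDIiLCJuYW1lIjoiVW5pdCAxMDIiLCJpc09jY3VwaWVkIjp0cnVlfV19"


def decode_record(record_b64: str) -> str:
    """Decode a base64 RECORDB64 value (assumption: the record bytes are UTF-8 text)."""
    return base64.b64decode(record_b64).decode("utf-8")


if __name__ == "__main__":
    # Parse the decoded text as JSON and pretty-print it for inspection.
    record = json.loads(decode_record(RECORD_B64))
    print(json.dumps(record, indent=2))
```

Decoding one record at a time like this is fine for a spot check, but it does not scale to browsing many errors, which is what motivates doing the decoding inside ksqlDB.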
The following code creates a new derived stream, KSQL_ERRORS, which extracts the most relevant information from each log message, decodes the original record from base64, and converts the timestamp to a local time zone, making it easier to correlate errors with the queries that were run.
DEFINE ksql_error_topic = '<insert topic name here>';
-- the topic to which ksqlDB writes the error log
-- get it by executing DESCRIBE KSQL_PROCESSING_LOG (look for "topic": "...")
DEFINE time_zone = 'America/Chicago';
-- the time zone to which error message timestamps should be converted
-- see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
DROP STREAM IF EXISTS KSQL_ERRORS;
CREATE STREAM KSQL_ERRORS
  WITH (KAFKA_TOPIC='${ksql_error_topic}')
  AS
  SELECT
    FORMAT_TIMESTAMP(FROM_UNIXTIME(KSQL_PROCESSING_LOG.TIME), 'yyyy-MM-dd HH:mm:ss.SSS', '${time_zone}') TIME_CT,
    KSQL_PROCESSING_LOG.MESSAGE->DESERIALIZATIONERROR->ERRORMESSAGE ERRORMESSAGE,
    ENCODE(KSQL_PROCESSING_LOG.MESSAGE->DESERIALIZATIONERROR->RECORDB64, 'base64', 'utf8') MSG,
    KSQL_PROCESSING_LOG.MESSAGE->DESERIALIZATIONERROR->CAUSE CAUSE,
    *
  FROM KSQL_PROCESSING_LOG;
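The timestamp conversion in the statement above can be reproduced outside ksqlDB to sanity-check a single log entry. This Python sketch is an illustration only: format_epoch_millis is a hypothetical helper, it assumes TIME holds epoch milliseconds (as in the sample message), and it relies on Python 3.9+ with a time zone database available to zoneinfo.

```python
from datetime import datetime
from zoneinfo import ZoneInfo


def format_epoch_millis(epoch_ms: int, time_zone: str) -> str:
    """Render epoch milliseconds as 'yyyy-MM-dd HH:mm:ss.SSS' in the given time zone."""
    # Split with integer arithmetic so the millisecond part is not
    # affected by floating-point rounding.
    seconds, millis = divmod(epoch_ms, 1000)
    local = datetime.fromtimestamp(seconds, tz=ZoneInfo(time_zone))
    return f"{local:%Y-%m-%d %H:%M:%S}.{millis:03d}"


if __name__ == "__main__":
    # TIME value taken from the sample KSQL_PROCESSING_LOG message.
    print(format_epoch_millis(1714137911168, "America/Chicago"))
```

Comparing this output with the TIME_CT column for the same record is a quick way to confirm that the time_zone variable was set as intended.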
Querying this stream returns results that are more human-readable:
SELECT * FROM KSQL_ERRORS LIMIT 1;
{
  "TIME_CT": "2024-04-25 18:09:57.552",
  "ERRORMESSAGE": "Failed to deserialize value from topic: widgets.
    Got unexpected JSON serialization format that did not start with the magic byte.
    If this stream was not serialized using the JsonSchemaConverter, then make sure
    the stream is declared with JSON format (not JSON_SR).",
  "MSG": "{\"CONTRACT_ID\":\"b54f4844-fb1e-5e41-a33a-fc6aaedaf61b\",
    \"PAYMENT\":{\"ID\":\"527954842\",\"TYPE\":\"payments\",
    \"ATTRIBUTES\":{\"AMOUNT\":{\"AMOUNT\":9911.00,\"CURRENCY\":\"USD\"}}}}",
  "CAUSE": [
    "Got unexpected JSON serialization format that did not start with the magic byte.
    If this stream was not serialized using the JsonSchemaConverter,
    then make sure the stream is declared with JSON format (not JSON_SR)."
  ],
  ...
}