@andrew-rosca
Created April 26, 2024 22:22
How to make ksqlDB processing log messages easier to read and interpret

ksqlDB queries against Kafka topics sometimes return no results, or fewer messages than expected. One possible cause is that ksqlDB cannot deserialize some of the messages.

ksqlDB writes deserialization errors to a dedicated topic and exposes the stream KSQL_PROCESSING_LOG to make them queryable. However, the messages in this topic are hard to interpret, in part because the original record that failed deserialization is base64-encoded.

SELECT * FROM KSQL_PROCESSING_LOG LIMIT 1;
{
  "LOGGER": "processing.transient_WIDGETS_8903487627771102614.KsqlTopic.Source.deserializer",
  "LEVEL": "ERROR",
  "TIME": 1714137911168,
  "MESSAGE": {
    "TYPE": 0,
    "DESERIALIZATIONERROR": {
      "TARGET": "value",
      "ERRORMESSAGE": "Error deserializing message from topic: widgets",
      "RECORDB64": "eyJpZCI6IkIwMDEiLCJuYW1lIjoiU2t5c2NyYXBlciBPbmUiLCJ1bml0cyI6W3siaWQiOiJVMTAxIiwibmFtZSI6IlVuaXQgMTAxIiwiaXNPY2N1cGllZCI6dHJ1ZX0seyJpZCI6IlUxMDIiLCJuYW1lIjoiVW5pdCAxMDIiLCJpc09jY3VwaWVkIjp0cnVlfV19",
      "CAUSE": [
        "Failed to deserialize data for topic widgets to Protobuf: ",
        "Error deserializing Protobuf message for id -1",
        "Unknown magic byte!"
      ],
      "topic": "buildings"
    },
    "RECORDPROCESSINGERROR": null,
    "PRODUCTIONERROR": null,
    "SERIALIZATIONERROR": null,
    "KAFKASTREAMSTHREADERROR": null
  }
}
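
To see what is hidden in a single log entry, the two opaque fields can also be decoded by hand outside ksqlDB. The following is a minimal Python sketch, not part of the ksqlDB setup: it copies TIME and RECORDB64 from the sample entry above, and the helper names `decode_record` and `format_time` are made up for illustration. It assumes the failed record is UTF-8 text, which holds for this sample but would not hold for binary formats.

```python
import base64
import json
from datetime import datetime, timedelta, timezone, tzinfo

# Values copied from the sample KSQL_PROCESSING_LOG entry above.
TIME_MS = 1714137911168
RECORD_B64 = "eyJpZCI6IkIwMDEiLCJuYW1lIjoiU2t5c2NyYXBlciBPbmUiLCJ1bml0cyI6W3siaWQiOiJVMTAxIiwibmFtZSI6IlVuaXQgMTAxIiwiaXNPY2N1cGllZCI6dHJ1ZX0seyJpZCI6IlUxMDIiLCJuYW1lIjoiVW5pdCAxMDIiLCJpc09jY3VwaWVkIjp0cnVlfV19"


def decode_record(record_b64: str) -> str:
    """Decode RECORDB64 to text (assumes the original record was UTF-8)."""
    return base64.b64decode(record_b64).decode("utf-8")


def format_time(epoch_ms: int, tz: tzinfo = timezone.utc) -> str:
    """Format epoch milliseconds as 'yyyy-MM-dd HH:mm:ss.SSS' in the given zone.

    Defaults to UTC; pass e.g. zoneinfo.ZoneInfo("America/Chicago") for local time.
    """
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    moment = (epoch + timedelta(milliseconds=epoch_ms)).astimezone(tz)
    return moment.strftime("%Y-%m-%d %H:%M:%S.") + f"{moment.microsecond // 1000:03d}"


# Parse the decoded text as JSON (true for this sample, not guaranteed in general).
record = json.loads(decode_record(RECORD_B64))
print(format_time(TIME_MS))
print(json.dumps(record, indent=2))
```

For this sample, the decoded record turns out to be a plain JSON document, which fits the "Unknown magic byte!" cause reported when ksqlDB tried to read it as Protobuf.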

The following code creates a derived stream, KSQL_ERRORS, that extracts the most relevant fields from each log message, decodes the original record from base64, and converts the timestamp to a chosen time zone so that errors are easier to correlate with the queries that were run.

DEFINE ksql_error_topic = '<insert topic name here>';
	-- the topic to which ksqlDB writes the error log
	-- get it by executing DESCRIBE KSQL_PROCESSING_LOG (look for "topic": "...")
DEFINE time_zone = 'America/Chicago';
	-- the time zone to which error message timestamps should be converted
	-- see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
DROP STREAM IF EXISTS KSQL_ERRORS;
CREATE STREAM KSQL_ERRORS 
WITH (KAFKA_TOPIC='${ksql_error_topic}') 
AS 
SELECT
	FORMAT_TIMESTAMP(FROM_UNIXTIME(KSQL_PROCESSING_LOG.TIME), 'yyyy-MM-dd HH:mm:ss.SSS', '${time_zone}') TIME_CT,  
	KSQL_PROCESSING_LOG.MESSAGE->DESERIALIZATIONERROR->ERRORMESSAGE ERRORMESSAGE,
	ENCODE(KSQL_PROCESSING_LOG.MESSAGE->DESERIALIZATIONERROR->RECORDB64, 'base64', 'utf8') MSG,
	KSQL_PROCESSING_LOG.MESSAGE->DESERIALIZATIONERROR->CAUSE CAUSE,   
	*
FROM KSQL_PROCESSING_LOG;

Querying this stream returns results that are more human-readable:

SELECT * FROM KSQL_ERRORS LIMIT 1;
{
  "TIME_CT": "2024-04-25 18:09:57.552",
  "ERRORMESSAGE": "Failed to deserialize value from topic: widgets. 
      Got unexpected JSON serialization format that did not start with the magic byte.
      If this stream was not serialized using the JsonSchemaConverter, then make sure 
      the stream is declared with JSON format (not JSON_SR).",
  "MSG": "{\"CONTRACT_ID\":\"b54f4844-fb1e-5e41-a33a-fc6aaedaf61b\",
      \"PAYMENT\":{\"ID\":\"527954842\",\"TYPE\":\"payments\",
      \"ATTRIBUTES\":{\"AMOUNT\":{\"AMOUNT\":9911.00,\"CURRENCY\":\"USD\"}}}}",
  "CAUSE": [
    "Got unexpected JSON serialization format that did not start with the magic byte.
    If this stream was not serialized using the JsonSchemaConverter, 
    then make sure the stream is declared with JSON format (not JSON_SR)."
  ],
  ...
}