Continuous Conversion of Kafka topics from JSON to Avro with KSQL

@rmoff / 04 Apr 2018

This is easy with KSQL :)

Here’s a dummy topic, in JSON:

$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic mysql_users
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}

In KSQL declare the source stream, specifying the schema (here a subset of the full schema, just for brevity):

ksql> CREATE STREAM source (uid INT, name VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

Now create a derived stream, specifying the target serialisation (Avro) and the target topic (the topic name is optional; if omitted, the topic just takes the name of the stream):

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> CREATE STREAM target_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='mysql_users_avro') AS SELECT * FROM source;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>

Check out the resulting Avro topic:

$ kafka-avro-console-consumer \
                   --bootstrap-server localhost:9092 \
                   --property schema.registry.url=http://localhost:8081 \
                   --topic mysql_users_avro --from-beginning
{"UID":{"int":1},"NAME":{"string":"Cliff"}}
{"UID":{"int":2},"NAME":{"string":"Nick"}}

Because KSQL is a continuous query, any new records arriving on the source JSON topic will be automagically converted to Avro on the derived topic.
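To see this live, you can produce one more JSON record to the source topic and watch it appear on the Avro topic. A minimal sketch, assuming the same local broker, Schema Registry, and topic names as above (the sample record values are made up):

```shell
# Produce a new JSON record to the source topic (sample values are hypothetical)
echo '{"uid":3,"name":"Robin","locale":"en_GB","address_city":"Leeds","elite":"P"}' | \
  kafka-console-producer --broker-list localhost:9092 --topic mysql_users

# The running CSAS query picks it up; the converted record shows up on the Avro topic
kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic mysql_users_avro --from-beginning
```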

@bobbizbp

Mistake of mine, solved.
Any field declared as the key in the source topic is shown as ROWKEY in KSQL, and the original name is lost. (Why?)
So, writing "create stream ( item_id string, item_name string ... ) ..." I eventually created a new "item_id", which was obviously null.
Coming from decades on relational systems, I assumed that referencing a non-existent column would be flagged as an error.
It is not, lesson learned ;)
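In the KSQL of that era, the workaround was the KEY property in the WITH clause, which tells KSQL that a value field duplicates the message key so the original column name is preserved. A sketch, assuming a hypothetical JSON topic "items" keyed by item_id:

```sql
-- Hypothetical topic and column names, for illustration only.
-- KEY='item_id' tells KSQL the message key is duplicated in the
-- item_id value field, so it stays addressable under its own name
-- rather than only as ROWKEY:
CREATE STREAM items (item_id VARCHAR, item_name VARCHAR)
  WITH (KAFKA_TOPIC='items', VALUE_FORMAT='JSON', KEY='item_id');
```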

@nhoclove

I saw that the schema was already embedded in the Avro data. So why do we need to register the schema with the Schema Registry?

@OneCricketeer

why do we need to register the schema to SchemaRegistry?

You don't. KSQL does that for you.
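You can confirm this by querying the Schema Registry REST API directly. A sketch, assuming the registry runs on localhost:8081 as above; under the default subject-naming convention, the value schema for the derived topic is registered under the subject mysql_users_avro-value:

```shell
# List all registered subjects
curl -s http://localhost:8081/subjects

# Fetch the latest value schema that KSQL registered for the derived topic
curl -s http://localhost:8081/subjects/mysql_users_avro-value/versions/latest
```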
