Continuous Conversion of Kafka topics from JSON to Avro with KSQL

@rmoff / 04 Apr 2018

This is easy with KSQL :)

Here’s a dummy topic, in JSON:

$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic mysql_users
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}

In KSQL declare the source stream, specifying the schema (here a subset of the full schema, just for brevity):

ksql> CREATE STREAM source (uid INT, name VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------
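
To sanity-check the declared schema, DESCRIBE the stream (a sketch of the output, which may vary slightly by KSQL version; note that KSQL uppercases column names and adds the implicit ROWTIME and ROWKEY system columns):

ksql> DESCRIBE source;

 Field   | Type
-------------------------------------
 ROWTIME | BIGINT           (system)
 ROWKEY  | VARCHAR(STRING)  (system)
 UID     | INTEGER
 NAME    | VARCHAR(STRING)
-------------------------------------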

Now create a derived stream, specifying the target serialisation (Avro) and the target topic (the topic name is optional; without it, the topic simply takes the name of the stream):

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> CREATE STREAM target_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='mysql_users_avro') AS SELECT * FROM source;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>
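
If you omit KAFKA_TOPIC, the derived topic takes the stream's name, which KSQL uppercases. A sketch (untested here; the stream name target_avro2 is made up, and would yield a topic called TARGET_AVRO2):

ksql> CREATE STREAM target_avro2 WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM source;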

Check out the resulting Avro topic. Note that KSQL uppercases field names and declares all columns as nullable, which is why the values appear wrapped in Avro union types:

$ kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic mysql_users_avro --from-beginning
{"UID":{"int":1},"NAME":{"string":"Cliff"}}
{"UID":{"int":2},"NAME":{"string":"Nick"}}

Because KSQL is a continuous query, any new records arriving on the source JSON topic will be automagically converted to Avro on the derived topic.
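
To see this in action, produce another JSON record to the source topic (a sketch; the uid=3 record is invented, and the kafka-avro-console-consumer from above is assumed to still be running):

$ echo '{"uid":3,"name":"Robin","locale":"en_GB","address_city":"Leeds","elite":"G"}' | \
    kafka-console-producer --broker-list localhost:9092 --topic mysql_users

The running Avro consumer should then print the converted record:

{"UID":{"int":3},"NAME":{"string":"Robin"}}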

@stewartbryson

Very nice Robin. And since Avro is being sent to topic target_avro, it will automatically register with the Schema Registry, correct? I see you are specifying the schema registry in your consumer, so I assume so.

@rmoff commented Aug 6, 2018

Yes, the generated Avro will have its schema stored in the Schema Registry.
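
You can verify this against the Schema Registry REST API (a sketch, assuming the registry is at localhost:8081; KSQL registers the value schema under the subject <topic>-value, and the subject list is abbreviated here):

$ curl -s http://localhost:8081/subjects
["mysql_users_avro-value"]
$ curl -s http://localhost:8081/subjects/mysql_users_avro-value/versions/latest

The second call returns the registered Avro schema as JSON.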

@stewartbryson

Have you tested without the KAFKA_TOPIC='mysql_users_avro'? A developer on a project we are doing reported that this all works flawlessly as long as there is an explicit topic name.

@bobbizbp

This helped me out of a dead end with a small prototype - well done, thank you!

I wasn't able to apply the same technique as-is to a JSON topic with a key defined (please note, I'm new to Kafka).
I didn't expect the key field "ITEM_ID" to vanish from the topic, as viewed through kafka-console-consumer (Confluent Platform OSS 5.0.0). On the other hand, in KSQL "ITEM_ID" is a column of the corresponding stream, but constantly null, its value being assigned, as expected, to ROWKEY. Oddly enough, a "SELECT ROWTIME, ROWKEY, ROWKEY AS ITEM_ID, ..." returns what I had expected.

A mistake of mine, or is it "working as designed"?
This is not a dead end, but any help would be greatly appreciated (and thank you again).

@bobbizbp

Mistake of mine, solved.
Any field declared as "key" in the source topic is shown as ROWKEY in ksql, and the original name is lost. (Why?)
So, writing "create stream ( item_id string, item_name string ... ) ..." I eventually created a new "item_id", obviously null.
Coming from decades on relational systems, I assumed that referencing a non-existent column would be flagged as an error.
It is not, lesson learned ;)
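
For anyone hitting the same thing, a minimal sketch of the workaround (topic and column names here are hypothetical): declare only the value fields in the source stream, then alias ROWKEY back to its original name in the derived query:

ksql> CREATE STREAM items_src (item_name VARCHAR) WITH (KAFKA_TOPIC='items', VALUE_FORMAT='JSON');
ksql> CREATE STREAM items_avro WITH (VALUE_FORMAT='AVRO') AS SELECT ROWKEY AS item_id, item_name FROM items_src;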

@nhoclove

I saw that the schema was already embedded in the Avro data. So why do we need to register the schema with the Schema Registry?

@OneCricketeer

why do we need to register the schema with the Schema Registry?

You don't. KSQL does that for you.
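
A side note on the original question: with the Confluent Avro serialiser the full schema is not actually embedded in each message. Each record carries a magic byte (0x00) and a 4-byte schema ID pointing at the registry, followed by the Avro-encoded payload. A sketch to inspect the raw bytes (the schema ID in your output will depend on your registry):

$ kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic mysql_users_avro --from-beginning --max-messages 1 | hexdump -C | head -n 2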
