Continuous Conversion of Kafka topics from JSON to Avro with KSQL

@rmoff / 04 Apr 2018

This is easy with KSQL :)

Here’s a dummy topic, in JSON:

$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic mysql_users
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}

In KSQL, declare the source stream and specify its schema (here only a subset of the full schema, for brevity):

ksql> CREATE STREAM source (uid INT, name VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

Now create a derived stream, specifying the target serialisation (Avro) and the target topic (the topic is optional; without it, the topic just takes the name of the stream):

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> CREATE STREAM target_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='mysql_users_avro') AS SELECT * FROM source;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>

Check out the resulting Avro topic:

$ kafka-avro-console-consumer \
                   --bootstrap-server localhost:9092 \
                   --property schema.registry.url=http://localhost:8081 \
                   --topic mysql_users_avro --from-beginning
{"UID":{"int":1},"NAME":{"string":"Cliff"}}
{"UID":{"int":2},"NAME":{"string":"Nick"}}
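
The shape of the consumer output above is worth a second look: only the two declared columns survive, their names are upper-cased, and each value is wrapped with its type name, which is how Avro's JSON encoding prints a non-null value of a nullable (union) field. The following Python sketch only mimics that mapping for illustration; it is not how KSQL is implemented, and `DECLARED` and `to_avro_json` are hypothetical names.

```python
import json

# Columns declared in CREATE STREAM source (...). The type names on the right
# are the Avro branch names visible in the consumer output (assumption: every
# field is a nullable union, as that output suggests).
DECLARED = {"uid": "int", "name": "string"}


def to_avro_json(json_line: str) -> str:
    """Render one source JSON record the way the Avro console consumer prints it."""
    record = json.loads(json_line)
    out = {}
    for column, avro_type in DECLARED.items():
        value = record.get(column)
        # Avro's JSON encoding writes a non-null union value as {"<type>": value}
        # and a null value as plain null.
        out[column.upper()] = None if value is None else {avro_type: value}
    return json.dumps(out, separators=(",", ":"))


line = '{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}'
print(to_avro_json(line))
```

Columns that were not declared on the source stream (`locale`, `address_city`, `elite`) never reach the Avro topic, so declare every field you want to keep.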

Because the CREATE STREAM ... AS SELECT statement runs as a continuous query, any new records arriving on the source JSON topic will be automagically converted to Avro and written to the derived topic.

@stewartbryson

commented Aug 4, 2018

Very nice, Robin. And since Avro is being written to the topic behind target_avro (mysql_users_avro), its schema will automatically be registered with the Schema Registry, correct? I see you are specifying the Schema Registry in your consumer, so I assume so.

@rmoff

Owner Author

commented Aug 6, 2018

Yes, the generated Avro will have its schema stored in the Schema Registry.
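
One way to check this is to ask the Schema Registry for the latest schema registered for the topic. A minimal Python sketch, assuming the registry address from the consumer command above (http://localhost:8081) and the default subject naming of `<topic>-value`; the function names are illustrative only.

```python
import json
import urllib.request


def latest_schema_url(registry_url: str, topic: str) -> str:
    """Build the REST URL for the latest value schema of a topic."""
    # Assumption: default subject naming, i.e. "<topic>-value".
    return f"{registry_url.rstrip('/')}/subjects/{topic}-value/versions/latest"


def fetch_latest_schema(registry_url: str, topic: str) -> dict:
    """Fetch and parse the latest value schema (needs a running registry)."""
    with urllib.request.urlopen(latest_schema_url(registry_url, topic)) as resp:
        body = json.load(resp)
    # The registry returns the Avro schema as a JSON-encoded string.
    return json.loads(body["schema"])


print(latest_schema_url("http://localhost:8081", "mysql_users_avro"))
```

With the stack running, `fetch_latest_schema("http://localhost:8081", "mysql_users_avro")` returns the parsed Avro schema that was registered for the derived topic.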

@stewartbryson

commented Aug 15, 2018

Have you tested without the KAFKA_TOPIC='mysql_users_avro'? A developer on a project we are doing reported that this all works flawlessly as long as there is an explicit topic name.

@bobbizbp

commented Aug 21, 2018

This helped me out of a dead end with a small prototype - well done, thank you!

I wasn't able to apply the same technique "as is" to a JSON topic with a key defined (please note, I'm new to Kafka).
I didn't expect the key field "ITEM_ID" to vanish from the topic, as viewed through the kafka-console-consumer (Confluent Platform OSS 5.0.0). On the other hand, in ksql "ITEM_ID" is a column of the corresponding stream but is constantly null, its value being assigned, as expected, to ROWKEY. Oddly enough, a "SELECT ROWTIME, ROWKEY, ROWKEY AS ITEM_ID, ..." returns what I had expected.

A mistake of mine, or is it "working as designed"?
This is not a dead end, but any help would be greatly appreciated (and thank you again).

@bobbizbp

commented Aug 21, 2018

Mistake of mine, solved.
Any field declared as "key" in the source topic is shown as ROWKEY in ksql, and the original name is lost. (Why?)
So, writing "create stream ( item_id string, item_name string ... ) ..." I eventually created a new "item_id", obviously null.
Coming from decades on relational systems, I assumed that referencing a non-existent column would be flagged as an error.
It is not, lesson learned ;)

@nhoclove

commented Jan 31, 2019

I saw that the schema of the data was already embedded in the Avro data. So why do we need to register the schema with the Schema Registry?
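
A note on this question: Avro container files embed the full schema, but Kafka messages written with Confluent's Avro serializer do not. Each message value starts with a magic byte (0) and a 4-byte big-endian schema ID, followed by the Avro binary payload, and consumers use that ID to fetch the schema from the Schema Registry. The Python sketch below splits such a frame; the example message, its schema ID 42, and the placeholder payload are made up for illustration.

```python
import struct


def split_confluent_frame(message: bytes):
    """Return (schema_id, avro_payload) from a Confluent-framed message value."""
    if len(message) < 5 or message[0] != 0:
        raise ValueError("not Confluent wire format (expected magic byte 0)")
    # Bytes 1-4 hold the schema ID as an unsigned big-endian 32-bit integer.
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]


# Hypothetical message: schema ID 42 and an opaque placeholder payload
# standing in for the Avro-encoded record bytes.
example = b"\x00" + struct.pack(">I", 42) + b"<avro-bytes>"
schema_id, payload = split_confluent_frame(example)
print(schema_id, payload)
```

Keeping only an ID in each message keeps records small, and it is why the console consumer above needs `schema.registry.url` to decode them.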
