Robin Moffatt (rmoff)
rmoff / decoding-json-with-jq.adoc
Created July 5, 2018 15:11
How to view a schema's JSON representation from the Confluent Schema Registry (Decoding encoded JSON with jq)

The Confluent Schema Registry exposes a REST API for pulling down schema definitions:

$ curl -s localhost:8081/subjects/asgard.demo.CUSTOMERS-raw-value/versions/latest

{"subject":"asgard.demo.CUSTOMERS-raw-value","version":1,"id":6,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"asgard.demo.CUSTOMERS\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"first_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"last_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"gender\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"club_status\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"comments\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"create_ts\",\"type\":{\"type\":\"string\",\"connect.version\":1,\"connect.default\":\"1970-01-01T00:00:00
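The `schema` field comes back as a JSON document encoded inside a string, which is hard to read as-is. jq's `fromjson` filter decodes it in one pass. A sketch (the subject name matches the curl above; the inline `echo` demonstrates the same decoding without a running registry):

```shell
# Against a running Schema Registry:
#   curl -s localhost:8081/subjects/asgard.demo.CUSTOMERS-raw-value/versions/latest \
#     | jq '.schema | fromjson'
# The same decoding, shown on an inline sample response:
echo '{"schema":"{\"type\":\"record\",\"name\":\"Envelope\"}"}' \
  | jq '.schema | fromjson'
```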

Keybase proof

I hereby claim:

  • I am rmoff on github.
  • I am rmoff (https://keybase.io/rmoff) on keybase.
  • I have a public key ASB2fZAiJ83jDPiOrWiKXYzkweNEpmaZx9U1hsiJxtpGEgo

To claim this, I am signing this object:

rmoff / gist:59240f26dc557c16f4a36474ed10849c
Created June 18, 2018 14:05
Oracle JSON dot-notation syntax
SQL> SET ECHO ON
SQL>
SQL> -- Create table to hold JSON
SQL> CREATE TABLE X (X CLOB CONSTRAINT ensure_X_json CHECK (X IS JSON));
Table X created.
SQL>
SQL> -- Load some JSON
SQL> INSERT INTO X VALUES ('{"X":1,"Y":2,"Z":{"A":1,"B":2}}');
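The transcript breaks off before the query the gist title promises. A sketch of Oracle's JSON dot-notation syntax against the table above (dot-notation requires the `IS JSON` check constraint, as created here, and a mandatory table alias; JSON key names are case-sensitive):

```sql
-- Navigate into the JSON column with dot-notation.
-- t is the table alias, X the column, then JSON keys.
SELECT t.X.Y,
       t.X.Z.B
FROM   X t;
```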
rmoff / List all available Kafka Connect plugins.md
Created May 18, 2018 14:29
List all available Kafka Connect plugins
$ curl -s -XGET http://localhost:8083/connector-plugins | jq '.[].class'
"io.confluent.connect.activemq.ActiveMQSourceConnector"
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
"io.confluent.connect.hdfs.HdfsSinkConnector"
"io.confluent.connect.hdfs.tools.SchemaSourceConnector"
"io.confluent.connect.ibm.mq.IbmMQSourceConnector"
"io.confluent.connect.jdbc.JdbcSinkConnector"
"io.confluent.connect.jdbc.JdbcSourceConnector"
"io.confluent.connect.jms.JmsSourceConnector"

Installing the Cassandra Kafka Connect connector

$ git clone git@github.com:jcustenborder/kafka-connect-cassandra.git
$ cd kafka-connect-cassandra/
$ mvn clean package -DskipTests

Installing the MongoDB Connector in Kafka Connect

$ git clone https://github.com/hpgrahsl/kafka-connect-mongodb.git
$ cd kafka-connect-mongodb/
$ mvn clean package -DskipTests
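For either connector, building the jar is only half the job: the worker will not list the plugin until the artifact is on its `plugin.path`. A sketch, with illustrative paths (use whatever directory your worker config actually points at, and restart the worker afterwards):

```shell
# Copy the packaged jar into a directory on the Connect worker's plugin.path.
# PLUGIN_DIR is illustrative; substitute your own location.
PLUGIN_DIR="${PLUGIN_DIR:-$HOME/connect-plugins}"
mkdir -p "$PLUGIN_DIR"
# After `mvn clean package -DskipTests` the artifact is typically under target/:
cp target/*-jar-with-dependencies.jar "$PLUGIN_DIR"/ 2>/dev/null || true
echo "Ensure the worker config contains: plugin.path=$PLUGIN_DIR"
```

After restarting the worker, the new connector class should appear in the `/connector-plugins` listing shown above.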

Notes: Elasticsearch doc mapping woes

Without the index template, fields are sent through as text (and therefore not available for aggregate analysis). With the index template the connector fails with

{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [ratings_enriched] as the final mapping would have more than 1 type: [type.name=kafkaconnect, kafkaconnect]"},

Or:

org.apache.kafka.connect.errors.ConnectException: Cannot create mapping {"type.name=kafkaconnect":{"properties":{"RATING_ID":{"type":"long"},"CHANNEL":{"type":"text"},"MESSAGE":{"type":"text"},"ID":{"type":"integer"},"FULL_NAME":{"type":"text"},"EXTRACT_TS":{"type":"date"}}}} -- {"root_cause":[{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [ratings_enriched] as the final mapping would have more than 1 type: [foo, type.name=kafkaconnect]"}],"type":"illegal_argument_exception","reason":"Rejecting mapping update to [ratings_enriched] as the final mapping would have more than 1 type: [foo
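Both errors are Elasticsearch 6.x rejecting a second mapping type on the index: from 6.0 an index may carry only one mapping type, so the template's type name must exactly match the connector's `type.name=kafkaconnect`. A sketch of such a template, mapping string fields to `keyword` so they are usable in aggregations (index pattern and field handling are illustrative, not the exact template from these notes):

```json
{
  "index_patterns": ["ratings_enriched*"],
  "mappings": {
    "kafkaconnect": {
      "dynamic_templates": [
        {
          "strings_as_keyword": {
            "match_mapping_type": "string",
            "mapping": { "type": "keyword" }
          }
        }
      ]
    }
  }
}
```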
from slackclient import SlackClient  # slackclient 1.x ("legacy") API

token = 'xxxxxxxxxx'  # Slack API token (redacted)
sc = SlackClient(token)
# Post a test message to #general as 'Python client'
sc.api_call('chat.postMessage', channel='general', text='Test!', username='Python client')
# rmoff / 05 Apr 2018
from confluent_kafka import Consumer, KafkaError
# Set 'auto.offset.reset': 'smallest' if you want to consume all messages
# from the beginning of the topic the first time that this runs.
#
# If you want to monitor your data streams through Confluent Control Center,
# make sure you've installed the interceptors and then uncomment
# the 'plugin.library.paths' config line
# Ref: https://docs.confluent.io/current/control-center/docs/installation/clients.html#installing-control-center-interceptors
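The gist breaks off after the comment block. A minimal poll loop using the `confluent_kafka` Consumer API, as a sketch of where those comments were heading (broker address, group id, and topic name are placeholders; requires the `confluent-kafka` package and a running broker):

```python
from confluent_kafka import Consumer, KafkaError

settings = {
    'bootstrap.servers': 'localhost:9092',   # placeholder broker
    'group.id': 'python-consumer-demo',      # placeholder group
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    # 'plugin.library.paths': 'monitoring-interceptor',  # Control Center interceptors
}

c = Consumer(settings)
c.subscribe(['mytopic'])  # placeholder topic

try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue  # reached end of partition; keep polling
            print('Error: {}'.format(msg.error()))
            break
        print('Received: {}'.format(msg.value().decode('utf-8')))
finally:
    c.close()
```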