Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abacaphiliac/874db6a0ab2384931ea9d8408e0e07cf to your computer and use it in GitHub Desktop.
Save abacaphiliac/874db6a0ab2384931ea9d8408e0e07cf to your computer and use it in GitHub Desktop.
AWS Aurora to Maxwell Namespaced Topic Kafka Producer

Prerequisites

  1. AWS Aurora to Maxwell Kafka Producer

After running through the prerequisites, you will have:

  • an AWS Aurora instance
  • a Maxwell image named osheroff/maxwell
  • a Kafka service named kafka, listening on kafka:9092

Start Maxwell with Namespaced Topic Kafka Producer

This is a slight variation of the prerequisite, AWS Aurora to Maxwell Kafka Producer. In the prerequisite we ran Maxwell with the default Kafka Producer configuration which will Produce messages on the maxwell topic. In this example we are overriding the MAXWELL_OPTIONS environment variable and specifying a dynamic topic name, so that Maxwell will route messages from each table to topics by the same name, namespaced by database name.

docker run -it --rm \
    --env MYSQL_USERNAME=AURORA_USERNAME \
    --env MYSQL_PASSWORD=AURORA_PASSWORD \
    --env MYSQL_HOST=AURORA_HOST \
    --link kafka \
    --env KAFKA_HOST=kafka \
    --env KAFKA_PORT=9092 \
    --env MAXWELL_OPTIONS="--kafka_topic=maxwell_%{database}_%{table}"
    --name maxwell \
    osheroff/maxwell

output:

17:44:34,901 INFO  ProducerConfig - ProducerConfig values: 
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	buffer.memory = 33554432
	ssl.truststore.password = null
	batch.size = 16384
	ssl.keymanager.algorithm = SunX509
	receive.buffer.bytes = 32768
	ssl.cipher.suites = null
	ssl.key.password = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	ssl.provider = null
	sasl.kerberos.service.name = null
	max.in.flight.requests.per.connection = 5
	sasl.kerberos.ticket.renew.window.factor = 0.8
	bootstrap.servers = [kafka:9092]
	client.id = 
	max.request.size = 1048576
	acks = 1
	linger.ms = 0
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	metadata.fetch.timeout.ms = 60000
	ssl.endpoint.identification.algorithm = null
	ssl.keystore.location = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
	ssl.truststore.location = null
	ssl.keystore.password = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	block.on.buffer.full = false
	metrics.sample.window.ms = 30000
	metadata.max.age.ms = 300000
	security.protocol = PLAINTEXT
	ssl.protocol = TLS
	sasl.kerberos.min.time.before.relogin = 60000
	timeout.ms = 30000
	connections.max.idle.ms = 540000
	ssl.trustmanager.algorithm = PKIX
	metric.reporters = []
	compression.type = none
	ssl.truststore.type = JKS
	max.block.ms = 60000
	retries = 0
	send.buffer.bytes = 131072
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	reconnect.backoff.ms = 50
	metrics.num.samples = 2
	ssl.keystore.type = JKS

17:44:34,952 INFO  AppInfoParser - Kafka version : 0.9.0.1
17:44:34,952 INFO  AppInfoParser - Kafka commitId : 23c69d62a0cabf06
17:44:35,012 INFO  Maxwell - Maxwell v1.7.0 is booting (MaxwellKafkaProducer), starting at BinlogPosition[mysql-bin-changelog.000002:84337]
17:44:35,680 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at BinlogPosition[mysql-bin-changelog.000002:3521])
17:44:38,991 INFO  OpenReplicator - starting replication at mysql-bin-changelog.000002:84337

The process is now waiting for new data events.

Start a consumer (in another terminal window)

This command will start an unnamed instance of spotify/kafka linked to the kafka service, start a consumer, display existing messages from the maxwell topic, and wait for new messages until you quit (which destroys the container):

docker run -it --rm --link kafka spotify/kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic maxwell_{AURORA_DATABASE}_{AURORA_TABLE} --from-beginning

Connect to the AWS Aurora instance, insert some records, and update some records. Data events from Maxwell will be printed in the Consumer terminal window:

{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606003,"xid":1655558,"commit":true,"data":{"id":4,"first_name":"Tim","last_name":"Younger"},"old":{"first_name":"Timothy"}}
{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606435,"xid":1658343,"commit":true,"data":{"id":4,"first_name":"Timothy","last_name":"Younger"},"old":{"first_name":"Tim"}}
{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606451,"xid":1658455,"commit":true,"data":{"id":4,"first_name":"Tim","last_name":"Younger"},"old":{"first_name":"Timothy"}}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment