@abacaphiliac
Last active January 18, 2017 23:25
AWS Aurora to Maxwell Kafka Producer

Prerequisites

  1. AWS Aurora to Maxwell STDOUT Producer
  2. Run Kafka Container

After running through the prerequisites, you will have:

  • an AWS Aurora instance
  • a Maxwell image named osheroff/maxwell
  • a Kafka service named kafka, listening on kafka:9092

Start Maxwell with Kafka Producer

docker run -it --rm \
    --env MYSQL_USERNAME=AURORA_USERNAME \
    --env MYSQL_PASSWORD=AURORA_PASSWORD \
    --env MYSQL_HOST=AURORA_HOST \
    --link kafka \
    --env KAFKA_HOST=kafka \
    --env KAFKA_PORT=9092 \
    --name maxwell \
    osheroff/maxwell
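The environment variables above are consumed by the image's entrypoint. If you prefer to run the Maxwell binary directly, the equivalent invocation uses Maxwell's standard CLI flags; the sketch below assumes the env-var-to-flag mapping performed by the osheroff/maxwell entrypoint, and is not the image's literal entrypoint script:

```shell
# Direct invocation sketch: --producer=kafka selects the Kafka producer
# instead of STDOUT, and kafka.bootstrap.servers points at the linked broker.
bin/maxwell \
    --user=AURORA_USERNAME \
    --password=AURORA_PASSWORD \
    --host=AURORA_HOST \
    --producer=kafka \
    --kafka.bootstrap.servers=kafka:9092
```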

output:

22:32:54,156 INFO  ProducerConfig - ProducerConfig values: 
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	buffer.memory = 33554432
	ssl.truststore.password = null
	batch.size = 16384
	ssl.keymanager.algorithm = SunX509
	receive.buffer.bytes = 32768
	ssl.cipher.suites = null
	ssl.key.password = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	ssl.provider = null
	sasl.kerberos.service.name = null
	max.in.flight.requests.per.connection = 5
	sasl.kerberos.ticket.renew.window.factor = 0.8
	bootstrap.servers = [kafka:9092]
	client.id = 
	max.request.size = 1048576
	acks = 1
	linger.ms = 0
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	metadata.fetch.timeout.ms = 60000
	ssl.endpoint.identification.algorithm = null
	ssl.keystore.location = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
	ssl.truststore.location = null
	ssl.keystore.password = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	block.on.buffer.full = false
	metrics.sample.window.ms = 30000
	metadata.max.age.ms = 300000
	security.protocol = PLAINTEXT
	ssl.protocol = TLS
	sasl.kerberos.min.time.before.relogin = 60000
	timeout.ms = 30000
	connections.max.idle.ms = 540000
	ssl.trustmanager.algorithm = PKIX
	metric.reporters = []
	compression.type = none
	ssl.truststore.type = JKS
	max.block.ms = 60000
	retries = 0
	send.buffer.bytes = 131072
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	reconnect.backoff.ms = 50
	metrics.num.samples = 2
	ssl.keystore.type = JKS

22:32:54,212 INFO  AppInfoParser - Kafka version : 0.9.0.1
22:32:54,212 INFO  AppInfoParser - Kafka commitId : 23c69d62a0cabf06
22:32:54,277 INFO  Maxwell - Maxwell v1.7.0 is booting (MaxwellKafkaProducer), starting at BinlogPosition[mysql-bin-changelog.000002:56885]
22:32:54,952 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at BinlogPosition[mysql-bin-changelog.000002:3521])
22:32:58,518 INFO  OpenReplicator - starting replication at mysql-bin-changelog.000002:56885
22:33:23,282 WARN  NetworkClient - Error while fetching metadata with correlation id 0 : {maxwell=LEADER_NOT_AVAILABLE}

The LEADER_NOT_AVAILABLE warning is expected on the first send, since the maxwell topic does not exist yet. The process is now waiting for new data events.

Create a topic

docker exec kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic maxwell
docker exec kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --list --zookeeper localhost:2181

output:

Created topic "maxwell".
test
maxwell
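To confirm the partition count, replication factor, and current leader for the new topic, the same script supports a describe mode; this sketch assumes the same container layout and Kafka install path as above:

```shell
# Show partitions, replication factor, and leader assignment for the maxwell topic.
docker exec kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh \
    --describe --zookeeper localhost:2181 --topic maxwell
```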

Start a consumer (in another terminal window)

This command starts an unnamed spotify/kafka container linked to the kafka service, runs a console consumer that prints existing messages from the maxwell topic, and waits for new messages until you quit (quitting destroys the container):

docker run -it --rm --link kafka spotify/kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic maxwell --from-beginning

Connect to the AWS Aurora instance, insert some records, and update some records. Data events from Maxwell will be printed in the Consumer terminal window:

{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606003,"xid":1655558,"commit":true,"data":{"id":4,"first_name":"Tim","last_name":"Younger"},"old":{"first_name":"Timothy"}}
{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606435,"xid":1658343,"commit":true,"data":{"id":4,"first_name":"Timothy","last_name":"Younger"},"old":{"first_name":"Tim"}}
{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606451,"xid":1658455,"commit":true,"data":{"id":4,"first_name":"Tim","last_name":"Younger"},"old":{"first_name":"Timothy"}}
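Each line the consumer prints is a self-contained JSON document, so any JSON tool can post-process the stream. A minimal sketch (assuming python3 is available on the consuming host) that pulls the changed columns out of an update event like the ones above:

```shell
# One Maxwell update event, copied from the consumer output above.
EVENT='{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606003,"xid":1655558,"commit":true,"data":{"id":4,"first_name":"Tim","last_name":"Younger"},"old":{"first_name":"Timothy"}}'

# "old" holds only the columns that changed; "data" holds the full new row.
echo "$EVENT" | python3 -c '
import json, sys
event = json.load(sys.stdin)
for column, old_value in event["old"].items():
    print(event["table"], column + ":", old_value, "->", event["data"][column])
'
# prints: AURORA_TABLE first_name: Timothy -> Tim
```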

Start Maxwell with Namespaced Topic Kafka Producer
