Skip to content

Instantly share code, notes, and snippets.

@dotmanila
Last active February 12, 2022 18:35
Show Gist options
  • Save dotmanila/53a2c0b9b7a3be4f9e99b94aa203c66d to your computer and use it in GitHub Desktop.
Save dotmanila/53a2c0b9b7a3be4f9e99b94aa203c66d to your computer and use it in GitHub Desktop.
Python MySQL Binlog Change Data Capture for Clickhouse Example
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import sys
import pymysql.cursors
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
)
# --- Deployment settings: replace every <...> placeholder before running ---

# Host and schema holding the `clickhouse_changelog` tracking table.
LOG_DB_HOST = '<CHANGELOG MySQL HOST>'
LOG_DB_NAME = '<CHANGELOG MySQL DB>'

# MySQL server whose binary log is streamed.
SRC_DB_HOST = '<SOURCE MySQL HOST>'

# Credentials shared by the changelog and source connections.
MYSQL_USER = '<MySQL USER>'
MYSQL_PASS = '<MySQL PASS>'

# Only row events for this table are recorded; everything else is skipped.
TABLE = 'hits'

# Connection settings handed to BinLogStreamReader in main().
MYSQL_SETTINGS = dict(
    host=SRC_DB_HOST,
    port=3306,
    user=MYSQL_USER,
    passwd=MYSQL_PASS,
)
def connect_log_db(host):
    """Open a PyMySQL connection to ``host`` using the shared credentials.

    The connection selects the LOG_DB_NAME schema, uses the utf8mb4
    character set and a DictCursor cursor class. main() uses this for both
    the changelog host and the source host.

    :param host: hostname of the MySQL server to connect to (port 3306).
    :return: whatever ``pymysql.connect`` returns for these settings.
    """
    params = {
        'host': host,
        'port': 3306,
        'user': MYSQL_USER,
        'passwd': MYSQL_PASS,
        'db': LOG_DB_NAME,
        'charset': 'utf8mb4',
        'cursorclass': pymysql.cursors.DictCursor,
    }
    return pymysql.connect(**params)
def last_file_pos(conlogdb):
    """Fetch the highest binlog file/position stored in the changelog table.

    Rows of ``clickhouse_changelog`` are ordered by ``log_file`` and then
    ``log_pos``, both descending, and only the first one is fetched.

    :param conlogdb: open connection to the changelog database.
    :return: the result of ``fetchone()`` on that query; main() reads the
        ``log_file`` and ``log_pos`` keys from it and treats ``None`` as
        "nothing recorded yet".
    """
    query = (
        "SELECT log_file, log_pos FROM clickhouse_changelog "
        "ORDER BY log_file DESC, log_pos DESC LIMIT 1"
    )
    with conlogdb.cursor() as cur:
        cur.execute(query)
        newest = cur.fetchone()
    return newest
def master_status(conlogdb):
    """Run ``SHOW MASTER STATUS`` on the given connection.

    :param conlogdb: open connection; main() passes the connection to the
        source server here, despite the parameter name.
    :return: the result of ``fetchone()``; main() reads the ``File`` and
        ``Position`` keys from it.
    """
    with conlogdb.cursor() as cur:
        cur.execute("SHOW MASTER STATUS")
        status_row = cur.fetchone()
    return status_row
def insert_log_db(conlogdb, values):
    """Write one changelog record and commit it.

    Issues a ``REPLACE INTO clickhouse_changelog`` whose ``created_at``
    column is computed in SQL as the given date minus its ``WEEKDAY()``
    offset, which is why the date appears twice in ``values``.

    :param conlogdb: open connection to the changelog database.
    :param values: six parameters in statement order:
        ``(db, tbl, date, date, log_file, log_pos)``.
    """
    statement = (
        "REPLACE INTO `clickhouse_changelog` "
        "(`db`, `tbl`, `created_at`, `log_file`, `log_pos`) "
        "VALUES (%s, %s, DATE_ADD(%s, INTERVAL - WEEKDAY(%s) DAY), %s, %s)"
    )
    with conlogdb.cursor() as cur:
        cur.execute(statement, values)

    # The write is committed explicitly here rather than relying on
    # autocommit being enabled on the connection.
    conlogdb.commit()
def _start_position(conlogdb):
    """Return the ``(log_file, log_pos)`` pair at which streaming begins.

    The newest position stored in the changelog table is preferred. When
    nothing has been recorded yet, the source server's current
    ``SHOW MASTER STATUS`` coordinates are used instead.

    :raises RuntimeError: if the source server returns no master status row.
    """
    checkpoint = last_file_pos(conlogdb)
    if checkpoint is not None:
        return checkpoint['log_file'], checkpoint['log_pos']

    # The source connection is needed for this one query only, so it is
    # opened on demand and closed straight away.
    consrcdb = connect_log_db(SRC_DB_HOST)
    try:
        status = master_status(consrcdb)
    finally:
        consrcdb.close()
    if status is None:
        raise RuntimeError(
            "SHOW MASTER STATUS returned no row on %s; "
            "is binary logging enabled?" % SRC_DB_HOST)
    return status['File'], status['Position']


def main():
    """Stream delete/update row events for TABLE and record them.

    For each matching row, the schema, table, the row's ``created_at`` date
    and the stream's current binlog file/position are written to the
    changelog table through ``insert_log_db``. The stream is opened in
    blocking mode, so the loop keeps waiting for new events until an
    exception or interrupt ends it. The stream and the changelog connection
    are closed on the way out.
    """
    conlogdb = connect_log_db(LOG_DB_HOST)
    stream = None
    try:
        log_file, log_pos = _start_position(conlogdb)

        # Parenthesised single-argument form prints the same text on
        # Python 2 and Python 3.
        print("Starting streaming at file: %s, position: %s"
              % (log_file, log_pos))
        # Flush now so the message is visible before the blocking loop.
        sys.stdout.flush()

        stream = BinLogStreamReader(
            connection_settings=MYSQL_SETTINGS, resume_stream=True,
            server_id=172313514, log_file=log_file, log_pos=log_pos,
            only_events=[DeleteRowsEvent, UpdateRowsEvent], blocking=True)

        # Consecutive rows for the same table and the same created_at date
        # are skipped: the INSERT would be redundant and costs performance.
        # NOTE(review): the original comment said "week", but the comparison
        # has always been on the calendar date; confirm which is intended.
        prev_date = None
        prev_table = None

        for binlogevent in stream:
            # Table and event type are fixed per event, so decide once here
            # instead of once per row.
            if binlogevent.table != TABLE:
                continue
            if isinstance(binlogevent, DeleteRowsEvent):
                values_key = "values"
            elif isinstance(binlogevent, UpdateRowsEvent):
                values_key = "after_values"
            else:
                continue

            for row in binlogevent.rows:
                created = row[values_key]['created_at'].strftime('%Y-%m-%d')
                if prev_table == binlogevent.table and prev_date == created:
                    continue

                prev_table = binlogevent.table
                prev_date = created

                # The date is passed twice because the REPLACE statement in
                # insert_log_db references it in two placeholders.
                event = (binlogevent.schema, binlogevent.table,
                         created, created,
                         stream.log_file, int(stream.log_pos))
                insert_log_db(conlogdb, event)
    finally:
        # Reached on errors and on Ctrl-C, as the blocking loop does not
        # end on its own.
        if stream is not None:
            stream.close()
        conlogdb.close()
# Run the streamer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment