Skip to content

Instantly share code, notes, and snippets.

@smarek
Last active May 28, 2019 05:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save smarek/d88a870128cdda948be70eb3f93d4c92 to your computer and use it in GitHub Desktop.
Save smarek/d88a870128cdda948be70eb3f93d4c92 to your computer and use it in GitHub Desktop.
MariaDB Tarantool Replication script - now available as full project systemd-integrated, https://github.com/smarek/mariadb-tarantool-replication
#!/usr/bin/env python3
# Install the patched python-mysql-replication fork:
# git clone --recursive https://github.com/smarek/python-mariadb-replication.git
# cd python-mariadb-replication
# python3 setup.py install
# Install tarantool and tarantool-python
# If you use debian, use the official repository, see https://www.tarantool.io/en/download/os-installation/1.10/debian/
# Configure `tablesMap`, `keysMap` and `mySchema` to fit your needs
# Then run this script with python3.
import tarantool
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
)
# Connection parameters for the MariaDB/MySQL server whose binlog is read.
mysql_settings = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': '',
}

# Blocking binlog reader restricted to row-level delete/insert/update events.
stream = BinLogStreamReader(
    connection_settings=mysql_settings,
    server_id=100,
    blocking=True,
    only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
)

# Tarantool connection that receives the replicated rows.
tntcon = tarantool.connect("localhost", 3301, 'root', '')

# Source column names, in the order they are written into the Tarantool tuple.
tntUsersFieldOrder = ['ID', 'Name', 'Email']
# Columns used to build the delete key: the first and third entries (ID, Email).
tntUsersKeys = tntUsersFieldOrder[:1] + tntUsersFieldOrder[2:3]

# Source table name -> list of {Tarantool space name: ordered column list}.
tablesMap = {'Users': [{'UsersCache': tntUsersFieldOrder}]}
# Tarantool space name -> columns forming the key passed to delete().
keysMap = {'UsersCache': tntUsersKeys}
# Only binlog events whose schema equals this name are replicated.
mySchema = "MyDatabaseName"
def tnt_get_space(space_name):
    """Return the space named *space_name* from the module-level Tarantool connection."""
    # Reading a module-level name needs no ``global`` declaration.
    return tntcon.space(space_name)
def get_targets(source_table):
    """Return the replication targets configured for *source_table*.

    The table name is looked up in ``tablesMap``; a new empty list is
    returned when the table has no entry there.
    """
    return tablesMap.get(source_table, [])
def get_keys(target_space):
    """Return the key column names configured for *target_space*.

    The space name is looked up in ``keysMap``; a new empty list is
    returned when the space has no entry there.
    """
    return keysMap.get(target_space, [])
def tnt_delete(targets, row, BLEvent):
    """Delete the tuple matching *row* from every target space.

    targets -- list of ``{space name: field list}`` dicts (the shape stored
               in ``tablesMap``); only the space names are used here
    row     -- binlog row dict; its ``"values"`` entry holds the column values
    BLEvent -- the originating binlog event (not used by this function)
    """
    values = dict(row["values"])
    for target in targets:
        for space_name in target:
            # Resolve the space first, then build the key from the configured key columns.
            space = tnt_get_space(space_name)
            key = tuple(values[column] for column in get_keys(space_name))
            space.delete(key)
def tnt_insert(targets, row, BLEvent):
    """Store *row* in every target space using ``replace``.

    targets -- list of ``{space name: field list}`` dicts (the shape stored
               in ``tablesMap``); the field list fixes the tuple's column order
    row     -- binlog row dict; its ``"values"`` entry holds the column values
    BLEvent -- the originating binlog event (not used by this function)
    """
    values = dict(row["values"])
    for target in targets:
        for space_name, field_order in target.items():
            space = tnt_get_space(space_name)
            record = tuple(values[column] for column in field_order)
            space.replace(record)
def tnt_update(targets, row, BLEvent):
    """Replicate an UPDATE row: delete the old tuple, then store the new one.

    targets -- list of ``{space name: field list}`` dicts (the shape stored
               in ``tablesMap``)
    row     -- binlog row dict with ``"before_values"`` and ``"after_values"``
               entries; it is not modified
    BLEvent -- the originating binlog event, passed through to the helpers

    The delete uses the pre-update values and the insert uses the post-update
    values (presumably so that a change to a key column does not leave the
    old tuple behind -- confirm against the Tarantool space definition).
    """
    # Wrap each image in a fresh dict rather than assigning row["values"],
    # so the caller's row dict is left untouched.
    tnt_delete(targets, {"values": row["before_values"]}, BLEvent)
    # Same write path as a plain insert; reuse it instead of duplicating the loop.
    tnt_insert(targets, {"values": row["after_values"]}, BLEvent)
def is_empty(what):
    """Return True when *what* is falsy (e.g. an empty list), False otherwise."""
    return not what
# Main replication loop: read row events from the binlog stream and apply
# each row to the configured Tarantool spaces.
try:
    for binlogevent in stream:
        # Ignore events that belong to other databases.
        if binlogevent.schema != mySchema:
            continue

        # The table is the same for every row of an event, so resolve the
        # targets once per event rather than once per row.
        targets = get_targets(binlogevent.table)
        if is_empty(targets):
            continue

        # The event type is likewise fixed per event; pick the handler once.
        if isinstance(binlogevent, DeleteRowsEvent):
            handler = tnt_delete
        elif isinstance(binlogevent, UpdateRowsEvent):
            handler = tnt_update
        elif isinstance(binlogevent, WriteRowsEvent):
            handler = tnt_insert
        else:
            # Not expected: the stream is created with only_events limited
            # to the three row event types above.
            continue

        for row in binlogevent.rows:
            handler(targets, row, binlogevent)
finally:
    # Release both connections, even if closing the first one fails.
    try:
        stream.close()
    finally:
        tntcon.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment