Data synchronisation benchmarks

Three bulk data update methods were compared (a condensed sketch of each follows the list):

  1. Check if row exists, then update or insert

    This method executes a select query for each row to check whether the row already exists. If it exists, an update statement is executed; if it does not, the row is buffered and bulk inserted later.

    • This method does not delete rows that are missing from the new set of rows.
  2. Always insert and clean up afterwards

    This method always does bulk inserts and, after all new rows have been inserted, executes a delete query that removes the old versions of updated rows, leaving rows that were not updated untouched.

    • This method does not delete rows that are missing from the new set of rows.
    • This method requires an extra field (seq) for tracking which rows are outdated.
    • This method drops the original primary key in order to enable bulk inserts on top of existing data.
  3. First delete everything, then insert everything

    This method simply deletes all rows in the table and then bulk inserts the new rows.

    • This method always overwrites existing data, with no possibility of updating existing data with partial new data.
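For orientation, here is a condensed SQLAlchemy Core sketch of the statement each method boils down to. The names (conn, table, row, rows, seq) are illustrative assumptions; the full benchmarked implementations are in test_sync.py below.

    import sqlalchemy as sa
    # Assumed: `conn` is an open connection, `table` has an `id` primary key,
    # `row` is one incoming row dict, `rows` is a list of row dicts.

    # Method 1: select to check whether the row exists, then update or insert.
    exists = conn.execute(sa.select([sa.exists().where(table.c.id == row['id'])])).scalar()
    if exists:
        conn.execute(table.update().where(table.c.id == row['id']).values(row))
    else:
        conn.execute(table.insert(), [row])

    # Method 2: always bulk insert; afterwards delete outdated duplicates,
    # i.e. rows whose seq is not the highest for their id.
    conn.execute(table.insert(), rows)

    # Method 3: delete everything, then bulk insert the new rows.
    conn.execute(table.delete())
    conn.execute(table.insert(), rows)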

Benchmark results show that the third method was the fastest in all cases.

Here is a short summary of results (see the note after the list for how the "n times slower" ratios are computed):

  • test_append_all - all provided rows are new and have no matching primary keys in the existing data.

    method    time in ms    n times slower
         3           186              1.00
         2           363              1.95
         1         1,192              6.41

    In this situation method 3 does not really fit, because it always overwrites all data, while here the existing rows should be kept and the new rows appended.

  • test_update_all - all existing data have changed, meaning every existing row in the table has to be updated while keeping the same primary keys.

    method    time in ms    n times slower
         3           163              1.00
         2           435              2.67
         1         1,824             11.18

    In this case the third method fits perfectly well and is the fastest.

    The third method can be used whenever you need to update all the data.

  • test_update_some - only 5 existing rows have been changed and all the rest are unchanged.

    method    time in ms    n times slower
         3           163              1.00
         2           372              2.28
         1         1,904             11.66
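In the tables above, "n times slower" is each method's mean time divided by the mean of the fastest method (method 3); it matches the relative Mean values that pytest-benchmark prints in parentheses in the raw output below. For example, for test_append_all:

    >>> round(363.4665 / 186.2097, 2)   # method 2 mean / method 3 mean
    1.95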

---------------------------------------------------------------------------------- benchmark 'test_append_all': 3 tests ---------------------------------------------------------------------------------
Name (time in ms) Min Max Mean StdDev Median IQR Outliers(*) Rounds Iterations
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_append_all[3-1000-1000-postgresql] 136.4127 (1.0) 276.2953 (1.0) 186.2097 (1.0) 53.6148 (4.98) 169.4176 (1.0) 54.6714 (3.19) 1;0 5 1
test_append_all[2-1000-1000-postgresql] 348.5794 (2.56) 374.0098 (1.35) 363.4665 (1.95) 10.7761 (1.0) 365.5250 (2.16) 18.2096 (1.06) 1;0 5 1
test_append_all[1-1000-1000-postgresql] 1,177.2909 (8.63) 1,220.2954 (4.42) 1,192.7304 (6.41) 16.5069 (1.53) 1,186.5798 (7.00) 17.1166 (1.0) 1;0 5 1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------- benchmark 'test_update_all': 3 tests ----------------------------------------------------------------------------------
Name (time in ms) Min Max Mean StdDev Median IQR Outliers(*) Rounds Iterations
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_update_all[3-1000-1000-postgresql] 129.0315 (1.0) 185.8968 (1.0) 163.2807 (1.0) 21.8241 (1.0) 162.4755 (1.0) 26.8849 (1.0) 2;0 5 1
test_update_all[2-1000-1000-postgresql] 357.5417 (2.77) 690.8522 (3.72) 435.8581 (2.67) 142.8428 (6.55) 373.9472 (2.30) 90.2096 (3.36) 1;1 5 1
test_update_all[1-1000-1000-postgresql] 1,786.7431 (13.85) 1,877.2771 (10.10) 1,824.6985 (11.18) 34.6376 (1.59) 1,810.4977 (11.14) 43.6386 (1.62) 2;0 5 1
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------- benchmark 'test_update_some': 3 tests -----------------------------------------------------------------------------------
Name (time in ms) Min Max Mean StdDev Median IQR Outliers(*) Rounds Iterations
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_update_some[3-5-1000-1000-postgresql] 152.8430 (1.0) 186.0955 (1.0) 163.2764 (1.0) 13.4380 (1.0) 161.0548 (1.0) 14.6920 (1.0) 1;0 5 1
test_update_some[2-5-1000-1000-postgresql] 340.5698 (2.23) 399.0909 (2.14) 372.3943 (2.28) 20.8231 (1.55) 374.0895 (2.32) 15.0242 (1.02) 2;1 5 1
test_update_some[1-5-1000-1000-postgresql] 1,784.9964 (11.68) 1,985.3390 (10.67) 1,904.0329 (11.66) 73.4016 (5.46) 1,918.7887 (11.91) 68.1215 (4.64) 2;0 5 1
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------- benchmark 'test_number_of_rows': 3 tests ----------------------------------------------------------------------------------
Name (time in s) Min Max Mean StdDev Median IQR Outliers(*) Rounds Iterations
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_number_of_rows[1-10000-1000-postgresql] 11.8103 (1.0) 11.8103 (1.0) 11.8103 (1.0) 0.0000 (1.0) 11.8103 (1.0) 0.0000 (1.0) 0;0 1 1
test_number_of_rows[1-100000-1000-postgresql] 114.8244 (9.72) 114.8244 (9.72) 114.8244 (9.72) 0.0000 (1.0) 114.8244 (9.72) 0.0000 (1.0) 0;0 1 1
test_number_of_rows[1-1000000-1000-postgresql] 1,173.2897 (99.34) 1,173.2897 (99.34) 1,173.2897 (99.34) 0.0000 (1.0) 1,173.2897 (99.34) 0.0000 (1.0) 0;0 1 1
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
(*) Outliers: 1 Standard Deviation from Mean; 1.5 IQR (InterQuartile Range) from 1st Quartile and 3rd Quartile.
"""
$ py.test --benchmark-group-by=func test_sync.py
"""
import uuid

import pytest
import sqlalchemy as sa

from alembic.migration import MigrationContext
from alembic.operations import Operations
from jsontableschema.model import SchemaModel
from jsontableschema_sql import Storage

ROWS = 1000
BUFFER_ROWS = 1000

dsn_strings = {
    'in-memory': 'sqlite://',
    'sqlite': 'sqlite:///data.db',
    'postgresql': 'postgresql:///test',
}
class _Storage(object):

    schema = {
        'primaryKey': 'id',
        'fields': [
            {'name': 'id', 'type': 'integer'},
            {'name': 'comment', 'type': 'string'},
        ]
    }

    def __init__(self, engine):
        # Drop data table if exists
        sa.Table('data', sa.MetaData()).drop(engine, checkfirst=True)

        self.engine = engine
        self.storage = Storage(engine=engine)
        self.storage.create('data', self.schema)
        self.model = SchemaModel(self.schema)
        self.migrate()
        self.metadata = sa.MetaData(engine)
        self.dbtable = sa.Table('data', self.metadata, autoload=True)
        self.conn = engine.connect()

    def migrate(self):
        pass


class _StorageSleep(_Storage):

    def migrate(self):
        op = Operations(MigrationContext.configure(self.engine))

        # Change original primary key to index
        op.drop_constraint('data_pkey', 'data', 'primary')
        op.create_index('id_idx', 'data', ['id'])

        # Create update sequence id and make it a primary key
        op.add_column('data', sa.Column('seq', sa.Integer, primary_key=True))
        # op.create_primary_key('seq_pkey', 'data', ['seq'])
storages = {
    '1': _Storage,
    '2': _StorageSleep,
    '3': _Storage,
}


def _get_random_data(n_rows, index=1):
    for i in range(index, n_rows + index):
        yield (i, str(uuid.uuid4()))
def _sync_1(storage, data, buffer_rows):
    """Check if row exists, then update or insert (no table alteration required)."""
    conn = storage.conn
    table = storage.dbtable
    with conn.begin():
        rows = []
        for row in data:
            row_dict = {}
            for index, field in enumerate(storage.model.fields):
                value = row[index]
                value = storage.model.cast(field['name'], value)
                row_dict[field['name']] = value

            # Check if row already exists
            pkey = row_dict['id']
            query = sa.select([
                sa.exists().
                where(table.c.id == pkey)
            ])
            exists = conn.execute(query).scalar()

            if exists:
                del row_dict['id']
                conn.execute(
                    table.update().values(row_dict).
                    where(table.c.id == pkey)
                )
            else:
                rows.append(row_dict)
                if len(rows) > buffer_rows:
                    conn.execute(table.insert(), rows)
                    rows = []

        if len(rows) > 0:
            conn.execute(table.insert(), rows)
def _deduplicate(engine, conn, table):
    """Delete outdated duplicates, keeping the row with the highest seq per id."""
    agg = (
        sa.select([table.c.id, sa.func.max(table.c.seq).label('seq')]).
        group_by(table.c.id).
        having(sa.func.count(table.c.seq) > 1).
        alias()
    )
    query = (
        sa.select([table.c.seq]).
        select_from(table.join(agg, sa.and_(
            table.c.id == agg.c.id,
            table.c.seq != agg.c.seq,
        )))
    )
    if engine.name == 'mysql':
        # MySQL cannot delete from a table it selects from directly,
        # so wrap the select in a subquery:
        # http://stackoverflow.com/a/45498/475477
        query = sa.select([query.alias().c.seq])
    conn.execute(table.delete(table.c.seq.in_(query)))
def _sync_2(storage, data, buffer_rows):
    """Always insert and clean up afterwards (requires seq field)."""
    conn = storage.conn
    table = storage.dbtable
    with conn.begin():
        rows = []
        for row in data:
            row_dict = {}
            for index, field in enumerate(storage.model.fields):
                value = row[index]
                value = storage.model.cast(field['name'], value)
                row_dict[field['name']] = value

            rows.append(row_dict)
            if len(rows) > buffer_rows:
                conn.execute(table.insert(), rows)
                rows = []

        if len(rows) > 0:
            conn.execute(table.insert(), rows)

        _deduplicate(storage.engine, conn, table)
def _sync_3(storage, data, buffer_rows):
    """First delete everything then insert everything (no table alteration required)."""
    conn = storage.conn
    table = storage.dbtable
    with conn.begin():
        # Delete all rows
        conn.execute(table.delete())

        # Insert all rows
        rows = []
        for row in data:
            row_dict = {}
            for index, field in enumerate(storage.model.fields):
                value = row[index]
                value = storage.model.cast(field['name'], value)
                row_dict[field['name']] = value

            rows.append(row_dict)
            if len(rows) > buffer_rows:
                conn.execute(table.insert(), rows)
                rows = []

        if len(rows) > 0:
            conn.execute(table.insert(), rows)
@pytest.mark.parametrize('rows,buffer_rows,dsn', [
    (ROWS, BUFFER_ROWS, 'postgresql'),
])
@pytest.mark.parametrize('method', ['1', '2', '3'])
def test_update_all(request, benchmark, method, rows, buffer_rows, dsn):
    request.keywords.node.name = 'foobar'
    engine = sa.create_engine(dsn_strings[dsn])
    storage = storages[method](engine)
    initial_data = list(_get_random_data(rows))
    updated_data = list(_get_random_data(rows))

    # Fill table with data
    def setup():
        storage.dbtable.delete().execute()
        storage.storage.write('data', initial_data)

    # Update all rows with modified data
    fn = globals()['_sync_' + method]
    args = (storage, updated_data, buffer_rows)
    benchmark.pedantic(fn, args, setup=setup, rounds=5)

    assert storage.dbtable.count().execute().scalar() == rows

    table = storage.dbtable
    query = engine.execute(
        sa.select([table.c.id, table.c.comment]).order_by(table.c.id)
    )
    assert list(query) == updated_data
@pytest.mark.parametrize('changed_rows,rows,buffer_rows,dsn', [
    (5, ROWS, BUFFER_ROWS, 'postgresql'),
])
@pytest.mark.parametrize('method', ['1', '2', '3'])
def test_update_some(request, benchmark, method, changed_rows, rows, buffer_rows, dsn):
    request.keywords.node.name = 'foobar'
    engine = sa.create_engine(dsn_strings[dsn])
    storage = storages[method](engine)
    initial_data = list(_get_random_data(rows))
    updated_data = list(_get_random_data(changed_rows)) + initial_data[changed_rows:]

    # Fill table with data
    def setup():
        storage.dbtable.delete().execute()
        storage.storage.write('data', initial_data)

    # Update only the first `changed_rows` rows with modified data
    fn = globals()['_sync_' + method]
    args = (storage, updated_data, buffer_rows)
    benchmark.pedantic(fn, args, setup=setup, rounds=5)

    assert storage.dbtable.count().execute().scalar() == rows

    table = storage.dbtable
    query = engine.execute(
        sa.select([table.c.id, table.c.comment]).order_by(table.c.id)
    )
    assert list(query) == updated_data
@pytest.mark.parametrize('rows,buffer_rows,dsn', [
    (ROWS, BUFFER_ROWS, 'postgresql'),
])
@pytest.mark.parametrize('method', ['1', '2', '3'])
def test_append_all(request, benchmark, method, rows, buffer_rows, dsn):
    request.keywords.node.name = 'foobar'
    engine = sa.create_engine(dsn_strings[dsn])
    storage = storages[method](engine)
    initial_data = list(_get_random_data(rows))
    updated_data = list(_get_random_data(rows, index=rows + 1))

    # Fill table with data
    def setup():
        storage.dbtable.delete().execute()
        storage.storage.write('data', initial_data)

    # Append new rows that do not overlap with existing primary keys
    fn = globals()['_sync_' + method]
    args = (storage, updated_data, buffer_rows)
    benchmark.pedantic(fn, args, setup=setup, rounds=5)

    if method == '3':
        assert storage.dbtable.count().execute().scalar() == rows
    else:
        assert storage.dbtable.count().execute().scalar() == rows * 2

    table = storage.dbtable
    query = engine.execute(
        sa.select([table.c.id, table.c.comment]).order_by(table.c.id)
    )
    if method == '3':
        assert list(query) == updated_data
    else:
        assert list(query) == initial_data + updated_data
@pytest.mark.parametrize('rows,buffer_rows,dsn', [
    (10000, 1000, 'postgresql'),
    (100000, 1000, 'postgresql'),
    (1000000, 1000, 'postgresql'),
])
@pytest.mark.parametrize('method', ['1'])
def test_number_of_rows(request, benchmark, method, rows, buffer_rows, dsn):
    request.keywords.node.name = 'foobar'
    engine = sa.create_engine(dsn_strings[dsn])
    storage = storages[method](engine)

    # Start from an empty table
    def setup():
        storage.dbtable.delete().execute()

    # Insert all rows into the empty table
    fn = globals()['_sync_' + method]
    data = list(_get_random_data(rows))
    args = (storage, data, buffer_rows)
    benchmark.pedantic(fn, args, setup=setup, rounds=1)
if __name__ == '__main__':
    import IPython

    engine = sa.create_engine('postgresql:///test')
    conn = engine.connect()
    meta = sa.MetaData(conn)
    table = sa.Table('data', meta, autoload=True)

    IPython.embed()
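As a rough usage sketch (not part of the benchmarks, and assuming the same local postgresql:///test database), one of the sync functions can be driven by hand with the helpers defined above:

    engine = sa.create_engine(dsn_strings['postgresql'])
    storage = storages['3'](engine)                               # recreates the `data` table
    storage.storage.write('data', list(_get_random_data(ROWS)))  # initial load
    _sync_3(storage, _get_random_data(ROWS), BUFFER_ROWS)        # overwrite with fresh rows
    print(storage.dbtable.count().execute().scalar())            # 1000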