|
""" |
|
|
|
$ py.test --benchmark-group-by=func test_sync.py |
|
|
|
""" |
|
|
|
import uuid |
|
import pytest |
|
import sqlalchemy as sa |
|
|
|
from alembic.migration import MigrationContext |
|
from alembic.operations import Operations |
|
|
|
from jsontableschema.model import SchemaModel |
|
from jsontableschema_sql import Storage |
|
|
|
|
|
# Benchmark scale: number of rows to sync per round and the size of the
# buffered-insert batches used by the sync functions.
ROWS = 1000
BUFFER_ROWS = 1000


# DSN per backend; tests pick one via their `dsn` parametrize argument.
dsn_strings = {
    'in-memory': 'sqlite://',
    'sqlite': 'sqlite:///data.db',
    'postgresql': 'postgresql:///test',
}
|
|
|
|
|
class _Storage(object):
    """Benchmark harness owning a fresh `data` table on the given engine.

    Baseline storage used by sync methods '1' and '3' (see `storages`);
    subclasses may alter the table layout by overriding `migrate()`.
    """

    # JSON Table Schema describing the benchmark table.
    schema = {
        'primaryKey': 'id',
        'fields': [
            {'name': 'id', 'type': 'integer'},
            {'name': 'comment', 'type': 'string'},
        ]
    }

    def __init__(self, engine):
        # Drop data table if exists
        sa.Table('data', sa.MetaData()).drop(engine, checkfirst=True)

        self.engine = engine
        self.storage = Storage(engine=engine)
        self.storage.create('data', self.schema)
        self.model = SchemaModel(self.schema)

        # Hook for subclasses to alter the freshly created table.
        self.migrate()

        # Reflect the (possibly migrated) table and keep a connection open
        # for the sync functions to execute against.
        self.metadata = sa.MetaData(engine)
        self.dbtable = sa.Table('data', self.metadata, autoload=True)
        self.conn = engine.connect()

    def migrate(self):
        # Baseline storage makes no schema changes.
        pass
|
|
|
|
|
class _StorageSleep(_Storage):
    """Storage variant for sync method '2': adds a `seq` surrogate key.

    `_sync_2` always inserts and deduplicates afterwards, which needs an
    auto-incrementing `seq` column to tell newer rows from older ones.
    """

    def migrate(self):
        op = Operations(MigrationContext.configure(self.engine))

        # Change original primary key to index
        # NOTE(review): 'data_pkey' looks like PostgreSQL's default
        # constraint name — other backends may name it differently; confirm.
        op.drop_constraint('data_pkey', 'data', 'primary')
        op.create_index('id_idx', 'data', ['id'])

        # Create update sequence id and make it a primary key
        op.add_column('data', sa.Column('seq', sa.Integer, primary_key=True))
        # op.create_primary_key('seq_pkey', 'data', ['seq'])
|
|
|
|
|
# Storage class per sync method key; method '2' needs the extra `seq`
# column, while '1' and '3' run against the unmodified baseline table.
storages = {
    '1': _Storage,
    '2': _StorageSleep,
    '3': _Storage,
}
|
|
|
|
|
def _get_random_data(n_rows, index=1): |
|
for i in range(index, n_rows + index): |
|
yield (i, str(uuid.uuid4())) |
|
|
|
|
|
def _sync_1(storage, data, buffer_rows):
    """Probe each row: update it when the id exists, otherwise buffer an
    insert (no table alteration required)."""
    conn = storage.conn
    table = storage.dbtable
    fields = storage.model.fields
    with conn.begin():
        pending = []
        for row in data:
            # Cast every incoming value through the schema model
            record = {
                field['name']: storage.model.cast(field['name'], row[position])
                for position, field in enumerate(fields)
            }

            # Probe for an existing row with the same primary key
            pkey = record['id']
            exists_query = sa.select([sa.exists().where(table.c.id == pkey)])
            if conn.execute(exists_query).scalar():
                # Update in place; the key column itself stays untouched
                del record['id']
                conn.execute(
                    table.update().values(record).where(table.c.id == pkey))
            else:
                pending.append(record)

            # Flush the insert buffer once it grows past the limit
            if len(pending) > buffer_rows:
                conn.execute(table.insert(), pending)
                pending = []

        # Flush whatever is left in the buffer
        if pending:
            conn.execute(table.insert(), pending)
|
|
|
|
|
def _deduplicate(engine, conn, table):
    """Delete all but the newest (highest `seq`) row of every duplicated id."""
    # Ids occurring more than once, paired with their newest seq value
    latest = sa.select(
        [table.c.id, sa.func.max(table.c.seq).label('seq')]
    ).group_by(table.c.id).having(sa.func.count(table.c.seq) > 1).alias()

    # Every seq belonging to a duplicated id except the newest one
    stale = sa.select([table.c.seq]).select_from(
        table.join(latest, sa.and_(
            table.c.id == latest.c.id,
            table.c.seq != latest.c.seq,
        ))
    )

    if engine.name == 'mysql':
        # MySQL cannot delete from a table referenced in the subquery;
        # wrapping it in a derived table works around that.
        # http://stackoverflow.com/a/45498/475477
        stale = sa.select([stale.alias().c.seq])

    conn.execute(table.delete(table.c.seq.in_(stale)))
|
|
|
|
|
def _sync_2(storage, data, buffer_rows):
    """Always insert and clean up afterwards (requires seq field).

    Every incoming row is appended; `_deduplicate()` then removes the
    superseded versions, keeping only the row with the highest `seq`.
    """
    conn = storage.conn
    table = storage.dbtable
    with conn.begin():
        rows = []
        for row in data:
            # Cast every incoming value through the schema model.
            row_dict = {}
            for index, field in enumerate(storage.model.fields):
                value = row[index]
                value = storage.model.cast(field['name'], value)
                row_dict[field['name']] = value
            rows.append(row_dict)

            # Flush the insert buffer once it grows past the limit.
            # BUG FIX: was `table.insert().execute(rows)` — implicit
            # execution on the table's bound engine, i.e. a separate
            # connection OUTSIDE the `conn.begin()` transaction. Use the
            # transaction's connection, matching the final flush below
            # and the other sync functions.
            if len(rows) > buffer_rows:
                conn.execute(table.insert(), rows)
                rows = []

        # Flush the remaining buffered rows.
        if len(rows) > 0:
            conn.execute(table.insert(), rows)

        # Remove superseded duplicates so only the latest row per id stays.
        _deduplicate(storage.engine, conn, table)
|
|
|
|
|
def _sync_3(storage, data, buffer_rows):
    """Wipe the table, then re-insert everything from scratch (no table
    alteration required)."""
    conn = storage.conn
    table = storage.dbtable
    fields = storage.model.fields
    with conn.begin():
        # Delete every existing row before repopulating
        conn.execute(table.delete())

        buffered = []
        for row in data:
            # Cast every incoming value through the schema model
            buffered.append({
                field['name']: storage.model.cast(field['name'], row[position])
                for position, field in enumerate(fields)
            })

            # Flush once the buffer grows past the limit
            if len(buffered) > buffer_rows:
                conn.execute(table.insert(), buffered)
                buffered = []

        # Flush the tail of the buffer
        if buffered:
            conn.execute(table.insert(), buffered)
|
|
|
|
|
@pytest.mark.parametrize('rows,buffer_rows,dsn', [
    (ROWS, BUFFER_ROWS, 'postgresql'),
])
@pytest.mark.parametrize('method', ['1', '2', '3'])
def test_update_all(request, benchmark, method, rows, buffer_rows, dsn):
    """Benchmark replacing every row's payload with fresh data."""
    request.keywords.node.name = 'foobar'

    engine = sa.create_engine(dsn_strings[dsn])
    storage = storages[method](engine)

    # Same id range, brand new comments - every row must change
    initial_data = list(_get_random_data(rows))
    updated_data = list(_get_random_data(rows))

    def setup():
        # Reset the table to the initial dataset before each round
        storage.dbtable.delete().execute()
        storage.storage.write('data', initial_data)

    # Benchmark the sync method under test against the updated dataset
    sync = globals()['_sync_' + method]
    benchmark.pedantic(
        sync, (storage, updated_data, buffer_rows), setup=setup, rounds=5)

    # Row count unchanged and every row carries the updated payload
    assert storage.dbtable.count().execute().scalar() == rows

    table = storage.dbtable
    result = engine.execute(
        sa.select([table.c.id, table.c.comment]).order_by(table.c.id))
    assert list(result) == updated_data
|
|
|
|
|
@pytest.mark.parametrize('changed_rows,rows,buffer_rows,dsn', [
    (5, ROWS, BUFFER_ROWS, 'postgresql'),
])
@pytest.mark.parametrize('method', ['1', '2', '3'])
def test_update_some(request, benchmark, method, changed_rows, rows, buffer_rows, dsn):
    """Benchmark syncing when only the first `changed_rows` rows differ."""
    request.keywords.node.name = 'foobar'

    engine = sa.create_engine(dsn_strings[dsn])
    storage = storages[method](engine)

    # Only the first `changed_rows` entries get new comments
    initial_data = list(_get_random_data(rows))
    updated_data = list(_get_random_data(changed_rows)) + initial_data[changed_rows:]

    def setup():
        # Reset the table to the initial dataset before each round
        storage.dbtable.delete().execute()
        storage.storage.write('data', initial_data)

    # Benchmark the sync method under test against the updated dataset
    sync = globals()['_sync_' + method]
    benchmark.pedantic(
        sync, (storage, updated_data, buffer_rows), setup=setup, rounds=5)

    # Row count unchanged; content matches the partially updated dataset
    assert storage.dbtable.count().execute().scalar() == rows

    table = storage.dbtable
    result = engine.execute(
        sa.select([table.c.id, table.c.comment]).order_by(table.c.id))
    assert list(result) == updated_data
|
|
|
|
|
@pytest.mark.parametrize('rows,buffer_rows,dsn', [
    (ROWS, BUFFER_ROWS, 'postgresql'),
])
@pytest.mark.parametrize('method', ['1', '2', '3'])
def test_append_all(request, benchmark, method, rows, buffer_rows, dsn):
    """Benchmark syncing a dataset whose ids do not overlap the table."""
    request.keywords.node.name = 'foobar'

    engine = sa.create_engine(dsn_strings[dsn])
    storage = storages[method](engine)

    # Fresh ids starting right after the initial range - pure appends
    initial_data = list(_get_random_data(rows))
    updated_data = list(_get_random_data(rows, index=rows + 1))

    def setup():
        # Reset the table to the initial dataset before each round
        storage.dbtable.delete().execute()
        storage.storage.write('data', initial_data)

    # Benchmark the sync method under test against the appended dataset
    sync = globals()['_sync_' + method]
    benchmark.pedantic(
        sync, (storage, updated_data, buffer_rows), setup=setup, rounds=5)

    if method == '3':
        # Method 3 wipes the table first, so only the new rows remain
        assert storage.dbtable.count().execute().scalar() == rows
    else:
        # Methods 1 and 2 keep the old rows and add the new ones
        assert storage.dbtable.count().execute().scalar() == rows * 2

    table = storage.dbtable
    result = engine.execute(
        sa.select([table.c.id, table.c.comment]).order_by(table.c.id))
    if method == '3':
        assert list(result) == updated_data
    else:
        assert list(result) == initial_data + updated_data
|
|
|
|
|
@pytest.mark.parametrize('rows,buffer_rows,dsn', [
    (10000, 1000, 'postgresql'),
    (100000, 1000, 'postgresql'),
    (1000000, 1000, 'postgresql'),
])
@pytest.mark.parametrize('method', ['1'])
def test_number_of_rows(request, benchmark, method, rows, buffer_rows, dsn):
    """Benchmark how sync method 1 scales with the number of rows."""
    request.keywords.node.name = 'foobar'

    engine = sa.create_engine(dsn_strings[dsn])
    storage = storages[method](engine)

    def setup():
        # Start each round from an empty table
        storage.dbtable.delete().execute()

    # Time a single round of syncing `rows` freshly generated records
    sync = globals()['_sync_' + method]
    dataset = list(_get_random_data(rows))
    benchmark.pedantic(
        sync, (storage, dataset, buffer_rows), setup=setup, rounds=1)
|
|
|
|
|
if __name__ == '__main__':
    # Convenience entry point: open an IPython shell with an engine,
    # connection and reflected `data` table ready for ad-hoc inspection.
    import IPython

    engine = sa.create_engine('postgresql:///test')
    conn = engine.connect()
    meta = sa.MetaData(conn)
    table = sa.Table('data', meta, autoload=True)

    IPython.embed()