@miohtama
Created May 31, 2022 13:14
Faster bulk_update_mappings with PostgreSQL + SQLAlchemy using temporary tables and UPDATE ... FROM
import csv
from collections import Counter
from io import StringIO
from typing import Dict, List

from sqlalchemy.orm import Session
from tqdm import tqdm

# Assumed to be provided elsewhere in the project:
# logger, the Swap and LiquidityEvent ORM models (Block appears only in a
# commented-out query), and DEFAULT_CHUNK_SIZE


def bulk_load_psql_using_temp_table(
        dbsession: Session,
        data_as_dicts: List[dict],
):
    """Bulk update columns in PostgreSQL faster using a temp table.

    Works around speed issues of `bulk_update_mappings()` with PostgreSQL.
    Your mileage and speed may vary, but it is going to be faster.
    The observation was 3x ... 4x faster when doing UPDATEs
    where one of the columns is indexed.

    Contains hardcoded temp table creation and UPDATE ... FROM statements.
    In our case we are bulk updating three columns.

    - Create a temp table - if not created before
    - Fill it from an in-memory CSV using COPY FROM
    - Then perform UPDATE ... FROM on the actual table from the temp table
    - Between the update chunks, clear the temp table using TRUNCATE

    Why is it faster? I did not get a clear answer from the sources I was reading.
    At least there should be less data uploaded from the client to the server,
    as CSV loading is more compact than bulk updates.

    Further reading

    - `About PSQL temp tables <https://www.postgresqltutorial.com/postgresql-tutorial/postgresql-temporary-table/>`_
    - `Naive bulk_update_mappings approach <https://stackoverflow.com/questions/36272316/using-bulk-update-mappings-in-sqlalchemy-to-update-multiple-rows-with-different>`_
    - `Discussion on UPDATE ... FROM + temp table approach <https://stackoverflow.com/questions/3361291/slow-simple-update-query-on-postgresql-database-with-3-million-rows/24811058>`_

    :param dbsession:
        SQLAlchemy session.
        Note that we open a separate connection for the bulk update.

    :param data_as_dicts:
        Inbound data, as it would be given to `bulk_update_mappings()`
    """

    # Temp table created in the SQL below
    temp_table_name = "temp_bulk_temp_loader"

    # The real table whose data we are filling
    real_table_name = "swap"

    # Columns we need to copy
    columns = ["id", "sync_event_id", "sync_reserve0", "sync_reserve1"]

    # How our CSV fields are separated
    delim = ";"

    # Increase temp buffer size for updates
    temp_buffer_size = "3000MB"

    # Dump data to a local mem buffer using CSV writer.
    # No header - this is specifically addressed in copy_from()
    out = StringIO()
    writer = csv.DictWriter(out, fieldnames=columns, delimiter=delim)
    writer.writerows(data_as_dicts)

    # Update data over an alternative raw connection
    engine = dbsession.bind
    conn = engine.connect()

    try:
        # No rollbacks
        conn.execution_options(isolation_level="AUTOCOMMIT")

        # See https://blog.codacy.com/how-to-update-large-tables-in-postgresql/
        conn.execute(f"""SET temp_buffers = "{temp_buffer_size}";""")

        # Temp table is dropped at the end of the session
        # https://www.postgresqltutorial.com/postgresql-tutorial/postgresql-temporary-table/
        # This must match data_as_dicts structure.
        sql = f"""
        CREATE TEMP TABLE IF NOT EXISTS {temp_table_name}
        (
            id int,
            sync_event_id int,
            sync_reserve0 bytea,
            sync_reserve1 bytea
        );
        """
        conn.execute(sql)

        # Clean any pending data in the temp table
        # between update chunks.
        # TODO: Not sure why this does not clear itself at conn.close()
        # as I would expect based on the documentation.
        sql = f"TRUNCATE {temp_table_name}"
        conn.execute(sql)

        # Load data from CSV to the temp table
        # https://www.psycopg.org/docs/cursor.html
        cursor = conn.connection.cursor()
        out.seek(0)
        cursor.copy_from(out, temp_table_name, sep=delim, columns=columns)

        # Fill real table from the temp table
        # This copies values from the temp table using
        # UPDATE ... FROM and matching by the row id.
        sql = f"""
        UPDATE {real_table_name}
        SET
            sync_event_id=b.sync_event_id,
            sync_reserve0=b.sync_reserve0,
            sync_reserve1=b.sync_reserve1
        FROM {temp_table_name} AS b
        WHERE {real_table_name}.id=b.id;
        """
        res = conn.execute(sql)
        logger.debug("Updated %d rows", res.rowcount)
    finally:
        conn.close()
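

# --- Added usage sketch; not part of the original gist ---
# A minimal example of calling the bulk loader directly. The row ids below are
# made up for illustration, and the -1/0 placeholder values mirror the unmatched
# branch further down; the dict keys must match the hardcoded temp table columns
# (id, sync_event_id, sync_reserve0, sync_reserve1).
def example_bulk_update(dbsession: Session) -> None:
    updates = [
        {"id": 1, "sync_event_id": -1, "sync_reserve0": 0, "sync_reserve1": 0},
        {"id": 2, "sync_event_id": -1, "sync_reserve0": 0, "sync_reserve1": 0},
    ]
    bulk_load_psql_using_temp_table(dbsession, updates)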


def match_swap_and_sync_fast(dbsession: Session, chain_id: int, start_block: int, end_block: int) -> Counter:
    """Match events using CSV + COPY FROM + UPDATE FROM.

    Assume Sync(tx_hash, log_index - 1) == Swap(tx_hash, log_index)

    - Reads swaps and syncs from the database
    - Updates swaps in the database
    - This function can be called over and over, as the result will always be the same
    - Does **not** avoid updating Swaps that already have `sync_event_id` set

    :param dbsession:
        Database connection

    :param chain_id:
        Blockchain we are interested in

    :param start_block:
        Chunk we are going to update.
        Where to start matching (inclusive).

    :param end_block:
        Chunk we are going to update.
        Where to end matching (inclusive).
    """
    assert type(chain_id) == int
    assert type(start_block) == int
    assert type(end_block) == int

    stats = Counter(matched=0, unmatched=0, swaps=0, syncs=0)

    # Old JOIN based method
    # blocks = dbsession \
    #     .query(Block.id) \
    #     .filter_by(chain_id=chain_id) \
    #     .filter(Block.block_number >= start_block) \
    #     .filter(Block.block_number <= end_block)

    swaps = dbsession \
        .query(Swap.id, Swap.tx_hash, Swap.log_index) \
        .filter(Swap.denorm_chain_id == chain_id) \
        .filter(Swap.denorm_block_number >= start_block) \
        .filter(Swap.denorm_block_number <= end_block)

    # Sync events set reserve0 and reserve1 on liquidity events.
    # If reserve0/reserve1 is not set, then it is an LP mint/burn event.
    syncs = dbsession \
        .query(LiquidityEvent.id, LiquidityEvent.tx_hash, LiquidityEvent.log_index,
               LiquidityEvent.reserve0, LiquidityEvent.reserve1) \
        .filter(LiquidityEvent.denorm_chain_id == chain_id) \
        .filter(LiquidityEvent.denorm_block_number >= start_block) \
        .filter(LiquidityEvent.denorm_block_number <= end_block)

    # (tx_hash, log_index) -> sync data map
    logger.debug("Reading sync data %d - %d", start_block, end_block)
    sync_map: Dict[tuple, dict] = {}
    for s in syncs:
        key = (s["tx_hash"], s["log_index"])
        sync_map[key] = s
        stats["syncs"] += 1

    # Construct bulk update data
    bulk_update_buffer: List[dict] = []

    logger.debug("Filling in swap data")
    for s in swaps:
        # The Sync event should always have LOG_INDEX one before the Swap
        # on proper Uni v2 exchanges.
        # Then we have some exchanges that look like Uni v2, but are different
        # or scams.
        key = (s["tx_hash"], s["log_index"] - 1)
        sync = sync_map.get(key)
        stats["swaps"] += 1
        if sync:
            # Somehow a mint/burn event instead of a sync appeared before the swap
            assert sync["reserve0"] > 0 or sync["reserve1"] > 0, f"Bad liquidity sync data: {sync}"
            bulk_update_buffer.append({
                "id": s["id"],
                "sync_event_id": sync["id"],
                "sync_reserve0": sync["reserve0"],
                "sync_reserve1": sync["reserve1"],
            })
            stats["matched"] += 1
        else:
            # Event produced by some funky incompatible swap?
            # https://etherscan.io/tx/0xb67ee4e885d38b5c056a98a03d05b4f7c4fbca98700b9c8397bc3560033d4d20#eventlog
            bulk_update_buffer.append({
                "id": s["id"],
                "sync_event_id": -1,
                "sync_reserve0": 0,
                "sync_reserve1": 0,
            })
            stats["unmatched"] += 1

    logger.debug("Matching swaps and syncs, chain %d, %d - %d, we have %d swaps and %d syncs, %d matches", chain_id, start_block, end_block, stats["swaps"], stats["syncs"], stats["matched"])

    # Was: bulk_update_mappings
    bulk_load_psql_using_temp_table(dbsession, bulk_update_buffer)

    return stats
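

# --- Added illustration; not part of the original gist ---
# The pairing rule used above, in isolation: a Swap at (tx_hash, log_index) is
# matched with the Sync emitted at (tx_hash, log_index - 1) in the same
# transaction. The hash and values are made up for illustration.
def example_pairing_rule() -> None:
    sync_map = {("0xabc", 3): {"id": 42, "reserve0": 1, "reserve1": 2}}
    swap = {"tx_hash": "0xabc", "log_index": 4}
    assert sync_map.get((swap["tx_hash"], swap["log_index"] - 1)) == {"id": 42, "reserve0": 1, "reserve1": 2}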


def match_swap_and_sync_chunked(dbsession: Session, chain_id: int, start_block: int, end_block: int, chunk_size=DEFAULT_CHUNK_SIZE) -> Counter:
    """Match swaps and syncs, but split commits over smaller row counts.

    Avoids doing too large a commit at once, e.g. by running the query over all 26M Polygon blocks in one go.

    :return:
        Diagnostic stats
    """
    logger.info("Matching swaps and syncs, chunking at %d, chain %d, %d - %d", chunk_size, chain_id, start_block, end_block)

    stats = Counter()
    total = end_block - start_block

    with tqdm(total=total) as progress_bar:
        current = start_block
        while current <= end_block:
            batch_last = min(end_block, current + chunk_size - 1)
            stats += match_swap_and_sync_fast(dbsession, chain_id, current, batch_last)
            dbsession.commit()
            current += chunk_size
            progress_bar.set_description(f"Match swap and sync, chain {chain_id}, update range: {start_block:,} - {end_block:,}, now at {current:,}, matched {stats['matched']:,} events")
            progress_bar.update(chunk_size)

    logger.info("Finished with %s", stats)

    return stats
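

# --- Added usage sketch; not part of the original gist ---
# How the chunked matcher might be driven from a one-off script. The database
# URL, chain id and block range are placeholders; the ORM models and
# DEFAULT_CHUNK_SIZE are assumed to come from the surrounding project.
if __name__ == "__main__":
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    engine = create_engine("postgresql://localhost/mydb")  # placeholder connection string
    dbsession = sessionmaker(bind=engine)()
    stats = match_swap_and_sync_chunked(
        dbsession,
        chain_id=1,               # e.g. Ethereum mainnet
        start_block=10_000_000,   # placeholder block range
        end_block=10_100_000,
    )
    print(stats)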