Skip to content

Instantly share code, notes, and snippets.

@addam
Last active November 7, 2017 20:13
Show Gist options
  • Save addam/d1aa628decbd87b0ec77ba4f676bbb9b to your computer and use it in GitHub Desktop.
Save addam/d1aa628decbd87b0ec77ba4f676bbb9b to your computer and use it in GitHub Desktop.
Copy data from one PostgreSQL server to another using asyncpg
#!/usr/bin/python3
import asyncio
import asyncpg
from time import time
# Use uvloop's faster event loop when it is installed.  It is optional:
# without it the default asyncio loop is used, which is just slower.
try:
    import uvloop
except ImportError:
    pass  # uvloop not available -- keep the default asyncio event loop policy
else:
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
async def await_save(query, block, pool):
    """Write one block of rows using a connection borrowed from *pool*.

    The connection is held only for a single ``executemany(query, block)``
    call and is handed back to the pool when the ``async with`` exits.
    """
    acquisition = pool.acquire()
    async with acquisition as connection:
        # *block* is passed unchanged as the argument sequence for *query*
        await connection.executemany(query, block)
def _call_with_params(factory, params):
    """Call *factory* with *params* as keyword args if it is a mapping, else positionally."""
    from collections.abc import Mapping
    if isinstance(params, Mapping):
        return factory(**params)
    return factory(*params)


def _count_finished(done):
    """Re-raise the first failure among finished write tasks; return how many finished."""
    for task in done:
        # Retrieving the result surfaces a failed INSERT instead of
        # silently counting it as progress.
        task.result()
    return len(done)


async def process_db(reader_params, reader_query, writer_params, writer_query, block_size=100, block_count=10):
    """Stream the rows of *reader_query* from one server into another.

    Rows are read through a server-side cursor in blocks of *block_size*.
    Each block is written with ``await_save(writer_query, block, pool)`` on a
    pooled writer connection.  At most ``max(1, block_count)`` blocks are
    being written at any time.

    Args:
        reader_params: connection settings for the source server, passed to
            ``asyncpg.connect``.  A mapping is passed as keyword arguments;
            any other sequence is passed positionally.
        reader_query: SELECT statement producing the rows to copy.
        writer_params: connection settings for the destination server, passed
            to ``asyncpg.create_pool`` (a mapping or a sequence, as above).
        writer_query: parameterized statement executed for every row.
        block_size: number of rows fetched and written per block.
        block_count: maximum number of blocks being written concurrently.

    Raises:
        Any exception raised while reading, or by a failed write.  On error,
        outstanding writes are cancelled and all connections are closed.
    """
    loop = asyncio.get_event_loop()
    max_in_flight = max(1, block_count)
    total_count = 0  # number of blocks written so far
    pending = set()
    reader = await _call_with_params(asyncpg.connect, reader_params)
    try:
        writers = await _call_with_params(asyncpg.create_pool, writer_params)
        try:
            # asyncpg cursors can only be used inside a transaction.
            async with reader.transaction():
                cursor = await reader.cursor(reader_query)
                while True:
                    # Apply back-pressure: wait for a write slot before reading more.
                    while len(pending) >= max_in_flight:
                        completed, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
                        total_count += _count_finished(completed)
                        # Report progress whenever a multiple of 100 blocks is crossed.
                        if total_count % 100 < len(completed):
                            print(total_count * block_size, "rows")
                    block = await cursor.fetch(block_size)
                    if not block:
                        break
                    pending.add(loop.create_task(await_save(writer_query, block, writers)))
            # asyncio.wait() rejects an empty set, e.g. when the query returned no rows.
            if pending:
                completed, pending = await asyncio.wait(pending)
                total_count += _count_finished(completed)
        finally:
            # On error, stop outstanding writes and let them release their
            # connections before the pool is closed.
            for task in pending:
                task.cancel()
            if pending:
                await asyncio.wait(pending)
            await writers.close()
    finally:
        await reader.close()
# Keyword arguments for process_db().  The reader/writer entries are passed
# to asyncpg as connection keyword arguments; replace every ``...``
# placeholder with a real value before running.
params = {
    "reader_params": dict(host="1.1.1.1", port=..., database=..., user=..., password=...),
    "writer_params": dict(host="2.2.2.2", port=..., database=..., user=..., password=...),
    "reader_query": "select column_a, column_b from table where column_c = 42;",
    "writer_query": "insert into copied_table (col_a, col_b) values ($1, $2);",
}

if __name__ == "__main__":
    # Create the loop explicitly: a bare get_event_loop() outside a running
    # loop is deprecated on recent Pythons.  new_event_loop() still honours
    # the uvloop policy if one was installed.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(process_db(**params))
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment