Testing DSQL Sequences
#!/usr/bin/env python3
# /// script
# dependencies = ["aurora-dsql-python-connector[psycopg]", "psycopg-pool"]
# ///
"""
Benchmark script to compare sequence performance in Aurora DSQL.
Compares CACHE=1 vs CACHE=65536 for various workloads.
Usage:
uv run sequence_benchmark.py <cluster-endpoint>
"""
import argparse
import concurrent.futures
import re
import time
from statistics import mean, stdev

import aurora_dsql_psycopg as dsql
from psycopg_pool import ConnectionPool

# Global connection pool
_pool = None


def parse_region_from_endpoint(endpoint):
    """Extract AWS region from DSQL cluster endpoint."""
    # Endpoint format: <cluster-id>.dsql.<region>.on.aws
    match = re.search(r"\.dsql\.([a-z0-9-]+)\.on\.aws$", endpoint)
    if match:
        return match.group(1)
    raise ValueError(f"Could not parse region from endpoint: {endpoint}")
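
# For example (with a hypothetical cluster ID):
#   parse_region_from_endpoint("abc123example456.dsql.us-west-2.on.aws")
#   -> "us-west-2"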


def init_pool(cluster_endpoint, region, pool_size=100):
    """Initialize the global connection pool."""
    global _pool
    conn_params = {
        "host": cluster_endpoint,
        "region": region,
        "user": "admin",
    }
    _pool = ConnectionPool(
        "",
        connection_class=dsql.DSQLConnection,
        kwargs=conn_params,
        min_size=pool_size,
        max_size=pool_size,
        max_lifetime=3300,  # 55 min; recycle below DSQL's one-hour connection cap
    )
    return _pool


def get_pool():
    """Get the global connection pool."""
    if _pool is None:
        raise RuntimeError("Connection pool not initialized. Call init_pool() first.")
    return _pool


def close_pool():
    """Close the global connection pool."""
    global _pool
    if _pool is not None:
        _pool.close()
        _pool = None


def get_connection(cluster_endpoint, region):
    """Create a standalone connection to DSQL (not from pool)."""
    return dsql.connect(
        host=cluster_endpoint,
        region=region,
        user="admin",
    )
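
# No password is supplied above: the aurora-dsql-python-connector is expected
# to generate (and refresh) IAM auth tokens for each connection, both here and
# in the pool.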


def run_ddl(conn, sql, retries=3):
    """Run a single DDL statement with commit and retry on OCC conflict."""
    for attempt in range(retries):
        try:
            with conn.cursor() as cur:
                cur.execute(sql)
            conn.commit()
            return
        except Exception as e:
            # OC000 is DSQL's optimistic concurrency conflict error code;
            # back off briefly and retry.
            if "OC000" in str(e) and attempt < retries - 1:
                conn.rollback()
                time.sleep(0.1 * (attempt + 1))
            else:
                raise


def setup_sequences(conn):
    """Create test sequences."""
    # DSQL requires separate transactions for each DDL
    run_ddl(conn, "DROP SEQUENCE IF EXISTS seq_cache_1")
    run_ddl(conn, "DROP SEQUENCE IF EXISTS seq_cache_65536")
    run_ddl(conn, "CREATE SEQUENCE seq_cache_1 CACHE 1")
    run_ddl(conn, "CREATE SEQUENCE seq_cache_65536 CACHE 65536")


def benchmark_nextval(conn, sequence_name, iterations):
    """Benchmark nextval calls and return timing stats."""
    times = []
    with conn.cursor() as cur:
        for _ in range(iterations):
            start = time.perf_counter()
            cur.execute(f"SELECT nextval('{sequence_name}')")
            cur.fetchone()
            elapsed = time.perf_counter() - start
            times.append(elapsed * 1000)  # Convert to ms
    return {
        "min": min(times),
        "max": max(times),
        "mean": mean(times),
        "stdev": stdev(times) if len(times) > 1 else 0,
        "total": sum(times),
    }


def benchmark_uuid(conn, iterations):
    """Benchmark gen_random_uuid() calls and return timing stats."""
    times = []
    with conn.cursor() as cur:
        for _ in range(iterations):
            start = time.perf_counter()
            cur.execute("SELECT gen_random_uuid()")
            cur.fetchone()
            elapsed = time.perf_counter() - start
            times.append(elapsed * 1000)  # Convert to ms
    return {
        "min": min(times),
        "max": max(times),
        "mean": mean(times),
        "stdev": stdev(times) if len(times) > 1 else 0,
        "total": sum(times),
    }


def benchmark_bulk_insert(conn, sequence_name, rows):
    """Benchmark bulk insert using a sequence."""
    run_ddl(conn, "DROP TABLE IF EXISTS bench_table")
    run_ddl(conn, "CREATE TABLE bench_table (id BIGINT, data TEXT)")
    start = time.perf_counter()
    with conn.cursor() as cur:
        cur.execute(f"""
            INSERT INTO bench_table (id, data)
            SELECT nextval('{sequence_name}'), 'row ' || g
            FROM generate_series(1, {rows}) g
        """)
    conn.commit()
    elapsed = time.perf_counter() - start
    return elapsed * 1000  # ms
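
# Note that the INSERT ... SELECT generate_series() above executes as a single
# statement, so the timing reflects server-side nextval() cost rather than
# per-row client round trips.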


def benchmark_uuid_insert(conn, rows):
    """Benchmark bulk insert using UUIDs for comparison."""
    run_ddl(conn, "DROP TABLE IF EXISTS bench_table_uuid")
    run_ddl(conn, "CREATE TABLE bench_table_uuid (id UUID, data TEXT)")
    start = time.perf_counter()
    with conn.cursor() as cur:
        cur.execute(f"""
            INSERT INTO bench_table_uuid (id, data)
            SELECT gen_random_uuid(), 'row ' || g
            FROM generate_series(1, {rows}) g
        """)
    conn.commit()
    elapsed = time.perf_counter() - start
    return elapsed * 1000  # ms


def worker_nextval(sequence_name, iterations):
    """Worker function that gets a connection from the pool and calls nextval."""
    pool = get_pool()
    times = []
    errors = 0
    # Each worker holds one pooled connection for all of its iterations, so the
    # number of concurrent workers equals the number of connections in use.
    with pool.connection() as conn:
        with conn.cursor() as cur:
            for _ in range(iterations):
                try:
                    start = time.perf_counter()
                    cur.execute(f"SELECT nextval('{sequence_name}')")
                    cur.fetchone()
                    elapsed = time.perf_counter() - start
                    times.append(elapsed * 1000)
                except Exception:
                    errors += 1
                    conn.rollback()
    return {"times": times, "errors": errors}


def worker_uuid(iterations):
    """Worker function that gets a connection from the pool and generates UUIDs."""
    pool = get_pool()
    times = []
    errors = 0
    with pool.connection() as conn:
        with conn.cursor() as cur:
            for _ in range(iterations):
                try:
                    start = time.perf_counter()
                    cur.execute("SELECT gen_random_uuid()")
                    cur.fetchone()
                    elapsed = time.perf_counter() - start
                    times.append(elapsed * 1000)
                except Exception:
                    errors += 1
                    conn.rollback()
    return {"times": times, "errors": errors}


def benchmark_conflict(sequence_name, num_workers, iterations_per_worker):
    """Benchmark nextval with concurrent workers to test contention."""
    all_times = []
    total_errors = 0
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [
            executor.submit(worker_nextval, sequence_name, iterations_per_worker)
            for _ in range(num_workers)
        ]
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            all_times.extend(result["times"])
            total_errors += result["errors"]
    total_elapsed = time.perf_counter() - start
    return {
        "min": min(all_times) if all_times else 0,
        "max": max(all_times) if all_times else 0,
        "mean": mean(all_times) if all_times else 0,
        "stdev": stdev(all_times) if len(all_times) > 1 else 0,
        "total_time": total_elapsed * 1000,
        "total_calls": len(all_times),
        "errors": total_errors,
        "throughput": len(all_times) / total_elapsed if total_elapsed > 0 else 0,
    }
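
# "throughput" above counts only successful calls over total wall-clock time
# (including pool checkout), so any OCC aborts tallied in "errors" reduce it
# directly.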


def benchmark_conflict_uuid(num_workers, iterations_per_worker):
    """Benchmark UUID generation with concurrent workers."""
    all_times = []
    total_errors = 0
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [
            executor.submit(worker_uuid, iterations_per_worker)
            for _ in range(num_workers)
        ]
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            all_times.extend(result["times"])
            total_errors += result["errors"]
    total_elapsed = time.perf_counter() - start
    return {
        "min": min(all_times) if all_times else 0,
        "max": max(all_times) if all_times else 0,
        "mean": mean(all_times) if all_times else 0,
        "stdev": stdev(all_times) if len(all_times) > 1 else 0,
        "total_time": total_elapsed * 1000,
        "total_calls": len(all_times),
        "errors": total_errors,
        "throughput": len(all_times) / total_elapsed if total_elapsed > 0 else 0,
    }


def run_experiments(cluster_endpoint, region):
    """Run all benchmark experiments against the given cluster."""
    print("=" * 60)
    print("DSQL Sequence Performance Benchmark")
    print("=" * 60)
    print(f"Cluster: {cluster_endpoint}")
    print(f"Region: {region}")
    print()

    conn = get_connection(cluster_endpoint, region)
    print("Connected to DSQL\n")

    # Setup
    setup_sequences(conn)
    print()

    # Experiment 1: Individual nextval() calls
    print("-" * 60)
    print("Experiment 1: Individual nextval() calls (100 iterations)")
    print("-" * 60)
    results_cache1 = benchmark_nextval(conn, "seq_cache_1", 100)
    print("\nCACHE=1:")
    print(f" Mean: {results_cache1['mean']:.2f} ms")
    print(f" Min: {results_cache1['min']:.2f} ms")
    print(f" Max: {results_cache1['max']:.2f} ms")
    print(f" Total: {results_cache1['total']:.2f} ms")
    results_cache65536 = benchmark_nextval(conn, "seq_cache_65536", 100)
    print("\nCACHE=65536:")
    print(f" Mean: {results_cache65536['mean']:.2f} ms")
    print(f" Min: {results_cache65536['min']:.2f} ms")
    print(f" Max: {results_cache65536['max']:.2f} ms")
    print(f" Total: {results_cache65536['total']:.2f} ms")
    results_uuid = benchmark_uuid(conn, 100)
    print("\nUUID:")
    print(f" Mean: {results_uuid['mean']:.2f} ms")
    print(f" Min: {results_uuid['min']:.2f} ms")
    print(f" Max: {results_uuid['max']:.2f} ms")
    print(f" Total: {results_uuid['total']:.2f} ms")
    speedup = results_cache1["total"] / results_cache65536["total"]
    print(f"\nSpeedup with CACHE=65536 vs CACHE=1: {speedup:.1f}x faster")
    speedup_uuid = results_cache1["total"] / results_uuid["total"]
    print(f"Speedup with UUID vs CACHE=1: {speedup_uuid:.1f}x faster")

    # Experiment 2: Bulk insert with sequences
    print("\n" + "-" * 60)
    print("Experiment 2: Bulk INSERT with 1000 rows")
    print("-" * 60)
    # Reset sequences for clean test
    setup_sequences(conn)
    bulk_cache1 = benchmark_bulk_insert(conn, "seq_cache_1", 1000)
    print(f"\nCACHE=1: {bulk_cache1:.2f} ms")
    # Reset for CACHE=65536 test
    with conn.cursor() as cur:
        cur.execute("ALTER SEQUENCE seq_cache_65536 RESTART")
    conn.commit()
    bulk_cache65536 = benchmark_bulk_insert(conn, "seq_cache_65536", 1000)
    print(f"CACHE=65536: {bulk_cache65536:.2f} ms")
    bulk_uuid = benchmark_uuid_insert(conn, 1000)
    print(f"UUID: {bulk_uuid:.2f} ms")
    print(
        f"\nSequence CACHE=65536 vs CACHE=1: {bulk_cache1 / bulk_cache65536:.1f}x faster"
    )
    print(f"UUID vs CACHE=1: {bulk_cache1 / bulk_uuid:.1f}x faster")

    # Experiment 3: Conflict test with concurrent workers
    print("\n" + "-" * 60)
    print("Experiment 3: Conflict test (100 workers x 100 nextvals each)")
    print("-" * 60)
    # Reset sequences for conflict test
    setup_sequences(conn)
    conn.close()  # Close standalone connection

    # Initialize connection pool for conflict test
    print()
    init_pool(cluster_endpoint, region, pool_size=100)
    conflict_cache1 = benchmark_conflict(
        "seq_cache_1", num_workers=100, iterations_per_worker=100
    )
    print("\nCACHE=1:")
    print(f" Total time: {conflict_cache1['total_time']:.2f} ms")
    print(f" Throughput: {conflict_cache1['throughput']:.1f} calls/sec")
    print(f" Mean: {conflict_cache1['mean']:.2f} ms")
    print(f" Min: {conflict_cache1['min']:.2f} ms")
    print(f" Max: {conflict_cache1['max']:.2f} ms")
    print(f" Errors: {conflict_cache1['errors']}")
    conflict_cache65536 = benchmark_conflict(
        "seq_cache_65536", num_workers=100, iterations_per_worker=100
    )
    print("\nCACHE=65536:")
    print(f" Total time: {conflict_cache65536['total_time']:.2f} ms")
    print(f" Throughput: {conflict_cache65536['throughput']:.1f} calls/sec")
    print(f" Mean: {conflict_cache65536['mean']:.2f} ms")
    print(f" Min: {conflict_cache65536['min']:.2f} ms")
    print(f" Max: {conflict_cache65536['max']:.2f} ms")
    print(f" Errors: {conflict_cache65536['errors']}")
    conflict_uuid = benchmark_conflict_uuid(num_workers=100, iterations_per_worker=100)
    print("\nUUID:")
    print(f" Total time: {conflict_uuid['total_time']:.2f} ms")
    print(f" Throughput: {conflict_uuid['throughput']:.1f} calls/sec")
    print(f" Mean: {conflict_uuid['mean']:.2f} ms")
    print(f" Min: {conflict_uuid['min']:.2f} ms")
    print(f" Max: {conflict_uuid['max']:.2f} ms")
    print(f" Errors: {conflict_uuid['errors']}")
    throughput_speedup = (
        conflict_cache65536["throughput"] / conflict_cache1["throughput"]
        if conflict_cache1["throughput"] > 0
        else 0
    )
    throughput_speedup_uuid = (
        conflict_uuid["throughput"] / conflict_cache1["throughput"]
        if conflict_cache1["throughput"] > 0
        else 0
    )
    print(
        f"\nThroughput speedup with CACHE=65536 vs CACHE=1: {throughput_speedup:.1f}x"
    )
    print(f"Throughput speedup with UUID vs CACHE=1: {throughput_speedup_uuid:.1f}x")

    # Close pool and reconnect for cleanup
    close_pool()
    conn = get_connection(cluster_endpoint, region)

    # Summary
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"""
For 1000-row bulk INSERT:
  CACHE=1: {bulk_cache1:.0f} ms
  CACHE=65536: {bulk_cache65536:.0f} ms ({bulk_cache1 / bulk_cache65536:.0f}x faster)
  UUID: {bulk_uuid:.0f} ms ({bulk_cache1 / bulk_uuid:.0f}x faster)
  Per-row cost with CACHE=1: {bulk_cache1 / 1000:.1f} ms/row
  Per-row cost with CACHE=65536: {bulk_cache65536 / 1000:.2f} ms/row

For 100 concurrent workers (100 calls each, 10,000 total):
  CACHE=1: {conflict_cache1['throughput']:.0f} calls/sec
  CACHE=65536: {conflict_cache65536['throughput']:.0f} calls/sec ({throughput_speedup:.0f}x faster)
  UUID: {conflict_uuid['throughput']:.0f} calls/sec ({throughput_speedup_uuid:.0f}x faster)
""")

    # Cleanup
    print("\n" + "-" * 60)
    print("Cleanup")
    print("-" * 60)
    run_ddl(conn, "DROP TABLE IF EXISTS bench_table")
    run_ddl(conn, "DROP TABLE IF EXISTS bench_table_uuid")
    run_ddl(conn, "DROP SEQUENCE IF EXISTS seq_cache_1")
    run_ddl(conn, "DROP SEQUENCE IF EXISTS seq_cache_65536")
    print("Cleaned up test objects")
    conn.close()
    print("\nDone!")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Benchmark sequence performance in Aurora DSQL"
    )
    parser.add_argument(
        "endpoint", help="DSQL cluster endpoint (e.g., abc123.dsql.us-west-2.on.aws)"
    )
    parser.add_argument(
        "--region", help="AWS region (auto-detected from endpoint if not specified)"
    )
    args = parser.parse_args()
    region = args.region or parse_region_from_endpoint(args.endpoint)
    run_experiments(args.endpoint, region)
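
# The region is normally parsed from the endpoint, but it can also be passed
# explicitly, e.g.:
#
#   uv run sequence_benchmark.py <cluster-endpoint> --region us-west-2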