Last active
February 13, 2026 18:22
-
-
Save Benjscho/7573e0e1e6b7cc574c384cd0492cbcb6 to your computer and use it in GitHub Desktop.
Testing DSQL Sequences
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # /// script | |
| # dependencies = ["aurora-dsql-python-connector[psycopg]", "psycopg-pool"] | |
| # /// | |
| """ | |
| Benchmark script to compare sequence performance in Aurora DSQL. | |
| Compares CACHE=1 vs CACHE=65536 for various workloads. | |
| Usage: | |
| uv run sequence_benchmark.py <cluster-endpoint> | |
| """ | |
| import argparse | |
| import concurrent.futures | |
| import re | |
| import time | |
| import aurora_dsql_psycopg as dsql | |
| from psycopg_pool import ConnectionPool | |
| from statistics import mean, stdev | |
| # Global connection pool | |
| _pool = None | |
def parse_region_from_endpoint(endpoint):
    """Extract the AWS region from a DSQL cluster endpoint.

    Endpoint format: <cluster-id>.dsql.<region>.on.aws

    Args:
        endpoint: Cluster endpoint hostname, e.g. "abc123.dsql.us-west-2.on.aws".

    Returns:
        The region string, e.g. "us-west-2".

    Raises:
        ValueError: if the endpoint does not match the expected format.
    """
    # Bug fix: the original pattern used "dsql?" which made the trailing "l"
    # optional and would also accept ".dsq.<region>.on.aws". Match the literal
    # ".dsql." label as the documented endpoint format requires.
    match = re.search(r"\.dsql\.([a-z0-9-]+)\.on\.aws$", endpoint)
    if match:
        return match.group(1)
    raise ValueError(f"Could not parse region from endpoint: {endpoint}")
def init_pool(cluster_endpoint, region, pool_size=100):
    """Create the shared module-level connection pool.

    Opens a fixed-size pool (min == max == pool_size) of IAM-authenticated
    DSQL connections as the "admin" user and stores it in the module global
    `_pool` so workers can fetch it via get_pool().

    Returns the newly created pool.
    """
    global _pool
    _pool = ConnectionPool(
        "",  # conninfo is empty; all parameters are passed via kwargs
        connection_class=dsql.DSQLConnection,
        kwargs={"host": cluster_endpoint, "region": region, "user": "admin"},
        min_size=pool_size,
        max_size=pool_size,
        # Recycle connections after 55 min — presumably to stay under DSQL's
        # connection lifetime limit; confirm against cluster settings.
        max_lifetime=3300,
    )
    return _pool
def get_pool():
    """Return the shared pool created by init_pool(), failing fast otherwise."""
    pool = _pool
    if pool is not None:
        return pool
    raise RuntimeError("Connection pool not initialized. Call init_pool() first.")
def close_pool():
    """Close and discard the shared connection pool (no-op when absent)."""
    global _pool
    pool, _pool = _pool, None  # detach first so the global is never half-closed
    if pool is not None:
        pool.close()
def get_connection(cluster_endpoint, region):
    """Open a standalone (non-pooled) DSQL connection as the "admin" user."""
    params = {"host": cluster_endpoint, "region": region, "user": "admin"}
    return dsql.connect(**params)
def run_ddl(conn, sql, retries=3):
    """Execute one DDL statement in its own transaction, retrying OCC conflicts.

    DSQL reports optimistic-concurrency conflicts with error code "OC000";
    those are rolled back and retried up to `retries` total attempts with a
    linearly growing backoff (0.1 s, 0.2 s, ...). Any other error, or an
    OC000 on the final attempt, propagates to the caller.
    """
    attempt = 0
    while True:
        try:
            with conn.cursor() as cur:
                cur.execute(sql)
            conn.commit()
            return
        except Exception as exc:
            attempt += 1
            if attempt >= retries or "OC000" not in str(exc):
                raise
            conn.rollback()
            time.sleep(0.1 * attempt)
def setup_sequences(conn):
    """Drop and recreate the two benchmark sequences (CACHE 1 and CACHE 65536).

    DSQL requires each DDL statement to run in its own transaction, so the
    four statements are committed one at a time via run_ddl().
    """
    for statement in (
        "DROP SEQUENCE IF EXISTS seq_cache_1",
        "DROP SEQUENCE IF EXISTS seq_cache_65536",
        "CREATE SEQUENCE seq_cache_1 CACHE 1",
        "CREATE SEQUENCE seq_cache_65536 CACHE 65536",
    ):
        run_ddl(conn, statement)
def benchmark_nextval(conn, sequence_name, iterations):
    """Time `iterations` individual nextval() round trips on one sequence.

    Each call is timed separately (execute + fetch) and recorded in
    milliseconds. Returns a dict with min/max/mean/stdev/total latency.
    stdev is 0 when fewer than two samples exist.
    """
    query = f"SELECT nextval('{sequence_name}')"
    samples = []
    with conn.cursor() as cur:
        for _ in range(iterations):
            t0 = time.perf_counter()
            cur.execute(query)
            cur.fetchone()
            samples.append((time.perf_counter() - t0) * 1000)  # ms
    return {
        "min": min(samples),
        "max": max(samples),
        "mean": mean(samples),
        "stdev": stdev(samples) if len(samples) > 1 else 0,
        "total": sum(samples),
    }
def benchmark_uuid(conn, iterations):
    """Time `iterations` individual gen_random_uuid() round trips.

    Mirrors benchmark_nextval() so UUID generation can be compared against
    sequence allocation. Returns min/max/mean/stdev/total latency in ms;
    stdev is 0 when fewer than two samples exist.
    """
    samples = []
    with conn.cursor() as cur:
        for _ in range(iterations):
            t0 = time.perf_counter()
            cur.execute("SELECT gen_random_uuid()")
            cur.fetchone()
            samples.append((time.perf_counter() - t0) * 1000)  # ms
    return {
        "min": min(samples),
        "max": max(samples),
        "mean": mean(samples),
        "stdev": stdev(samples) if len(samples) > 1 else 0,
        "total": sum(samples),
    }
def benchmark_bulk_insert(conn, sequence_name, rows):
    """Time one INSERT..SELECT of `rows` rows keyed by nextval(sequence_name).

    Recreates bench_table first (each DDL in its own transaction), then
    measures the insert plus commit as a single wall-clock interval.
    Returns elapsed time in milliseconds.

    Note: sequence_name/rows are interpolated into the SQL; callers pass
    only internal, trusted values.
    """
    run_ddl(conn, "DROP TABLE IF EXISTS bench_table")
    run_ddl(conn, "CREATE TABLE bench_table (id BIGINT, data TEXT)")
    insert_sql = f"""
        INSERT INTO bench_table (id, data)
        SELECT nextval('{sequence_name}'), 'row ' || g
        FROM generate_series(1, {rows}) g
    """
    t0 = time.perf_counter()
    with conn.cursor() as cur:
        cur.execute(insert_sql)
    conn.commit()
    return (time.perf_counter() - t0) * 1000  # ms
def benchmark_uuid_insert(conn, rows):
    """Time one INSERT..SELECT of `rows` rows keyed by gen_random_uuid().

    Counterpart to benchmark_bulk_insert() for the UUID comparison:
    recreates bench_table_uuid, then measures the insert plus commit as a
    single wall-clock interval. Returns elapsed time in milliseconds.
    """
    run_ddl(conn, "DROP TABLE IF EXISTS bench_table_uuid")
    run_ddl(conn, "CREATE TABLE bench_table_uuid (id UUID, data TEXT)")
    insert_sql = f"""
        INSERT INTO bench_table_uuid (id, data)
        SELECT gen_random_uuid(), 'row ' || g
        FROM generate_series(1, {rows}) g
    """
    t0 = time.perf_counter()
    with conn.cursor() as cur:
        cur.execute(insert_sql)
    conn.commit()
    return (time.perf_counter() - t0) * 1000  # ms
def worker_nextval(sequence_name, iterations):
    """Pool worker: issue `iterations` nextval() calls on one pooled connection.

    Each successful call's latency is recorded in milliseconds; a failed
    call is rolled back and counted instead of raised, so the thread keeps
    going under contention. Returns {"times": [...], "errors": count}.
    """
    query = f"SELECT nextval('{sequence_name}')"
    latencies = []
    failures = 0
    with get_pool().connection() as conn:
        with conn.cursor() as cur:
            for _ in range(iterations):
                try:
                    t0 = time.perf_counter()
                    cur.execute(query)
                    cur.fetchone()
                    latencies.append((time.perf_counter() - t0) * 1000)
                except Exception:
                    failures += 1
                    conn.rollback()
    return {"times": latencies, "errors": failures}
def worker_uuid(iterations):
    """Pool worker: generate `iterations` UUIDs on one pooled connection.

    Mirrors worker_nextval() for the UUID comparison: per-call latencies
    (ms) are collected and failures are rolled back and counted rather
    than raised. Returns {"times": [...], "errors": count}.
    """
    latencies = []
    failures = 0
    with get_pool().connection() as conn:
        with conn.cursor() as cur:
            for _ in range(iterations):
                try:
                    t0 = time.perf_counter()
                    cur.execute("SELECT gen_random_uuid()")
                    cur.fetchone()
                    latencies.append((time.perf_counter() - t0) * 1000)
                except Exception:
                    failures += 1
                    conn.rollback()
    return {"times": latencies, "errors": failures}
def benchmark_conflict(sequence_name, num_workers, iterations_per_worker):
    """Hammer one sequence from `num_workers` threads to measure contention.

    Fans out worker_nextval() across a thread pool, merges the per-worker
    latency samples and error counts, and returns aggregate stats: ms-based
    min/max/mean/stdev, total wall time (ms), successful call count, error
    count, and throughput in successful calls per second.
    """
    samples = []
    error_total = 0
    wall_start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        pending = [
            executor.submit(worker_nextval, sequence_name, iterations_per_worker)
            for _ in range(num_workers)
        ]
        for done in concurrent.futures.as_completed(pending):
            outcome = done.result()
            samples.extend(outcome["times"])
            error_total += outcome["errors"]
    wall = time.perf_counter() - wall_start
    return {
        "min": min(samples) if samples else 0,
        "max": max(samples) if samples else 0,
        "mean": mean(samples) if samples else 0,
        "stdev": stdev(samples) if len(samples) > 1 else 0,
        "total_time": wall * 1000,
        "total_calls": len(samples),
        "errors": error_total,
        "throughput": len(samples) / wall if wall > 0 else 0,
    }
def benchmark_conflict_uuid(num_workers, iterations_per_worker):
    """Generate UUIDs from `num_workers` concurrent threads.

    Counterpart to benchmark_conflict() with worker_uuid() as the unit of
    work. Returns the same aggregate shape: ms-based min/max/mean/stdev,
    total wall time (ms), successful call count, error count, and
    throughput in successful calls per second.
    """
    samples = []
    error_total = 0
    wall_start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        pending = [
            executor.submit(worker_uuid, iterations_per_worker)
            for _ in range(num_workers)
        ]
        for done in concurrent.futures.as_completed(pending):
            outcome = done.result()
            samples.extend(outcome["times"])
            error_total += outcome["errors"]
    wall = time.perf_counter() - wall_start
    return {
        "min": min(samples) if samples else 0,
        "max": max(samples) if samples else 0,
        "mean": mean(samples) if samples else 0,
        "stdev": stdev(samples) if len(samples) > 1 else 0,
        "total_time": wall * 1000,
        "total_calls": len(samples),
        "errors": error_total,
        "throughput": len(samples) / wall if wall > 0 else 0,
    }
def run_experiments(cluster_endpoint, region):
    """Run the full benchmark suite against one DSQL cluster and print results.

    Experiments:
      1. Individual nextval()/gen_random_uuid() latency (100 calls each).
      2. 1000-row bulk INSERT keyed by each sequence and by UUID.
      3. Contention: 100 pooled worker threads x 100 calls each.

    All output goes to stdout; test tables and sequences are dropped at the
    end. Exceptions mid-run may leave test objects behind.
    """
    print("=" * 60)
    print("DSQL Sequence Performance Benchmark")
    print("=" * 60)
    print(f"Cluster: {cluster_endpoint}")
    print(f"Region: {region}")
    print()
    # Standalone connection for the single-threaded experiments (1 and 2).
    conn = get_connection(cluster_endpoint, region)
    print("Connected to DSQL\n")
    # Setup
    setup_sequences(conn)
    print()
    # Experiment 1: Individual nextval() calls
    print("-" * 60)
    print("Experiment 1: Individual nextval() calls (100 iterations)")
    print("-" * 60)
    results_cache1 = benchmark_nextval(conn, "seq_cache_1", 100)
    print(f"\nCACHE=1:")
    print(f" Mean: {results_cache1['mean']:.2f} ms")
    print(f" Min: {results_cache1['min']:.2f} ms")
    print(f" Max: {results_cache1['max']:.2f} ms")
    print(f" Total: {results_cache1['total']:.2f} ms")
    results_cache65536 = benchmark_nextval(conn, "seq_cache_65536", 100)
    print(f"\nCACHE=65536:")
    print(f" Mean: {results_cache65536['mean']:.2f} ms")
    print(f" Min: {results_cache65536['min']:.2f} ms")
    print(f" Max: {results_cache65536['max']:.2f} ms")
    print(f" Total: {results_cache65536['total']:.2f} ms")
    # UUID generation as the no-coordination baseline.
    results_uuid = benchmark_uuid(conn, 100)
    print(f"\nUUID:")
    print(f" Mean: {results_uuid['mean']:.2f} ms")
    print(f" Min: {results_uuid['min']:.2f} ms")
    print(f" Max: {results_uuid['max']:.2f} ms")
    print(f" Total: {results_uuid['total']:.2f} ms")
    # Speedups are ratios of total elapsed time over the 100 calls.
    speedup = results_cache1["total"] / results_cache65536["total"]
    print(f"\nSpeedup with CACHE=65536 vs CACHE=1: {speedup:.1f}x faster")
    speedup_uuid = results_cache1["total"] / results_uuid["total"]
    print(f"Speedup with UUID vs CACHE=1: {speedup_uuid:.1f}x faster")
    # Experiment 2: Bulk insert with sequences
    print("\n" + "-" * 60)
    print("Experiment 2: Bulk INSERT with 1000 rows")
    print("-" * 60)
    # Reset sequences for clean test
    setup_sequences(conn)
    bulk_cache1 = benchmark_bulk_insert(conn, "seq_cache_1", 1000)
    print(f"\nCACHE=1: {bulk_cache1:.2f} ms")
    # Reset for cache=65536 test
    # NOTE(review): this ALTER runs without run_ddl()'s OC000 retry — an
    # OCC conflict here would abort the run; consider routing through run_ddl.
    with conn.cursor() as cur:
        cur.execute("ALTER SEQUENCE seq_cache_65536 RESTART")
    conn.commit()
    bulk_cache65536 = benchmark_bulk_insert(conn, "seq_cache_65536", 1000)
    print(f"CACHE=65536: {bulk_cache65536:.2f} ms")
    bulk_uuid = benchmark_uuid_insert(conn, 1000)
    print(f"UUID: {bulk_uuid:.2f} ms")
    print(
        f"\nSequence CACHE=65536 vs CACHE=1: {bulk_cache1 / bulk_cache65536:.1f}x faster"
    )
    print(f"UUID vs CACHE=1: {bulk_cache1 / bulk_uuid:.1f}x faster")
    # Experiment 3: Conflict test with concurrent workers
    print("\n" + "-" * 60)
    print("Experiment 3: Conflict test (100 workers x 100 nextvals each)")
    print("-" * 60)
    # Reset sequences for conflict test
    setup_sequences(conn)
    conn.close()  # Close standalone connection
    # Initialize connection pool for conflict test
    # (one pooled connection per worker thread)
    print()
    init_pool(cluster_endpoint, region, pool_size=100)
    conflict_cache1 = benchmark_conflict(
        "seq_cache_1", num_workers=100, iterations_per_worker=100
    )
    print(f"\nCACHE=1:")
    print(f" Total time: {conflict_cache1['total_time']:.2f} ms")
    print(f" Throughput: {conflict_cache1['throughput']:.1f} calls/sec")
    print(f" Mean: {conflict_cache1['mean']:.2f} ms")
    print(f" Min: {conflict_cache1['min']:.2f} ms")
    print(f" Max: {conflict_cache1['max']:.2f} ms")
    print(f" Errors: {conflict_cache1['errors']}")
    conflict_cache65536 = benchmark_conflict(
        "seq_cache_65536", num_workers=100, iterations_per_worker=100
    )
    print(f"\nCACHE=65536:")
    print(f" Total time: {conflict_cache65536['total_time']:.2f} ms")
    print(f" Throughput: {conflict_cache65536['throughput']:.1f} calls/sec")
    print(f" Mean: {conflict_cache65536['mean']:.2f} ms")
    print(f" Min: {conflict_cache65536['min']:.2f} ms")
    print(f" Max: {conflict_cache65536['max']:.2f} ms")
    print(f" Errors: {conflict_cache65536['errors']}")
    conflict_uuid = benchmark_conflict_uuid(num_workers=100, iterations_per_worker=100)
    print(f"\nUUID:")
    print(f" Total time: {conflict_uuid['total_time']:.2f} ms")
    print(f" Throughput: {conflict_uuid['throughput']:.1f} calls/sec")
    print(f" Mean: {conflict_uuid['mean']:.2f} ms")
    print(f" Min: {conflict_uuid['min']:.2f} ms")
    print(f" Max: {conflict_uuid['max']:.2f} ms")
    print(f" Errors: {conflict_uuid['errors']}")
    # Guard against division by zero when every CACHE=1 call failed.
    throughput_speedup = (
        conflict_cache65536["throughput"] / conflict_cache1["throughput"]
        if conflict_cache1["throughput"] > 0
        else 0
    )
    throughput_speedup_uuid = (
        conflict_uuid["throughput"] / conflict_cache1["throughput"]
        if conflict_cache1["throughput"] > 0
        else 0
    )
    print(
        f"\nThroughput speedup with CACHE=65536 vs CACHE=1: {throughput_speedup:.1f}x"
    )
    print(f"Throughput speedup with UUID vs CACHE=1: {throughput_speedup_uuid:.1f}x")
    # Close pool and reconnect for cleanup
    close_pool()
    conn = get_connection(cluster_endpoint, region)
    # Summary
    # NOTE(review): "(10,000 total calls each)" below reads oddly — 100
    # workers x 100 calls is 10,000 calls in total; confirm intended wording.
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"""
For 1000-row bulk INSERT:
CACHE=1: {bulk_cache1:.0f} ms
CACHE=65536: {bulk_cache65536:.0f} ms ({bulk_cache1 / bulk_cache65536:.0f}x faster)
UUID: {bulk_uuid:.0f} ms ({bulk_cache1 / bulk_uuid:.0f}x faster)
Per-row cost with CACHE=1: {bulk_cache1 / 1000:.1f} ms/row
Per-row cost with CACHE=65536: {bulk_cache65536 / 1000:.2f} ms/row
For 100 concurrent workers (10,000 total calls each):
CACHE=1: {conflict_cache1["throughput"]:.0f} calls/sec
CACHE=65536: {conflict_cache65536["throughput"]:.0f} calls/sec ({throughput_speedup:.0f}x faster)
UUID: {conflict_uuid["throughput"]:.0f} calls/sec ({throughput_speedup_uuid:.0f}x faster)
""")
    # Cleanup
    print("\n" + "-" * 60)
    print("Cleanup")
    print("-" * 60)
    run_ddl(conn, "DROP TABLE IF EXISTS bench_table")
    run_ddl(conn, "DROP TABLE IF EXISTS bench_table_uuid")
    run_ddl(conn, "DROP SEQUENCE IF EXISTS seq_cache_1")
    run_ddl(conn, "DROP SEQUENCE IF EXISTS seq_cache_65536")
    print("Cleaned up test objects")
    conn.close()
    print("\nDone!")
if __name__ == "__main__":
    # CLI entry point: positional cluster endpoint, optional --region
    # (derived from the endpoint hostname when omitted).
    cli = argparse.ArgumentParser(
        description="Benchmark sequence performance in Aurora DSQL"
    )
    cli.add_argument(
        "endpoint", help="DSQL cluster endpoint (e.g., abc123.dsql.us-west-2.on.aws)"
    )
    cli.add_argument(
        "--region", help="AWS region (auto-detected from endpoint if not specified)"
    )
    opts = cli.parse_args()
    resolved_region = opts.region or parse_region_from_endpoint(opts.endpoint)
    run_experiments(opts.endpoint, resolved_region)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment