Skip to content

Instantly share code, notes, and snippets.

@ololobus
Created March 22, 2023 16:10
Show Gist options
  • Save ololobus/d93ad0bdfc2f56ae35bc8223820b6d0c to your computer and use it in GitHub Desktop.
Save ololobus/d93ad0bdfc2f56ae35bc8223820b6d0c to your computer and use it in GitHub Desktop.
Neon API latency test
#!/usr/bin/env python3
import requests
import time
import contextlib
from pprint import pprint
import psycopg2
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
trace.set_tracer_provider(
TracerProvider(
resource=Resource.create({SERVICE_NAME: "neon-api-tests"})
)
)
tracer = trace.get_tracer(__name__)
# create a JaegerExporter
jaeger_exporter = JaegerExporter(
# configure agent
agent_host_name='localhost',
agent_port=6831,
# optional: configure also collector
# collector_endpoint='http://localhost:14268/api/traces?format=jaeger.thrift',
# username=xxxx, # optional
# password=xxxx, # optional
# max_tag_value_length=None # optional
)
# Create a BatchSpanProcessor and add the exporter to it
span_processor = BatchSpanProcessor(jaeger_exporter)
# add to the tracer
trace.get_tracer_provider().add_span_processor(span_processor)
API_BASE = "https://console.neon.tech/api/v2"
API_KEY = "xxx"
PROJECT_ID = "xxx-123"
def make_headers() -> dict:
return {
"accept": "application/json",
"content-type": "application/json",
"authorization": f"Bearer {API_KEY}"
}
def get_operation(pid: str, oid: str) -> dict:
url = f"{API_BASE}/projects/{pid}/operations/{oid}"
response = requests.get(url, headers=make_headers())
return response.json()
def get_project(pid: str) -> dict:
url = f"{API_BASE}/projects/{pid}"
response = requests.get(url, headers=make_headers())
return response.json()["project"]
def list_databases(pid: str, bid: str) -> list:
url = f"{API_BASE}/projects/{pid}/branches/{bid}/databases"
response = requests.get(url, headers=make_headers())
return response.json()["databases"]
def create_branch(pid: str) -> dict:
url = f"{API_BASE}/projects/{pid}/branches"
payload = {"endpoints": [{"type": "read_write"}]}
response = requests.post(url, json=payload, headers=make_headers())
return response.json()
def create_role(pid: str, bid: str, name: str) -> dict:
url = f"{API_BASE}/projects/{pid}/branches/{bid}/roles"
payload = {"role": {"name": name}}
response = requests.post(url, json=payload, headers=make_headers())
return response.json()
def delete_branch(pid: str, bid: str) -> dict:
url = f"{API_BASE}/projects/{pid}/branches/{bid}"
response = requests.delete(url, headers=make_headers())
return response.json()
def wait_operation_finished(pid: str, oid: str) -> None:
with tracer.start_as_current_span("wait_operation_finished"):
while True:
operation = get_operation(pid, oid)
if operation["operation"]["status"] == "finished":
break
time.sleep(0.1)
@contextlib.contextmanager
def proxy_query(
host=None,
**kwargs,
):
try:
conn = psycopg2.connect(
host=host,
sslmode="require",
connect_timeout=15,
**kwargs,
)
except Exception:
print(f"Cannot connect to {host}")
raise
else:
print(f"Successfully connected to {host}")
conn.autocommit = True
cur = conn.cursor()
try:
yield cur
finally:
conn.close()
def test_alter_user_latency(pid: str) -> None:
project = None
with tracer.start_as_current_span("get_project"):
project = get_project(pid)
print("Project")
pprint(project)
branch, endpoint = None, None
with tracer.start_as_current_span("create_branch"):
br_resp = create_branch(pid)
print("Branch created")
pprint(br_resp)
branch = br_resp["branch"]
endpoint = br_resp["endpoints"][0]
wait_operation_finished(pid, br_resp["operations"][-1]["id"])
role = None
with tracer.start_as_current_span("create_role"):
role_resp = create_role(pid, branch["id"], "test_role")
role = role_resp["role"]
print("Role created")
pprint(role_resp)
wait_operation_finished(pid, role_resp["operations"][-1]["id"])
database = None
with tracer.start_as_current_span("list_databases"):
dbs = list_databases(pid, branch["id"])
database = dbs[0]
print("Databases")
pprint(database)
with tracer.start_as_current_span("proxy_query"):
with proxy_query(
host=endpoint["host"],
user=role["name"],
password=role["password"],
database=database["name"],
) as cur:
with tracer.start_as_current_span("alter_user"):
cur.execute("ALTER USER current_user SET search_path = 'public'")
with tracer.start_as_current_span("delete_branch"):
del_resp = delete_branch(pid, branch["id"])
wait_operation_finished(pid, del_resp["operations"][-1]["id"])
if __name__ == "__main__":
with tracer.start_as_current_span("test_alter_user_latency"):
test_alter_user_latency(PROJECT_ID)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment