Created
March 22, 2023 16:10
-
-
Save ololobus/d93ad0bdfc2f56ae35bc8223820b6d0c to your computer and use it in GitHub Desktop.
Neon API latency test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import requests | |
import time | |
import contextlib | |
from pprint import pprint | |
import psycopg2 | |
from opentelemetry import trace | |
from opentelemetry.exporter.jaeger.thrift import JaegerExporter | |
from opentelemetry.sdk.resources import SERVICE_NAME, Resource | |
from opentelemetry.sdk.trace import TracerProvider | |
from opentelemetry.sdk.trace.export import BatchSpanProcessor | |
# Install a tracer provider whose spans are tagged with the "neon-api-tests"
# service name, then grab a module-level tracer for the whole script.
trace.set_tracer_provider(
    TracerProvider(
        resource=Resource.create({SERVICE_NAME: "neon-api-tests"})
    )
)
tracer = trace.get_tracer(__name__)
# create a JaegerExporter (ships spans to a local Jaeger agent over UDP)
jaeger_exporter = JaegerExporter(
    # configure agent
    agent_host_name='localhost',
    agent_port=6831,
    # optional: configure also collector
    # collector_endpoint='http://localhost:14268/api/traces?format=jaeger.thrift',
    # username=xxxx, # optional
    # password=xxxx, # optional
    # max_tag_value_length=None # optional
)
# Create a BatchSpanProcessor and add the exporter to it
span_processor = BatchSpanProcessor(jaeger_exporter)
# add to the tracer
trace.get_tracer_provider().add_span_processor(span_processor)
# Neon public API v2 base URL and credentials.
API_BASE = "https://console.neon.tech/api/v2"
API_KEY = "xxx"  # NOTE(review): placeholder — set a real Neon API key before running
PROJECT_ID = "xxx-123"  # NOTE(review): placeholder project id
def make_headers() -> dict:
    """Build the standard JSON request headers, including the bearer token."""
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
    }
    headers["authorization"] = "Bearer " + API_KEY
    return headers
def get_operation(pid: str, oid: str) -> dict:
    """Fetch the current state of operation *oid* in project *pid*."""
    endpoint = f"{API_BASE}/projects/{pid}/operations/{oid}"
    resp = requests.get(endpoint, headers=make_headers())
    return resp.json()
def get_project(pid: str) -> dict:
    """Return the project object for project *pid*."""
    resp = requests.get(f"{API_BASE}/projects/{pid}", headers=make_headers())
    body = resp.json()
    return body["project"]
def list_databases(pid: str, bid: str) -> list:
    """Return the list of databases on branch *bid* of project *pid*."""
    endpoint = f"{API_BASE}/projects/{pid}/branches/{bid}/databases"
    resp = requests.get(endpoint, headers=make_headers())
    body = resp.json()
    return body["databases"]
def create_branch(pid: str) -> dict:
    """Create a branch with one read-write endpoint; return the raw API response."""
    body = {"endpoints": [{"type": "read_write"}]}
    resp = requests.post(
        f"{API_BASE}/projects/{pid}/branches",
        json=body,
        headers=make_headers(),
    )
    return resp.json()
def create_role(pid: str, bid: str, name: str) -> dict:
    """Create role *name* on branch *bid*; return the raw API response."""
    body = {"role": {"name": name}}
    resp = requests.post(
        f"{API_BASE}/projects/{pid}/branches/{bid}/roles",
        json=body,
        headers=make_headers(),
    )
    return resp.json()
def delete_branch(pid: str, bid: str) -> dict:
    """Delete branch *bid* from project *pid*; return the raw API response."""
    endpoint = f"{API_BASE}/projects/{pid}/branches/{bid}"
    resp = requests.delete(endpoint, headers=make_headers())
    return resp.json()
def wait_operation_finished(
    pid: str,
    oid: str,
    timeout: float = None,
    poll_interval: float = 0.1,
) -> None:
    """Block until operation *oid* of project *pid* reaches the "finished" status.

    The API is polled every *poll_interval* seconds inside a tracing span.
    The original loop only exited on "finished", so an operation that never
    reaches that status (e.g. one that fails) made this spin forever.

    Args:
        pid: Project id.
        oid: Operation id.
        timeout: Maximum seconds to wait; ``None`` (the default) preserves the
            original wait-indefinitely behavior.
        poll_interval: Seconds to sleep between polls.

    Raises:
        RuntimeError: if the operation reports a "failed" status.
        TimeoutError: if *timeout* elapses before the operation finishes.
    """
    with tracer.start_as_current_span("wait_operation_finished"):
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            # NOTE(review): assumes the API's terminal failure status is
            # "failed" — confirm against the Neon operations API reference.
            status = get_operation(pid, oid)["operation"]["status"]
            if status == "finished":
                return
            if status == "failed":
                raise RuntimeError(f"operation {oid} failed")
            if deadline is not None and time.monotonic() > deadline:
                raise TimeoutError(
                    f"operation {oid} did not finish within {timeout}s"
                )
            time.sleep(poll_interval)
@contextlib.contextmanager
def proxy_query(
    host=None,
    **kwargs,
):
    """Yield an autocommit cursor on a fresh SSL connection to *host*.

    Extra keyword arguments (user, password, database, ...) are forwarded to
    ``psycopg2.connect``.  Connection failures are reported and re-raised;
    the connection is always closed on exit.
    """
    try:
        connection = psycopg2.connect(
            host=host,
            sslmode="require",
            connect_timeout=15,
            **kwargs,
        )
    except Exception:
        print(f"Cannot connect to {host}")
        raise
    # Only reached when the connect succeeded.
    print(f"Successfully connected to {host}")
    connection.autocommit = True
    cursor = connection.cursor()
    try:
        yield cursor
    finally:
        # Closing the connection also releases the cursor.
        connection.close()
def test_alter_user_latency(pid: str) -> None:
    """End-to-end latency test: create branch + role, run one SQL statement, clean up.

    Each phase runs inside its own tracing span so per-step latency shows up
    in Jaeger.  Fix over the original: everything after branch creation is
    wrapped in ``try``/``finally``, so a failure partway through (role
    creation, connection, SQL) no longer leaks the test branch.

    Args:
        pid: Neon project id to run the test against.
    """
    with tracer.start_as_current_span("get_project"):
        project = get_project(pid)
        print("Project")
        pprint(project)

    with tracer.start_as_current_span("create_branch"):
        br_resp = create_branch(pid)
        print("Branch created")
        pprint(br_resp)
        branch = br_resp["branch"]
        endpoint = br_resp["endpoints"][0]
        wait_operation_finished(pid, br_resp["operations"][-1]["id"])

    try:
        with tracer.start_as_current_span("create_role"):
            role_resp = create_role(pid, branch["id"], "test_role")
            role = role_resp["role"]
            print("Role created")
            pprint(role_resp)
            wait_operation_finished(pid, role_resp["operations"][-1]["id"])

        with tracer.start_as_current_span("list_databases"):
            dbs = list_databases(pid, branch["id"])
            database = dbs[0]
            print("Databases")
            pprint(database)

        with tracer.start_as_current_span("proxy_query"):
            with proxy_query(
                host=endpoint["host"],
                user=role["name"],
                password=role["password"],
                database=database["name"],
            ) as cur:
                with tracer.start_as_current_span("alter_user"):
                    cur.execute("ALTER USER current_user SET search_path = 'public'")
    finally:
        # Always delete the test branch, even if a previous step raised.
        with tracer.start_as_current_span("delete_branch"):
            del_resp = delete_branch(pid, branch["id"])
            wait_operation_finished(pid, del_resp["operations"][-1]["id"])
if __name__ == "__main__":
    # Root span covering the entire end-to-end latency test.
    root_span_name = "test_alter_user_latency"
    with tracer.start_as_current_span(root_span_name):
        test_alter_user_latency(PROJECT_ID)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment