Skip to content

Instantly share code, notes, and snippets.

@evandiewald
Created September 30, 2022 15:24
Show Gist options
  • Save evandiewald/c8039d1c0618dad8d2072d4d51688995 to your computer and use it in GitHub Desktop.
Save evandiewald/c8039d1c0618dad8d2072d4d51688995 to your computer and use it in GitHub Desktop.
Utility to create a SQLAlchemy connection to a Postgres db via SSH tunnel
from dotenv import load_dotenv
from sshtunnel import SSHTunnelForwarder
from sqlalchemy.engine import create_engine, Engine
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
from typing import Union
import os
from sqlalchemy.pool import NullPool
load_dotenv()
def connect(as_async=False) -> Union[Engine, AsyncEngine]:
try:
tunnel = SSHTunnelForwarder(
(os.getenv("ETL_ADDRESS"), 22),
ssh_username=os.getenv("ETL_USERNAME"),
ssh_pkey=os.getenv("SSH_PKEY_PATH"),
ssh_private_key_password=os.getenv("SSH_PKEY_PW"),
remote_bind_address=("127.0.0.1", 5432)
)
tunnel.start()
if as_async:
url = f'postgresql+asyncpg://{os.getenv("PG_USERNAME")}:{os.getenv("PG_PW")}@127.0.0.1:{tunnel.local_bind_port}/' \
f'{os.getenv("PG_DB")}'
engine = create_async_engine(url)
else:
url = f'postgresql://{os.getenv("PG_USERNAME")}:{os.getenv("PG_PW")}@127.0.0.1:{tunnel.local_bind_port}/' \
f'{os.getenv("PG_DB")}?sslmode=allow'
engine = create_engine(url)
return engine
except:
raise ConnectionError("Connection Failed")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment