Created
September 30, 2022 15:24
-
-
Save evandiewald/c8039d1c0618dad8d2072d4d51688995 to your computer and use it in GitHub Desktop.
Utility to create a SQLAlchemy connection to a Postgres db via SSH tunnel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dotenv import load_dotenv | |
from sshtunnel import SSHTunnelForwarder | |
from sqlalchemy.engine import create_engine, Engine | |
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine | |
from typing import Union | |
import os | |
from sqlalchemy.pool import NullPool | |
load_dotenv() | |
def connect(as_async=False) -> Union[Engine, AsyncEngine]: | |
try: | |
tunnel = SSHTunnelForwarder( | |
(os.getenv("ETL_ADDRESS"), 22), | |
ssh_username=os.getenv("ETL_USERNAME"), | |
ssh_pkey=os.getenv("SSH_PKEY_PATH"), | |
ssh_private_key_password=os.getenv("SSH_PKEY_PW"), | |
remote_bind_address=("127.0.0.1", 5432) | |
) | |
tunnel.start() | |
if as_async: | |
url = f'postgresql+asyncpg://{os.getenv("PG_USERNAME")}:{os.getenv("PG_PW")}@127.0.0.1:{tunnel.local_bind_port}/' \ | |
f'{os.getenv("PG_DB")}' | |
engine = create_async_engine(url) | |
else: | |
url = f'postgresql://{os.getenv("PG_USERNAME")}:{os.getenv("PG_PW")}@127.0.0.1:{tunnel.local_bind_port}/' \ | |
f'{os.getenv("PG_DB")}?sslmode=allow' | |
engine = create_engine(url) | |
return engine | |
except: | |
raise ConnectionError("Connection Failed") | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment