@edthix
Created December 11, 2019 08:12
Sample Airflow DAG for an SSH tunnel + Postgres (assuming both the SERVER_ssh_connector and SERVER_ssh_postresql_tunnel_connector connections are available)
from datetime import timedelta, datetime

from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.operators.postgres_operator import PostgresOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='testing_postgres_tunnel_ssh',
    default_args=default_args,
    start_date=datetime(2019, 12, 1),
    end_date=datetime(2019, 12, 30),
    schedule_interval=timedelta(minutes=10),  # or '@daily'
)

# Tunnel settings stored as Airflow Variables.
REMOTE_BIND_IP = Variable.get('SERVER_REMOTE_BIND_IP')
REMOTE_BIND_PORT = Variable.get('SERVER_REMOTE_BIND_PORT')
LOCAL_BIND_PORT = Variable.get('SERVER_LOCAL_BIND_PORT')

# Keep a reference to the hook itself: get_tunnel() returns an SSHTunnelForwarder
# whose .start() returns None, so the hook and the tunnel live in separate variables.
ssh_hook = SSHHook(ssh_conn_id='SERVER_ssh_connector', keepalive_interval=60)

# Forward localhost:LOCAL_BIND_PORT to REMOTE_BIND_IP:REMOTE_BIND_PORT via the SSH host.
# This runs at module level, i.e. whenever the DAG file is parsed.
tunnel = ssh_hook.get_tunnel(
    int(REMOTE_BIND_PORT),
    remote_host=REMOTE_BIND_IP,
    local_port=int(LOCAL_BIND_PORT),
)
tunnel.start()

ssh_operator = SSHOperator(
    ssh_hook=ssh_hook,
    task_id='open_tunnel_to_SERVER',
    command='ls -al',
    dag=dag,
)

# SERVER_ssh_postresql_tunnel_connector must point at localhost:LOCAL_BIND_PORT.
postgres_operator = PostgresOperator(
    postgres_conn_id='SERVER_ssh_postresql_tunnel_connector',
    sql="select * from users limit 100",
    task_id='get_users_from_SERVER_postgres_table',
    dag=dag,
)

ssh_operator >> postgres_operator
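The DAG expects both connections to exist already. One way to define them, sketched here under assumptions, is through Airflow's `AIRFLOW_CONN_<CONN_ID>` environment variables, which hold a connection URI. The hosts, users, password, database name and port 5433 below are hypothetical placeholders, and `conn_uri` / `conn_env_var` are illustrative helpers, not Airflow APIs. SSH key settings (for example a key file in the connection's Extra field) are left out. The detail that matters is that the Postgres connection targets localhost and the tunnel's local port (SERVER_LOCAL_BIND_PORT), not the remote database host.

```python
from urllib.parse import quote


def conn_uri(conn_type, host, port, login=None, password=None, schema=""):
    """Build a connection URI such as postgres://user:pw@host:port/db."""
    auth = ""
    if login:
        # Percent-encode credentials so characters like '@' or '/' stay safe.
        auth = quote(login, safe="")
        if password:
            auth += ":" + quote(password, safe="")
        auth += "@"
    return f"{conn_type}://{auth}{host}:{port}/{schema}"


def conn_env_var(conn_id):
    """Airflow looks up AIRFLOW_CONN_ followed by the upper-cased conn_id."""
    return "AIRFLOW_CONN_" + conn_id.upper()


# Hypothetical values: replace with your own hosts, users and ports.
env = {
    # The SSH connection points at the SSH (jump) host itself.
    conn_env_var("SERVER_ssh_connector"): conn_uri(
        "ssh", "bastion.example.com", 22, login="deploy"
    ),
    # The Postgres connection points at the *local* end of the tunnel.
    conn_env_var("SERVER_ssh_postresql_tunnel_connector"): conn_uri(
        "postgres", "localhost", 5433, login="app", password="s3cret", schema="mydb"
    ),
}

# Print shell export lines for the scheduler/worker environment.
for name, uri in env.items():
    print(f"export {name}='{uri}'")
```

The same values can be entered through the Airflow UI instead; the environment-variable form just makes the localhost mapping easy to see.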
@MxMaster

Hello. Thank you for the code. One question: could I see the connection configurations for those two connections, namely SERVER_ssh_connector and SERVER_ssh_postresql_tunnel_connector?
Thank you very much.

@phifa

phifa commented May 16, 2022

@edthix @MxMaster I need to do something like this as well: I need to access a Postgres DB via an SSH tunnel. I tried doing it the way shown above 🔼, but cannot get it to work. I am able to open an SSH connection, but I am unsure how to properly use get_tunnel() and wrap the Postgres operator.

@jadsy2107

Hey guys, I made a Kubernetes SSH tunnel operator and dashboard; check it out, it may help:
https://github.com/ngn-au/whiplash

@SasanAhmadi

SasanAhmadi commented Dec 14, 2022

There are a couple of things to tweak here to make this work!
The hook part should be something like the following:

ssh_hook = SSHHook(ssh_conn_id='SERVER_ssh_connector', keepalive_interval=60)
ssh_hook.get_tunnel(
    int(REMOTE_BIND_PORT),
    remote_host=REMOTE_BIND_IP,
    local_port=int(LOCAL_BIND_PORT)
).start()

# and then pass it to the operator
ssh_operator = SSHOperator(
    ssh_hook=ssh_hook,
    task_id='open_tunnel_to_SERVER',
    command='ls -al',
    dag=dag
)

!! The IMPORTANT thing to remember is how the remote host and port map to your local side.
When opening the tunnel, set remote_host to your Postgres host and the remote port to the port your Postgres server listens on.
Then, when creating the connection for your Postgres operator, point it at localhost and the local port.
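Before pointing the Postgres connection at localhost, it can help to confirm that the tunnel's local end is actually accepting connections on the machine where the task runs. A minimal stdlib-only sketch; `port_is_open` is a hypothetical helper, and the host/port you pass (for example 127.0.0.1 and the value of SERVER_LOCAL_BIND_PORT) are assumptions about your setup.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts (socket.timeout subclasses
        # OSError) and name-resolution failures.
        return False
```

For example, `port_is_open("127.0.0.1", int(LOCAL_BIND_PORT))` returning False suggests nothing is listening on the tunnel's local port in that process. A True result only means something accepts TCP there, not that it is Postgres.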

So this setup creates a tunnel that lets you reach postgres_host:port through localhost:port.
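To make the localhost:port to postgres_host:port mapping concrete, here is a minimal stdlib-only sketch of plain TCP port forwarding. It is NOT an SSH tunnel: there is no encryption and no SSH host in between, and `forward` is a hypothetical helper. It only shows what a local bind port does: anything connecting to 127.0.0.1:local_port is relayed to remote_host:remote_port.

```python
import socket
import threading


def _pipe(src, dst):
    """Copy bytes from src to dst until src reaches EOF, then half-close dst."""
    try:
        while True:
            data = src.recv(4096)
            if not data:
                break
            dst.sendall(data)
    except OSError:
        pass  # one side went away; stop relaying in this direction
    finally:
        try:
            dst.shutdown(socket.SHUT_WR)  # signal EOF to the other side
        except OSError:
            pass


def forward(local_port, remote_host, remote_port):
    """Relay connections on 127.0.0.1:local_port to remote_host:remote_port.

    Plain TCP only (no SSH, no encryption). local_port=0 lets the OS pick a
    free port. Returns the listening socket so callers can read the port.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(("127.0.0.1", local_port))
    listener.listen()

    def accept_loop():
        while True:
            try:
                client, _ = listener.accept()
            except OSError:
                return  # listener is no longer usable
            try:
                upstream = socket.create_connection((remote_host, remote_port))
            except OSError:
                client.close()  # remote side unreachable; drop this client
                continue
            # One daemon thread per direction: client -> remote, remote -> client.
            threading.Thread(target=_pipe, args=(client, upstream), daemon=True).start()
            threading.Thread(target=_pipe, args=(upstream, client), daemon=True).start()

    threading.Thread(target=accept_loop, daemon=True).start()
    return listener
```

With a real SSH tunnel the relay hop runs through the SSH server, which is why remote_host is resolved from the SSH host's point of view. For brevity, this sketch leaves finished sockets to be closed when they are garbage-collected.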
