Example Airflow DAG to export data from PostgreSQL through an SSH tunnel.

from datetime import timedelta

import pendulum

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models.variable import Variable
# Connection details for the SSH bastion host, stored as Airflow Variables.
REMOTE_HOST = Variable.get("ssh_host")
SSH_USER_NAME = Variable.get("ssh_user_name")
SSH_KEY_FILE = Variable.get("ssh_key_file")

# Applied to every task: retry up to 3 times, 5 minutes apart.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'postgresql_through_ssh_tunnel',
    default_args=default_args,
    description='postgresql_through_ssh_tunnel',
    schedule_interval='0 0 * * *',
    start_date=pendulum.datetime(2021, 8, 1, 0, 0, 0, tz="Asia/Tokyo"),
    catchup=False,
    max_active_runs=1,
) as dag:

    def _restart_tunnel():
        """(Re)open an SSH tunnel forwarding local port 5432 to PostgreSQL on the remote host."""
        ssh_hook = SSHHook(
            ssh_conn_id='ssh_default',
            keepalive_interval=60,
            remote_host=REMOTE_HOST,
            port=22,
            username=SSH_USER_NAME,
            key_file=SSH_KEY_FILE,
        )
        # Forward 127.0.0.1:5432 on the worker to 127.0.0.1:5432 as seen from the remote host.
        tunnel = ssh_hook.get_tunnel(
            remote_port=5432,
            remote_host='127.0.0.1',
            local_port=5432,
        )
        # Stop any tunnel left over from a previous run before starting a fresh one.
        tunnel.stop()
        tunnel.start()

    def _export(sql, export_file_name):
        """Run a query through the tunnel and write the result set to a CSV file."""
        _restart_tunnel()
        # The Postgres connection must point at 127.0.0.1:5432, the local end of the tunnel.
        pg_hook = PostgresHook.get_hook("postgres_some_connection")
        df = pg_hook.get_pandas_df(sql)
        df.to_csv(export_file_name, index=False)

    export_table = PythonOperator(
        task_id="export_table",
        python_callable=_export,
        op_kwargs={
            "sql": "SELECT * FROM users;",
            "export_file_name": "/home/airflow/gcs/data/users.csv",
        },
    )
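
Note: the DAG assumes three Airflow Variables (ssh_host, ssh_user_name, ssh_key_file) and two Connections already exist: ssh_default for the bastion host and postgres_some_connection for the database. Because queries enter the tunnel at its local end, the Postgres connection's host and port should be 127.0.0.1 and 5432. A minimal setup sketch for the Variables, with hypothetical values standing in for your own:

# One-off setup sketch with hypothetical values; the DAG reads these
# Variables at parse time, so set them before the scheduler loads the file.
from airflow.models.variable import Variable

Variable.set("ssh_host", "bastion.example.com")            # hypothetical bastion host
Variable.set("ssh_user_name", "deploy")                    # hypothetical SSH user
Variable.set("ssh_key_file", "/home/airflow/.ssh/id_rsa")  # hypothetical private key path

With the Variables and Connections in place, a single run can be smoke-tested with "airflow dags test postgresql_through_ssh_tunnel 2021-08-01" before relying on the daily schedule.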