Skip to content

Instantly share code, notes, and snippets.

@antweiss
Last active December 8, 2023 05:51
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save antweiss/a6716339983bcc93aa505fd0c620b013 to your computer and use it in GitHub Desktop.
import logging
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
# Airflow connection ID for the target Postgres database.
# Change this to your own connection identifier, if needed.
POSTGRES_CONN_ID = "postgres_default"
def pg_extract(copy_sql):
    """Run a Postgres COPY statement and export its output to a local CSV file.

    Args:
        copy_sql: A ``COPY (...) TO STDOUT`` statement to execute.

    Side effects:
        Writes the query result to /home/user/Airflow/data/customer.csv
        (the directory must already exist on the worker).
    """
    pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
    logging.info("Exporting query to file")
    # BUG FIX: PostgresHook has no ``copy_export`` method — the documented
    # API is ``copy_expert(sql, filename)``; the original raised
    # AttributeError when the task executed.
    pg_hook.copy_expert(copy_sql, filename="/home/user/Airflow/data/customer.csv")
# Daily DAG that runs a single task exporting customer rows to CSV.
with DAG(
    dag_id="pg_extract",
    # BUG FIX: ``datetime`` is imported directly via
    # ``from datetime import datetime``, so ``datetime.datetime(...)``
    # raised AttributeError at DAG-parse time.
    start_date=datetime(2022, 2, 2),
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="pg_extract_task",
        python_callable=pg_extract,
        op_kwargs={
            # BUG FIX: the original used Unicode curly quotes (’john’),
            # which are not valid SQL string delimiters — Postgres rejects
            # the statement. Plain ASCII single quotes are required.
            "copy_sql": "COPY (SELECT * FROM CUSTOMER WHERE first_name='john' ) TO STDOUT WITH CSV HEADER"
        },
    )
    # Single-task DAG; no downstream dependencies to wire up.
    t1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment