Skip to content

Instantly share code, notes, and snippets.

@rahul-pande
Created September 5, 2016 07:48
Show Gist options
  • Save rahul-pande/e0df1b080e76563b47c9d2b1a6392855 to your computer and use it in GitHub Desktop.
Save rahul-pande/e0df1b080e76563b47c9d2b1a6392855 to your computer and use it in GitHub Desktop.
Airflow Postgres Download Operator
# Airflow Operator to download results of a sql query to a file on the worker
# Pass chunksize parameter to download large tables without the
# worker running out of memory
import logging
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class PgDownloadOperator(BaseOperator):
"""
Executes sql code in a specific Postgres database
and saves the result in given file name
:param postgres_conn_id: reference to a specific postgres database
:type postgres_conn_id: string
:param sql: the sql code to be executed
:type sql: Can receive a str representing a sql statement,
a list of str (sql statements), or reference to a template file.
Template reference are recognized by str ending in '.sql'
:param csv_path: absolute path to save the pandas df to, can be used with templating
:type csv_path: str
for example: /path/to/dag/storage/{{ task.task_id }}_{{ ds }}.csv
:param csv_params: params passed to the df.to_csv() function
:type csv_params: dict
:param pandas_sql_params: params passed to the
pandas.io.sql.read_sql() function
:type pandas_sql_params: dict
"""
template_fields = ('sql', 'csv_path',)
template_ext = ('.sql',)
ui_color = '#ededed'
@apply_defaults
def __init__(
self, sql,
csv_path,
postgres_conn_id='postgres_default', autocommit=False,
pandas_sql_params=None,
csv_params=None,
*args, **kwargs):
super(PgDownloadOperator, self).__init__(*args, **kwargs)
self.sql = sql
self.postgres_conn_id = postgres_conn_id
self.csv_path = csv_path
self.autocommit = autocommit
self.pandas_sql_params = pandas_sql_params
self.csv_params = csv_params
def execute(self, context):
logging.info('Executing: ' + str(self.sql))
self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
pandas_df = self.hook.get_pandas_df(self.sql, parameters=self.pandas_sql_params)
logging.info('Saving to: ' + str(self.csv_path))
pandas_df.to_csv(self.csv_path, **self.csv_params)
do_something = PgDownloadOperator(
task_id='do_something',
postgres_conn_id='default_postgres',
sql = 'select * from {{ params.table_name }}',
params = {'table_name' : 'some_schema.some_table'},
# for downloading large tables
pandas_sql_params = {
'chunksize' : 100,
},
csv_path = '/path/to/dag/storage/{{ task.task_id }}_{{ ds }}.csv',
csv_params = {
'sep' : ',',
'index' : False,
},
depends_on_past=True,
dag=dag)
@antrix190
Copy link

Worked like a charm!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment