Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Airflow Ftp CSV to SQL
Code that accompanies the Airflow tutorial located at: (link missing from this copy)
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.generic_transfer import GenericTransfer
from airflow.contrib.hooks import FTPHook
from airflow.hooks.mysql_hook import MySqlHook
from datetime import datetime, timedelta
import codecs
import os
import logging
# Start date for the DAG: midnight, five days before the moment this file is parsed.
# NOTE(review): a start_date derived from "today" shifts every time the scheduler
# re-parses the file; a fixed datetime is usually safer for a production DAG.
five_days_ago = datetime.combine(datetime.today() - timedelta(5), datetime.min.time())

# Default arguments handed to DAG(..., default_args=default_args) further down.
default_args = {
    'owner': 'flolas',
    'depends_on_past': True,
    'start_date': five_days_ago,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    # Optional settings, currently disabled:
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
def download_file_from_ftp(remote_path, local_path, prefix, suffix, conn_id, ext, **kwargs):
    """Get this run's file from an FTP server and save it to a folder on the worker's filesystem.

    The file name is ``prefix + 'D-M-YYYY' + suffix + '.' + ext``, where the date
    comes from the task's ``execution_date`` (day and month are not zero-padded).

    Args:
        remote_path: Directory on the FTP server that holds the file.
        local_path: Directory on the worker where the file is written.
        prefix: Text placed before the date in the file name.
        suffix: Text placed after the date in the file name.
        conn_id: Airflow connection id passed to ``FTPHook``.
        ext: File extension, without the leading dot.
        **kwargs: Airflow task context; ``execution_date`` is required.

    Returns:
        The local path of the downloaded file. Downstream tasks read it back with
        ``xcom_pull(task_ids='download_file')``.

    Raises:
        Whatever ``FTPHook.retrieve_file`` raises; a blank or partial local file
        is deleted before the error is re-raised.
    """
    import posixpath

    execution_date = kwargs['execution_date']
    date = '{d.day}-{d.month}-{d.year}'.format(d=execution_date)
    filename = prefix + date + suffix + '.' + ext
    # FTP paths always use '/', independent of the worker's OS.
    remote_filepath = posixpath.join(remote_path, filename)
    local_filepath = os.path.join(local_path, filename)

    logging.info('Getting file: {}'.format(remote_filepath))
    # The context manager closes the FTP connection even if the download fails.
    with FTPHook(ftp_conn_id=conn_id) as conn:
        try:
            conn.retrieve_file(remote_filepath, local_filepath)
        except Exception:
            # Don't leave a blank/partial file behind (e.g. when the remote file is missing).
            if os.path.exists(local_filepath):
                os.remove(local_filepath)
            raise
    # TODO: skip the downstream tasks instead of failing when the remote file is missing.
    return local_filepath
def process_file_from_fs(header=None, sep=',', decimal='.', **kwargs):
    """Expand the multi-valued mail columns with pandas and save them to the same file.

    Reads the file downloaded by the ``download_file`` task (its path is pulled
    from XCom). Column 1 is split into ``sent_mail_<i>`` columns and column 2
    into ``recv_mail_<i>`` columns. These are appended to the frame, and the
    result overwrites the same path as tab-separated UTF-8.

    Args:
        header: ``header`` argument for ``pandas.read_csv``. With the default
            ``None`` the columns are labelled 0, 1, 2, ..., which the ``df[1]``
            and ``df[2]`` lookups below rely on.
        sep: Field separator of the input file.
        decimal: Decimal-point character of numeric fields in the input file.
        **kwargs: Airflow task context; ``ti`` is required.

    Returns:
        The path of the rewritten file.
    """
    import pandas as pd

    def unpack_col(col_to_unpack, df_to_append=None, header='col', sep=',', na_value=''):
        """Split every value of ``col_to_unpack`` on ``sep`` into separate columns.

        The new columns are named ``<header>_0``, ``<header>_1``, and so on.
        Rows with fewer pieces are padded with ``na_value``. If ``df_to_append``
        is a DataFrame, the new columns are concatenated to its right;
        otherwise only the new columns are returned.
        """
        filled = col_to_unpack.fillna(na_value)
        pieces = [value.split(sep) for value in filled.tolist()]
        # Ragged rows are padded with None by the DataFrame constructor, then filled.
        unpacked_cols = pd.DataFrame(pieces, index=filled.index).fillna(na_value)
        unpacked_cols.columns = ['{}_{}'.format(header, i) for i in unpacked_cols.columns]
        if isinstance(df_to_append, pd.DataFrame):
            return pd.concat([df_to_append, unpacked_cols], axis=1)
        return unpacked_cols

    # Receive the previous task's output via XCom (local_filepath, if the file was downloaded).
    local_filepath = kwargs['ti'].xcom_pull(task_ids='download_file')
    df = pd.read_csv(local_filepath, sep=sep, header=header, decimal=decimal,
                     parse_dates=True, encoding='utf-8')
    df = unpack_col(df[1], df, header='sent_mail')
    df = unpack_col(df[2], df, header='recv_mail')
    # NOTE(review): to_csv also writes the index and a header row. If the target
    # MySQL table has no matching columns, bulk_load_sql may load them as data —
    # confirm against the table schema.
    df.to_csv(local_filepath, sep='\t', encoding='utf-8')
    return local_filepath
def bulk_load_sql(table_name, mysql_conn_id='ib_sql', **kwargs):
    """Bulk-load the processed file into a MySQL table via ``MySqlHook.bulk_load``.

    Args:
        table_name: Target MySQL table.
        mysql_conn_id: Airflow connection id of the MySQL database
            (defaults to the ``ib_sql`` connection this DAG was written for).
        **kwargs: Airflow task context; ``ti`` is required.

    Returns:
        ``table_name``.
    """
    # Same path the download task produced; process_file_from_fs rewrites it in place.
    local_filepath = kwargs['ti'].xcom_pull(task_ids='download_file')
    # MySqlHook takes the connection id as ``mysql_conn_id``. ``conn_name_attr`` only
    # names that keyword, so passing conn_name_attr='ib_sql' left the default connection in use.
    conn = MySqlHook(mysql_conn_id=mysql_conn_id)
    conn.bulk_load(table_name, local_filepath)
    return table_name
dag = DAG('carga-mails-ejecutivos-6', default_args=default_args, schedule_interval="@daily")

# 1) Download the day's CSV from the FTP server. The task_id must stay
#    'download_file': the downstream callables pass it to xcom_pull.
t1 = PythonOperator(
    task_id='download_file',
    python_callable=download_file_from_ftp,
    # The callables read execution_date / ti from the task context kwargs.
    provide_context=True,
    op_kwargs={'remote_path': '/home/flolas/files/',
               'local_path': '/usr/local/airflow/files/',
               'ext': 'csv',
               'prefix': 'mail_',
               'suffix': '',
               'conn_id': 'ftp_servidor'},
    dag=dag,
)

# NOTE(review): the original task_ids of t2/t3 were lost from this copy of the
# code; the ids below are placeholders — confirm against the deployed DAG.
# 2) Expand the mail columns of the '|'-separated file in place.
t2 = PythonOperator(
    task_id='process_file',
    python_callable=process_file_from_fs,
    provide_context=True,
    op_kwargs={'sep': '|'},
    dag=dag,
)

# 3) Bulk-load the processed file into MySQL.
t3 = PythonOperator(
    task_id='bulk_load_sql',
    python_callable=bulk_load_sql,
    provide_context=True,
    op_kwargs={'table_name': 'MailsEjecutivos'},
    dag=dag,
)

t1 >> t2 >> t3
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.