Skip to content

Instantly share code, notes, and snippets.

@praveev
Created April 28, 2016 20:45
Show Gist options
  • Save praveev/7b93b50746f8e965f7139ecba028490a to your computer and use it in GitHub Desktop.
Save praveev/7b93b50746f8e965f7139ecba028490a to your computer and use it in GitHub Desktop.
# Entry task of this snippet: a DistCpSensor configured for the
# /tmp/testAirflowDistCp parent path. ``lookback`` is interpreted by
# DistCpSensor itself (not visible here).
t1 = DistCpSensor(
    dag=dag,
    task_id='sensor',
    source_conn=source_conn,
    distcp_dao=distcp_dao,
    parent_paths=['/tmp/testAirflowDistCp'],
    lookback=2,
    # 43200 -> 12 hours, assuming seconds as for Airflow sensors (confirm).
    timeout=12 * 60 * 60,
)
def run_distcp_on_each(*args, **kwargs):
    """Build one distcp task chain per work item reported via XCom.

    Intended as the ``python_callable`` of a PythonOperator with
    ``provide_context=True``, so ``kwargs`` carries the Airflow context and
    ``kwargs['ti']`` is the running TaskInstance.

    For each item pulled from XCom under key ``'to_process'`` this creates:
    DistCpOperator (builds the command) -> BashOperator (runs it) ->
    BashOperator (extracts the application id) -> DistCpMonitor ->
    DistCpSensorCompletionOperator, with the first task placed after the
    module-level ``t2``.

    NOTE(review): these operators are instantiated while a task is already
    executing (inside a worker), not while the DAG file is parsed. Airflow's
    scheduler generally only schedules tasks that exist at parse time, so
    these chains are unlikely to run as written. Confirm for the Airflow
    version in use; a parse-time fan-out or a single operator doing the whole
    distcp/monitor/update sequence may be required.

    Returns:
        None.
    """
    ti = kwargs['ti']
    pprint(kwargs)
    to_process = ti.xcom_pull(task_ids=None, key='to_process')
    pprint(to_process)
    # xcom_pull yields None when no matching XCom exists; iterating None would
    # raise TypeError, so treat a missing value the same as "no work".
    if not to_process:
        return
    for index, work in enumerate(to_process):
        # All chains go into the same DAG, and task_ids must be unique within
        # a DAG, so every id gets the item index as a suffix.
        suffix = '_%d' % index
        command_task_id = 'distcp_command' + suffix
        run_task_id = 'run_distcp' + suffix
        app_id_task_id = 'get_application_id' + suffix
        t3 = DistCpOperator(
            task_id=command_task_id,
            source_conn=source_conn,
            work=work,
            dag=dag)
        # Executes the command string that t3 pushed to XCom.
        t4 = BashOperator(
            task_id=run_task_id,
            bash_command="{{ ti.xcom_pull(task_ids='%s') }}" % command_task_id,
            xcom_push=True,
            env=os.environ.copy(),
            dag=dag)
        # Keeps the last whitespace-separated token of t4's output and
        # rewrites "job" to "application" (presumably a MapReduce job id ->
        # YARN application id; verify against the distcp output format).
        t5 = BashOperator(
            task_id=app_id_task_id,
            bash_command=("echo {{ ti.xcom_pull(task_ids='%s') }}"
                          " | awk '{print $NF}'"
                          " | sed 's/job/application/g'") % run_task_id,
            xcom_push=True,
            env=os.environ.copy(),
            dag=dag)
        t6 = DistCpMonitor(
            task_id='monitor' + suffix,
            application_id="{{ ti.xcom_pull(task_ids='%s') }}" % app_id_task_id,
            resource_manager_conn=resource_manager_conn,
            dag=dag)
        t7 = DistCpSensorCompletionOperator(
            task_id='mysql_update' + suffix,
            distcp_dao=distcp_dao,
            dag=dag)
        t3.set_upstream(t2)
        t4.set_upstream(t3)
        t5.set_upstream(t4)
        t6.set_upstream(t5)
        t7.set_upstream(t6)
# Calls run_distcp_on_each once the sensor task has completed.
t2 = PythonOperator(
    dag=dag,
    task_id='run_distcp_on_each',
    python_callable=run_distcp_on_each,
    # Hands the Airflow context (including ``ti``) to the callable's kwargs.
    provide_context=True,
)
# Same edge as t2.set_upstream(t1): t1 -> t2.
t1.set_downstream(t2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment