@ludovicc
Created April 18, 2016 18:03
Airflow: scan a directory for sub-directories ready for processing
[2016-04-18 19:43:45,764] {__init__.py:36} INFO - Using executor LocalExecutor
[2016-04-18 19:43:45,766] {__init__.py:36} INFO - Using executor LocalExecutor
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 15, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 203, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1075, in run
    self.handle_failure(e, test_mode, context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1142, in handle_failure
    session.merge(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1709, in merge
    load=load, _recursive=_recursive)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1752, in _merge
    merged = self.query(mapper.class_).get(key[1])
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 831, in get
    return self._get_impl(ident, loading.load_on_ident)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 864, in _get_impl
    return fallback_fn(self, key)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 219, in load_on_ident
    return q.one()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2693, in one
    ret = list(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2736, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2749, in _execute_and_instances
    close_with_result=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2740, in _connection_from_session
    **kw)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 893, in connection
    execution_options=execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 898, in _connection_for_bind
    engine, execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 313, in _connection_for_bind
    self._assert_active()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 214, in _assert_active
    % self._rollback_exception
sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'pre_process_dicom-2016-04-18 17:43:46' for key 'dag_id'") [SQL: u'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, now(), now(), %s, %s, %s, %s, %s)'] [parameters: ('pre_process_dicom', None, u'running', 'PR01693_NO310167__2016-04-18T19:43:1461001426', 1, '\x80\x02}q\x01(U\x06folderq\x02U~/home/ludovic/tmp/For_Ludovic/Automatic_Computation_under_Organization/Sample_Data/For_Neuromorphics_Pipeline/PR01693_NO310167q\x03U\nsession_idq\x04U\x10PR01693_NO310167q\x05u.')]
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 15, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 203, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1075, in run
    self.handle_failure(e, test_mode, context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1142, in handle_failure
    session.merge(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1709, in merge
    load=load, _recursive=_recursive)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1752, in _merge
    merged = self.query(mapper.class_).get(key[1])
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 831, in get
    return self._get_impl(ident, loading.load_on_ident)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 864, in _get_impl
    return fallback_fn(self, key)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 219, in load_on_ident
    return q.one()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2693, in one
    ret = list(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2736, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2749, in _execute_and_instances
    close_with_result=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2740, in _connection_from_session
    **kw)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 893, in connection
    execution_options=execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 898, in _connection_for_bind
    engine, execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 313, in _connection_for_bind
    self._assert_active()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 214, in _assert_active
    % self._rollback_exception
sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'pre_process_dicom-2016-04-18 17:43:46' for key 'dag_id'") [SQL: u'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, now(), now(), %s, %s, %s, %s, %s)'] [parameters: ('pre_process_dicom', None, u'running', 'PR01702_NN290760__2016-04-18T19:43:1461001426', 1, '\x80\x02}q\x01(U\x06folderq\x02U~/home/ludovic/tmp/For_Ludovic/Automatic_Computation_under_Organization/Sample_Data/For_Neuromorphics_Pipeline/PR01702_NN290760q\x03U\nsession_idq\x04U\x10PR01702_NN290760q\x05u.')]
[2016-04-18 19:43:49,022] {jobs.py:516} INFO - Prioritizing 0 queued jobs
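
Both tracebacks report the same root cause: each call to TriggerDagRunOperator.execute() inserts a row into dag_run with execution_date set by MySQL's now(), which has one-second resolution, and dag_run carries a unique key over (dag_id, execution_date) (named 'dag_id' in the error). Triggering two session folders within the same second therefore violates that key, the flush fails with IntegrityError 1062, and Airflow's failure handler then trips over the already-rolled-back session. A minimal workaround sketch, using only the standard library (the helper name throttle_trigger and the 1.1-second margin are illustrative assumptions, not part of the original DAG):

import time

_last_trigger_ts = [0.0]  # shared across triggers within one scan

def throttle_trigger(min_interval=1.1):
    # Sleep just long enough that consecutive dag_run INSERTs fall into
    # different wall-clock seconds (MySQL DATETIME stores whole seconds).
    elapsed = time.time() - _last_trigger_ts[0]
    if elapsed < min_interval:
        time.sleep(min_interval - elapsed)
    _last_trigger_ts[0] = time.time()

Calling throttle_trigger() just before preprocessing_ingest.execute(context) in the DAG below would keep each triggered run on a distinct execution_date.
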
import logging
import os
import copy
from datetime import datetime, timedelta, time

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.models import Variable

import pre_process_dicom


# functions

def trigger_preprocessing(context, dag_run_obj):
    session_id = context['params']['session_id']
    logging.info('Trigger preprocessing for: %s', str(session_id))
    # The payload will be available in the target DAG context as kwargs['dag_run'].conf
    dag_run_obj.payload = context['params']
    # note: lowercase %s is a glibc strftime extension that expands to epoch seconds,
    # hence run_ids such as PR01693_NO310167__2016-04-18T19:43:1461001426
    dag_run_obj.run_id = str(session_id + '__%s' % datetime.now().strftime("%Y-%m-%dT%H:%M:%s"))
    return dag_run_obj
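
# For reference (a hedged sketch, not part of this file): inside the triggered
# pre_process_dicom DAG, a PythonOperator task with provide_context=True could
# read the payload set above. The function name extract_session_info is
# illustrative only.
#
# def extract_session_info(**kwargs):
#     conf = kwargs['dag_run'].conf or {}
#     folder = conf['folder']          # set by trigger_preprocessing
#     session_id = conf['session_id']  # set by trigger_preprocessing
#     logging.info('Pre-processing session %s in %s', session_id, folder)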
def scan_dirs_for_preprocessing(folder, **kwargs):
    if not os.path.exists(folder):
        os.makedirs(folder)
    for fname in os.listdir(folder):
        path = os.path.join(folder, fname)
        if os.path.isdir(path):
            ready_file_marker = os.path.join(path, '.ready')
            processing_file_marker = os.path.join(path, '.processing')
            # Only trigger folders that are marked ready and not yet being processed
            if os.access(ready_file_marker, os.R_OK) and not os.access(processing_file_marker, os.R_OK):
                logging.info('Prepare trigger for preprocessing: %s', str(fname))
                context = copy.copy(kwargs)
                context_params = context['params']
                context_params['folder'] = path
                context_params['session_id'] = fname
                preprocessing_ingest = TriggerDagRunOperator(
                    # need to wrap task_id in str() because log_name returns as unicode
                    task_id=str('preprocess_ingest_%s' % fname),
                    trigger_dag_id=pre_process_dicom.DAG_NAME,
                    python_callable=trigger_preprocessing,
                    params={'folder': path, 'session_id': fname},
                    dag=dag)
                preprocessing_ingest.execute(context)
                # Create a .processing marker file in the folder marked for
                # processing to avoid duplicate processing
                open(processing_file_marker, 'a').close()
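
# Producer side (a sketch under the marker-file convention above; the helper
# name mark_session_ready is an assumption, not part of this gist): whatever
# process uploads a session folder should drop the .ready marker last, so a
# partially copied folder is never picked up by the scan.
#
# def mark_session_ready(session_folder):
#     open(os.path.join(session_folder, '.ready'), 'a').close()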
# constants
START = datetime.utcnow()
START = datetime.combine(START.date(), time(START.hour, 0))
# START = datetime.combine(datetime.today() - timedelta(days=2), datetime.min.time()) + timedelta(hours=10)
# START = datetime.now()

DAG_NAME = 'poll_pre_process_incoming'

# Define the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': START,
    'retries': 1,
    'retry_delay': timedelta(seconds=120),
    'email_on_failure': True,
    'email_on_retry': True
}

dag = DAG(dag_id=DAG_NAME,
          default_args=default_args,
          schedule_interval='* * * * *')
try:
    preprocessing_data_folder = Variable.get("preprocessing_data_folder")
except Exception:
    # Fall back to a default when the Airflow Variable is not defined
    preprocessing_data_folder = "/tmp/data/incoming"

scan_ready_dirs = PythonOperator(
    task_id='scan_dirs_ready_for_preprocessing',
    python_callable=scan_dirs_for_preprocessing,
    op_args=[preprocessing_data_folder],
    provide_context=True,
    dag=dag)
scan_ready_dirs.doc_md = """\
# Scan directories ready for processing

Scan the session folders under the root folder %s (defined by the Airflow Variable
__preprocessing_data_folder__). A session folder is considered ready for processing when it
contains a .ready marker file; it is skipped when it also contains a .processing marker file,
which indicates that processing has already started.
""" % preprocessing_data_folder