Created
April 18, 2016 18:03
-
-
Save ludovicc/bb753eb8d2a11e6b042d44a90965d6f5 to your computer and use it in GitHub Desktop.
Airflow: scan a directory for sub-directories ready for processing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[2016-04-18 19:43:45,764] {__init__.py:36} INFO - Using executor LocalExecutor | |
[2016-04-18 19:43:45,766] {__init__.py:36} INFO - Using executor LocalExecutor | |
Traceback (most recent call last): | |
File "/usr/local/bin/airflow", line 15, in <module> | |
args.func(args) | |
File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 203, in run | |
pool=args.pool, | |
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1075, in run | |
self.handle_failure(e, test_mode, context) | |
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1142, in handle_failure | |
session.merge(self) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1709, in merge | |
load=load, _recursive=_recursive) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1752, in _merge | |
merged = self.query(mapper.class_).get(key[1]) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 831, in get | |
return self._get_impl(ident, loading.load_on_ident) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 864, in _get_impl | |
return fallback_fn(self, key) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 219, in load_on_ident | |
return q.one() | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2693, in one | |
ret = list(self) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2736, in __iter__ | |
return self._execute_and_instances(context) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2749, in _execute_and_instances | |
close_with_result=True) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2740, in _connection_from_session | |
**kw) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 893, in connection | |
execution_options=execution_options) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 898, in _connection_for_bind | |
Traceback (most recent call last): | |
File "/usr/local/bin/airflow", line 15, in <module> | |
args.func(args) | |
File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 203, in run | |
engine, execution_options) | |
pool=args.pool, | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 313, in _connection_for_bind | |
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1075, in run | |
self._assert_active() | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 214, in _assert_active | |
% self._rollback_exception | |
self.handle_failure(e, test_mode, context) | |
sqlalchemy.exc File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1142, in handle_failure | |
.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'pre_process_dicom-2016-04-18 17:43:46' for key 'dag_id'") [SQL: u'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, now(), now(), %s, %s, %s, %s, %s)'] [parameters: ('pre_process_dicom', None, u'running', 'PR01693_NO310167__2016-04-18T19:43:1461001426', 1, '\x80\x02}q\x01(U\x06folderq\x02U~/home/ludovic/tmp/For_Ludovic/Automatic_Computation_under_Organization/Sample_Data/For_Neuromorphics_Pipeline/PR01693_NO310167q\x03U\nsession_idq\x04U\x10PR01693_NO310167q\x05u.')] | |
session.merge(self) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1709, in merge | |
load=load, _recursive=_recursive) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1752, in _merge | |
merged = self.query(mapper.class_).get(key[1]) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 831, in get | |
return self._get_impl(ident, loading.load_on_ident) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 864, in _get_impl | |
return fallback_fn(self, key) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 219, in load_on_ident | |
return q.one() | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2693, in one | |
ret = list(self) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2736, in __iter__ | |
return self._execute_and_instances(context) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2749, in _execute_and_instances | |
close_with_result=True) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2740, in _connection_from_session | |
**kw) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 893, in connection | |
execution_options=execution_options) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 898, in _connection_for_bind | |
engine, execution_options) | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 313, in _connection_for_bind | |
self._assert_active() | |
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 214, in _assert_active | |
% self._rollback_exception | |
sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'pre_process_dicom-2016-04-18 17:43:46' for key 'dag_id'") [SQL: u'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, now(), now(), %s, %s, %s, %s, %s)'] [parameters: ('pre_process_dicom', None, u'running', 'PR01702_NN290760__2016-04-18T19:43:1461001426', 1, '\x80\x02}q\x01(U\x06folderq\x02U~/home/ludovic/tmp/For_Ludovic/Automatic_Computation_under_Organization/Sample_Data/For_Neuromorphics_Pipeline/PR01702_NN290760q\x03U\nsession_idq\x04U\x10PR01702_NN290760q\x05u.')] | |
[2016-04-18 19:43:49,022] {jobs.py:516} INFO - Prioritizing 0 queued jobs |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import pre_process_dicom | |
import os | |
import copy | |
from datetime import datetime, timedelta, time | |
from airflow import DAG | |
from airflow.operators.python_operator import PythonOperator | |
from airflow.operators.dagrun_operator import TriggerDagRunOperator | |
from airflow.models import Variable | |
# functions | |
def trigger_preprocessing(context, dag_run_obj):
    """Prepare the DagRun object used to trigger the pre-processing DAG.

    Invoked by TriggerDagRunOperator with the task *context* and a candidate
    *dag_run_obj*; returns the DagRun to trigger (returning None would cancel
    the trigger).
    """
    session_id = context['params']['session_id']
    logging.info('Trigger preprocessing for : %s', str(session_id))
    # The payload will be available in target dag context as kwargs['dag_run'].conf
    dag_run_obj.payload = context['params']
    # BUG FIX: the original format ended in '%s' (a glibc extension yielding
    # the Unix epoch, e.g. '...T19:43:1461001426' as seen in the error log,
    # and not portable across platforms); '%S' is the intended two-digit
    # seconds field, giving run_ids like 'SESSION__2016-04-18T19:43:46'.
    dag_run_obj.run_id = '%s__%s' % (session_id, datetime.now().strftime("%Y-%m-%dT%H:%M:%S"))
    return dag_run_obj
def scan_dirs_for_preprocessing(folder, **kwargs):
    """Scan the immediate sub-directories of *folder* for sessions to process.

    A session directory is considered ready when it contains a readable
    '.ready' marker file and no '.processing' marker yet.  For each such
    directory a TriggerDagRunOperator is built and executed on the spot to
    launch the pre-processing DAG, then a '.processing' marker is dropped so
    the same session is never triggered twice.
    """
    if not os.path.exists(folder):
        os.makedirs(folder)
    for session_name in os.listdir(folder):
        session_dir = os.path.join(folder, session_name)
        if not os.path.isdir(session_dir):
            continue
        ready_marker = os.path.join(session_dir, '.ready')
        processing_marker = os.path.join(session_dir, '.processing')
        if not os.access(ready_marker, os.R_OK) or os.access(processing_marker, os.R_OK):
            continue
        logging.info('Prepare trigger for preprocessing : %s', str(session_name))
        trigger_context = copy.copy(kwargs)
        trigger_params = trigger_context['params']
        trigger_params['folder'] = session_dir
        trigger_params['session_id'] = session_name
        # task_id is wrapped in str() because log_name returns unicode
        trigger_task = TriggerDagRunOperator(
            task_id=str('preprocess_ingest_%s' % session_name),
            trigger_dag_id=pre_process_dicom.DAG_NAME,
            python_callable=trigger_preprocessing,
            params={'folder': session_dir, 'session_id': session_name},
            dag=dag
        )
        trigger_task.execute(trigger_context)
        # Create the '.processing' marker file to avoid duplicate processing
        open(processing_marker, 'a').close()
# constants
# Anchor the schedule at the top of the current hour so the scheduler has a
# stable, minute-independent start_date for the every-minute cron below.
START = datetime.utcnow()
START = datetime.combine(START.date(), time(START.hour, 0))
DAG_NAME = 'poll_pre_process_incoming'
# Define the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': START,
    'retries': 1,
    'retry_delay': timedelta(seconds=120),
    'email_on_failure': True,
    'email_on_retry': True
}
# Poll every minute for newly arrived session folders.
dag = DAG(dag_id=DAG_NAME,
          default_args=default_args,
          schedule_interval='* * * * *')
try:
    preprocessing_data_folder = Variable.get("preprocessing_data_folder")
except Exception:
    # BUG FIX: was a bare 'except:', which also swallows SystemExit and
    # KeyboardInterrupt.  Variable.get raises when the Airflow variable is
    # undefined; in that case fall back to the default incoming folder.
    preprocessing_data_folder = "/tmp/data/incoming"
scan_ready_dirs = PythonOperator(
    task_id='scan_dirs_ready_for_preprocessing',
    python_callable=scan_dirs_for_preprocessing,
    op_args=[preprocessing_data_folder],
    provide_context=True,
    dag=dag)
scan_ready_dirs.doc_md = """\
# Scan directories ready for processing
Scan the session folders starting from the root folder %s (defined by variable __preprocessing_data_folder__).
It looks for the presence of a .ready marker file to mark that session folder as ready for processing, but it
will skip it if contains the marker file .processing indicating that processing has already started.
""" % preprocessing_data_folder
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment