Containerized task framework to run ML models
# MIT License
# Copyright (c) 2019 Bellhops Inc.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import logging
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.hooks.S3_hook import S3Hook
from airflow import utils as airflow_utils

class ContainerSubdagOperator(SubDagOperator):
    """Builds a sub-DAG with one task per entry in ``containers`` and wires task
    dependencies from each container's ``depends_on`` list."""

    @airflow_utils.apply_defaults
    def __init__(self, dag, task_id, start_date, schedule_interval, default_args, containers_config, containers, **kwargs):
        self.start_date = start_date
        self.dag_schedule_interval = schedule_interval
        self.default_args = default_args
        self.containers_config = containers_config
        self.containers = containers
        self.tasks = {}
        from airflow import DAG  # imported here to avoid a circular import
        self.sub_dag_name = dag.dag_id + '.' + task_id
        self.subdag = DAG(
            self.sub_dag_name,
            start_date=self.start_date,
            schedule_interval=self.dag_schedule_interval,
            default_args=self.default_args
        )
        self.init_tasks()
        super(ContainerSubdagOperator, self).__init__(
            dag=dag,
            task_id=task_id,
            subdag=self.subdag
        )

    @property
    def task_type(self):
        # Report as a plain SubDagOperator so the Airflow UI shows the zoomable sub-DAG view.
        return 'SubDagOperator'

    def init_tasks(self):
        for container in self.containers:
            if 'run_branches' in container and container['run_branches']:
                task = BranchSubdagOperator(
                    task_id=container['name'],
                    container=container,
                    containers_config=self.containers_config,
                    dag=self.subdag,
                    branch_names=container['branch_names'],
                    email=[container['owner']],
                    schedule_interval=self.dag_schedule_interval,
                    default_args=self.default_args,
                    start_date=self.start_date
                )
                self.tasks[container['name']] = task
            else:
                task = ContainerOperator(
                    container=container,
                    containers_config=self.containers_config,
                    dag=self.subdag,
                    email=[container['owner']]
                )
                self.tasks[container['name']] = task
        for container in self.containers:
            for dependency_name in container['depends_on']:
                self.tasks[container['name']].set_upstream(self.tasks[dependency_name])
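
# Illustrative sketch (not part of the original gist): the shape of a `containers`
# entry implied by the key lookups in the operators in this module. Every value
# below is a hypothetical placeholder.
#
# containers = [
#     {
#         'name': 'churn_model',                        # task id and Docker image tag
#         'repo': 'git@github.com:example/churn.git',   # repo cloned and built by ContainerOperator
#         'owner': 'ml-team@example.com',               # used for task email alerts
#         'depends_on': [],                             # names of containers that must run first
#     },
#     {
#         'name': 'scoring_model',
#         'repo': 'git@github.com:example/scoring.git',
#         'owner': 'ml-team@example.com',
#         'depends_on': ['churn_model'],
#         'run_branches': True,                         # fan out via BranchSubdagOperator
#         'branch_names': ['staging', 'master'],        # one sequential run per git branch
#         'cli_options': '--full-refresh',              # appended to the container run command
#     },
# ]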

class BranchSubdagOperator(SubDagOperator):
    """Builds a sub-DAG that runs the same container once per branch name,
    chaining the branch runs sequentially."""

    @airflow_utils.apply_defaults
    def __init__(self, dag, task_id, start_date, schedule_interval, default_args, containers_config, container, branch_names, **kwargs):
        self.start_date = start_date
        self.dag_schedule_interval = schedule_interval
        self.default_args = default_args
        self.containers_config = containers_config
        self.container = container
        self.branch_names = branch_names
        self.tasks = {}
        from airflow import DAG  # imported here to avoid a circular import
        self.sub_dag_name = dag.dag_id + '.' + task_id
        self.subdag = DAG(
            self.sub_dag_name,
            start_date=self.start_date,
            schedule_interval=self.dag_schedule_interval,
            default_args=self.default_args
        )
        self.init_tasks()
        super(BranchSubdagOperator, self).__init__(
            dag=dag,
            task_id=task_id,
            subdag=self.subdag
        )

    @property
    def task_type(self):
        # Report as a plain SubDagOperator so the Airflow UI shows the zoomable sub-DAG view.
        return 'SubDagOperator'

    def init_tasks(self):
        if 'run_branches' in self.container and self.container['run_branches']:
            last_task_name = None
            for branch_name in self.container['branch_names']:
                task = ContainerOperator(
                    task_id=branch_name,
                    container=self.container,
                    containers_config=self.containers_config,
                    dag=self.subdag,
                    branch_name=branch_name,
                    email=[self.container['owner']]
                )
                self.tasks[branch_name] = task
                if last_task_name is not None:
                    self.tasks[branch_name].set_upstream(self.tasks[last_task_name])
                last_task_name = branch_name

class ContainerOperator(BashOperator):
    """Clones a container's git repo into the shared directory, builds its Docker
    image, runs it with S3 credentials injected through an .env file, and removes
    the stopped container afterwards."""

    @airflow_utils.apply_defaults
    def __init__(self, dag, container, containers_config, branch_name=None, task_id=None, **kwargs):
        self.containers_config = containers_config
        self.branch_name = branch_name
        shell_script_template = self.build_shell_script()
        params = self.get_params(container=container)
        task_id = task_id or container['name']
        super(ContainerOperator, self).__init__(
            dag=dag,
            task_id=task_id,
            bash_command=shell_script_template,
            params=params,
            email=[container['owner']]
        )

    def get_params(self, container):
        params = {}
        s3_conn = S3Hook(aws_conn_id='s3_conn')
        aws_key, aws_pass, _, _ = s3_conn._get_credentials(None)
        # Set defaults
        params['run_command'] = 'python3 run.py'
        params['env'] = {
            'AWS_ACCESS_KEY_ID': aws_key,
            'AWS_SECRET_ACCESS_KEY': aws_pass,
            'S3_BUCKET': self.containers_config['S3_BUCKET'],
            'S3_DIRECTORY': self.containers_config['S3_DIRECTORY'],
        }
        params['shared_directory'] = self.containers_config['SHARED_DIRECTORY']
        if 'cli_options' in container:
            params['cli_options'] = container['cli_options']
        else:
            params['cli_options'] = ''
        if self.branch_name:
            params['repo'] = ' -b {branch_name} {repo}'.format(branch_name=str(self.branch_name),
                                                               repo=container['repo'])
            params['branch_name'] = self.branch_name
            params['name'] = container['name'] + '/' + self.branch_name
        else:
            params['repo'] = container['repo']
            params['name'] = container['name']
        return params
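
    # Illustrative note (not from the original gist): get_params assumes an Airflow
    # connection named 's3_conn' from which S3Hook can resolve an AWS access key pair,
    # and a containers_config dict providing the S3_BUCKET, S3_DIRECTORY and
    # SHARED_DIRECTORY keys used above.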
    @staticmethod
    def build_shell_script():
        '''
        On macOS, point DOCKER_HOST at host.docker.internal instead of the
        default-route gateway resolved below, e.g.:
        echo DOCKER_HOST='host.docker.internal' >> .env
        export DOCKER_HOST='host.docker.internal'
        echo DOCKER_PORT=2377 >> .env
        export DOCKER_PORT=2377
        '''
        shell_script_template = '''
        rm -rf {{ params.shared_directory }}/{{ params.name }}
        if [ ! -d {{ params.shared_directory }}/{{ params.name }} ]
        then
            mkdir -p {{ params.shared_directory }}/{{ params.name }}
        fi
        git clone {{ params.repo }} {{ params.shared_directory }}/{{ params.name }}
        cd {{ params.shared_directory }}/{{ params.name }}
        echo DOCKER_HOST=`/sbin/ip route|awk '/default/ { print $3 }'` >> .env
        export DOCKER_HOST=`/sbin/ip route|awk '/default/ { print $3 }'`
        echo DOCKER_PORT=2375 >> .env
        export DOCKER_PORT=2375
        {% for key, value in params.env.items() %}
        echo "{{ key }}={{ value }}" >> .env
        {% endfor %}
        docker -H tcp://$DOCKER_HOST:$DOCKER_PORT build -t {{ params.name }} .
        docker -H tcp://$DOCKER_HOST:$DOCKER_PORT run --env-file=.env {{ params.name }} {{ params.run_command }} {{ params.cli_options }}
        docker -H tcp://$DOCKER_HOST:$DOCKER_PORT rm $(docker -H tcp://$DOCKER_HOST:$DOCKER_PORT ps -a | grep {{ params.name }} | cut -f1 -d' ')
        '''
        return shell_script_template
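
# --- Usage sketch (not part of the original gist) --------------------------------
# A minimal example of wiring the framework into a parent DAG. Every name, date,
# repo and config value below is a hypothetical placeholder; the operators above
# additionally require an Airflow connection named 's3_conn' for S3 credentials.
# Guarded by __main__ so the example is not picked up when Airflow imports the module.
if __name__ == '__main__':
    from datetime import datetime
    from airflow import DAG

    example_default_args = {
        'owner': 'data-eng',                   # hypothetical owner
        'email': ['ml-team@example.com'],      # hypothetical alert address
        'retries': 1,
    }

    example_containers_config = {
        'S3_BUCKET': 'example-ml-artifacts',       # exported into each container's .env
        'S3_DIRECTORY': 'model-outputs',           # exported into each container's .env
        'SHARED_DIRECTORY': '/tmp/ml_containers',  # host path where repos are cloned
    }

    example_containers = [
        {
            'name': 'churn_model',
            'repo': 'git@github.com:example/churn-model.git',
            'owner': 'ml-team@example.com',
            'depends_on': [],
        },
    ]

    parent_dag = DAG(
        'example_ml_models',
        start_date=datetime(2019, 1, 1),
        schedule_interval='@daily',
        default_args=example_default_args,
    )

    ContainerSubdagOperator(
        dag=parent_dag,
        task_id='run_ml_containers',
        start_date=datetime(2019, 1, 1),
        schedule_interval='@daily',
        default_args=example_default_args,
        containers_config=example_containers_config,
        containers=example_containers,
    )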