Skip to content

Instantly share code, notes, and snippets.

@flolas
Last active January 12, 2021 02:05
Show Gist options
  • Save flolas/69790a8671b2e5d975435ae5dd8cf064 to your computer and use it in GitHub Desktop.
Save flolas/69790a8671b2e5d975435ae5dd8cf064 to your computer and use it in GitHub Desktop.
"""
Airflow DAG that runs a Teradata BTEQ script inside a Docker container.
Adapted from the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.docker_operator import DockerOperator
# Midnight (00:00:00) of the calendar day two days before this module is parsed.
# NOTE(review): the value is recomputed on every parse, so the DAG's start_date
# is not fixed -- confirm this is intended.
start = datetime.combine(
    (datetime.today() - timedelta(days=2)).date(),
    datetime.min.time(),
)

# Defaults handed to the DAG below via ``default_args``.
# Keys the author left disabled: 'queue': 'bash_queue', 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1).
default_args = dict(
    owner='flolas',
    start_date=start,
    email=['felipe.lolas013@gmail.com'],
    retries=5,
    retry_delay=timedelta(minutes=30),
)

# DAG 'bteq-test', scheduled with the "@daily" preset.
dag = DAG('bteq-test', schedule_interval="@daily", default_args=default_args)
def loadBTEQ(path):
    """Return the entire text content of the script file at ``path``.

    :param path: Path of the file to read; opened in text mode (``'r'``).
    :type path: str
    :return: The complete file content as a single string.
    :rtype: str
    :raises IOError: If the file cannot be opened or read (``OSError`` on Python 3).
    """
    # The context manager closes the handle even when read() raises, which the
    # manual open()/close() pair did not guarantee.
    with open(path, 'r') as script:
        return script.read()
# Single task of the DAG: start a container from the 'bci/teradata:bteq' image
# and pass it the text of ./dags/logon.sql as its command.
# The script file is read while this module is being evaluated (the loadBTEQ
# call is an argument expression), and the relative path is resolved against
# the current working directory of that process.
# NOTE(review): xcom_push is not set here -- confirm whether xcom_all on its
# own has any effect.
t0 = DockerOperator(
    dag=dag,
    task_id='BTEQ_NOMBRE',
    docker_url='161.131.194.95:2375',
    api_version='1.21',
    image='bci/teradata:bteq',
    command=loadBTEQ('./dags/logon.sql'),
    destroy_on_finish=True,
    xcom_all=True,
)
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory
from docker import Client, tls
import ast
class DockerOperator(BaseOperator):
    """
    Execute a command inside a docker container.

    A temporary directory is created on the host and mounted into a container to allow storing files
    that together exceed the default disk size of 10GB in a container. The path to the mounted
    directory can be accessed via the environment variable ``AIRFLOW_TMP_DIR``.

    :param image: Docker image from which to create the container.
    :type image: str
    :param api_version: Remote API version.
    :type api_version: str
    :param command: Command to be run in the container. A string whose first
        non-blank character is ``[`` is parsed as a Python list literal.
    :type command: str or list
    :param cpus: Number of CPUs to assign to the container.
        This value gets multiplied with 1024. See
        https://docs.docker.com/engine/reference/run/#cpu-share-constraint
    :type cpus: float
    :param docker_url: URL of the host running the docker daemon.
    :type docker_url: str
    :param environment: Environment variables to set in the container.
    :type environment: dict
    :param force_pull: Pull the docker image on every run.
    :type force_pull: bool
    :param mem_limit: Maximum amount of memory the container can use. Either a float value, which
        represents the limit in bytes, or a string like ``128m`` or ``1g``.
    :type mem_limit: float or str
    :param network_mode: Network mode for the container.
    :type network_mode: str
    :param tls_ca_cert: Path to a PEM-encoded certificate authority to secure the docker connection.
    :type tls_ca_cert: str
    :param tls_client_cert: Path to the PEM-encoded certificate used to authenticate docker client.
    :type tls_client_cert: str
    :param tls_client_key: Path to the PEM-encoded key used to authenticate docker client.
    :type tls_client_key: str
    :param tls_hostname: Hostname to match against the docker server certificate or False to
        disable the check.
    :type tls_hostname: str or bool
    :param tls_ssl_version: Version of SSL to use when communicating with docker daemon.
    :type tls_ssl_version: str
    :param tmp_dir: Mount point inside the container to a temporary directory created on the host by
        the operator. The path is also made available via the environment variable
        ``AIRFLOW_TMP_DIR`` inside the container.
    :type tmp_dir: str
    :param user: Default user inside the docker container.
    :type user: int or str
    :param volumes: List of volumes to mount into the container, e.g.
        ``['/host/path:/container/path', '/host/path2:/container/path2:ro']``.
    :type volumes: list
    :param xcom_push: Whether the container's stdout is pushed to the next step using XCom.
        The default is False.
    :type xcom_push: bool
    :param xcom_all: Push all the stdout or just the last line. The default is False (last line).
        Only consulted when ``xcom_push`` is true.
    :type xcom_all: bool
    :param destroy_on_finish: Remove the container once it has exited with code 0. A container
        that exits with a non-zero code is never removed by the operator. The default is True.
    :type destroy_on_finish: bool
    """
    template_fields = ('command',)
    template_ext = ('.sh', '.bash',)

    @apply_defaults
    def __init__(
            self,
            image,
            api_version=None,
            command=None,
            cpus=1.0,
            docker_url='unix://var/run/docker.sock',
            environment=None,
            force_pull=False,
            mem_limit=None,
            network_mode=None,
            tls_ca_cert=None,
            tls_client_cert=None,
            tls_client_key=None,
            tls_hostname=None,
            tls_ssl_version=None,
            tmp_dir='/tmp/airflow',
            user=None,
            volumes=None,
            xcom_push=False,
            xcom_all=False,
            destroy_on_finish=True,
            *args,
            **kwargs):
        super(DockerOperator, self).__init__(*args, **kwargs)
        self.api_version = api_version
        self.command = command
        self.cpus = cpus
        self.docker_url = docker_url
        self.environment = environment or {}
        self.force_pull = force_pull
        self.image = image
        self.mem_limit = mem_limit
        self.network_mode = network_mode
        self.tls_ca_cert = tls_ca_cert
        self.tls_client_cert = tls_client_cert
        self.tls_client_key = tls_client_key
        self.tls_hostname = tls_hostname
        self.tls_ssl_version = tls_ssl_version
        self.tmp_dir = tmp_dir
        self.user = user
        self.volumes = volumes or []
        self.xcom_push = xcom_push
        self.xcom_all = xcom_all
        self.destroy_on_finish = destroy_on_finish

        # Both are populated by execute(); on_kill() relies on them being None until then.
        self.cli = None
        self.container = None

    def execute(self, context):
        """
        Create and run the container, stream its output to the log and wait for it to exit.

        :param context: Airflow task context (not used by this method).
        :return: ``None`` unless ``xcom_push`` is set; otherwise the full container log when
            ``xcom_all`` is set, or the last streamed log line as ``str``.
        :raises AirflowException: If the container exits with a non-zero code.
        """
        logging.info('Starting docker container from image ' + self.image)

        # TLS is only configured when the CA cert, client cert and client key are all given.
        tls_config = None
        if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key:
            tls_config = tls.TLSConfig(
                ca_cert=self.tls_ca_cert,
                client_cert=(self.tls_client_cert, self.tls_client_key),
                verify=True,
                ssl_version=self.tls_ssl_version,
                assert_hostname=self.tls_hostname
            )
            self.docker_url = self.docker_url.replace('tcp://', 'https://')

        self.cli = Client(base_url=self.docker_url, version=self.api_version, tls=tls_config)

        # An image name without any ':' gets the ':latest' tag appended.
        if ':' not in self.image:
            image = self.image + ':latest'
        else:
            image = self.image

        if self.force_pull or len(self.cli.images(name=image)) == 0:
            logging.info('Pulling docker image ' + image)
            for message in self.cli.pull(image, stream=True):
                if isinstance(message, bytes):
                    message = message.decode('utf-8')
                output = json.loads(message)
                # Not every progress message has a 'status' key; log the whole
                # message in that case instead of failing with KeyError.
                logging.info("{}".format(output.get('status', output)))

        cpu_shares = int(round(self.cpus * 1024))

        with TemporaryDirectory(prefix='airflowtmp') as host_tmp_dir:
            # Work on copies so that the caller-supplied dict/list are not mutated and
            # repeated execute() calls do not accumulate binds of deleted temp directories.
            environment = dict(self.environment)
            environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
            binds = list(self.volumes)
            binds.append('{0}:{1}'.format(host_tmp_dir, self.tmp_dir))

            self.container = self.cli.create_container(
                command=self.get_command(),
                cpu_shares=cpu_shares,
                environment=environment,
                host_config=self.cli.create_host_config(binds=binds,
                                                        network_mode=self.network_mode),
                image=image,
                mem_limit=self.mem_limit,
                user=self.user
            )
            container_id = self.container['Id']
            self.cli.start(container_id)

            # 'line' keeps the last streamed line; it stays '' if nothing is emitted.
            line = ''
            for line in self.cli.logs(container=container_id, stream=True):
                logging.info("{}".format(line.strip()))

            exit_code = self.cli.wait(container_id)
            if exit_code != 0:
                # The failed container is intentionally left in place (not removed).
                raise AirflowException('docker container failed')

            # Collect the XCom value BEFORE the container is removed: the full log
            # can no longer be fetched once the container is gone.
            result = None
            if self.xcom_push:
                if self.xcom_all:
                    result = self.cli.logs(container=container_id)
                else:
                    result = str(line.strip())

            if self.destroy_on_finish:
                self.cli.remove_container(container_id)

            return result

    def get_command(self):
        """
        Return the command in the form handed to ``create_container``.

        ``None`` and list/tuple commands are returned unchanged. Any other value is treated
        as a string: when its first non-blank character is ``[`` it is parsed with
        ``ast.literal_eval``, otherwise it is returned as is.
        """
        command = self.command
        # Lists have no strip(); previously a list command raised AttributeError here.
        if command is None or isinstance(command, (list, tuple)):
            return command
        if command.strip().startswith('['):
            return ast.literal_eval(command)
        return command

    def on_kill(self):
        """Stop the running container, if one has been created, when the task is killed."""
        # The client exists before the container does (e.g. while the image is
        # being pulled), so both have to be checked.
        if self.cli is not None and self.container is not None:
            logging.info('Stopping docker container')
            self.cli.stop(self.container['Id'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment