Last active
January 12, 2021 02:05
-
-
Save flolas/69790a8671b2e5d975435ae5dd8cf064 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Code that goes along with the Airflow tutorial located at: | |
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py | |
""" | |
from airflow import DAG | |
from datetime import datetime, timedelta | |
from airflow.operators.docker_operator import DockerOperator | |
# Midnight two days before the moment this file is parsed; handed to the DAG
# below as the default ``start_date`` of its tasks.
start = datetime.combine(
    (datetime.today() - timedelta(days=2)).date(),
    datetime.min.time(),
)

# Default arguments passed to the DAG.
default_args = dict(
    owner='flolas',
    start_date=start,
    email=['felipe.lolas013@gmail.com'],
    retries=5,
    retry_delay=timedelta(minutes=30),
    # Optional settings the author left disabled:
    #   queue='bash_queue',
    #   priority_weight=10,
    #   end_date=datetime(2016, 1, 1),
)

# Daily DAG that hosts the BTEQ test task.
dag = DAG(
    dag_id='bteq-test',
    default_args=default_args,
    schedule_interval="@daily",
)
def loadBTEQ(path):
    """Return the complete text of the script file stored at ``path``.

    The file is opened in text mode and read in one go. A context manager is
    used so the file handle is closed even when reading fails.

    :param path: Location of the script file to read.
    :type path: str
    :return: The file's contents.
    :rtype: str
    :raises IOError: If the file cannot be opened or read.
    """
    with open(path, 'r') as fd:
        return fd.read()
# Single task of the DAG: run the BTEQ script inside a container started from
# the ``bci/teradata:bteq`` image via the Docker daemon at ``docker_url``.
t0 = DockerOperator(
    dag=dag,
    task_id='BTEQ_NOMBRE',
    image='bci/teradata:bteq',
    docker_url='161.131.194.95:2375',
    api_version='1.21',
    # The script text is read when this module is parsed and becomes the
    # container command. NOTE(review): the relative path is resolved against
    # the current working directory of the parsing process -- confirm.
    command=loadBTEQ('./dags/logon.sql'),
    xcom_all=True,
    destroy_on_finish=True,
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import json | |
import logging | |
from airflow.exceptions import AirflowException | |
from airflow.models import BaseOperator | |
from airflow.utils.decorators import apply_defaults | |
from airflow.utils.file import TemporaryDirectory | |
from docker import Client, tls | |
import ast | |
class DockerOperator(BaseOperator):
    """
    Execute a command inside a docker container.

    A temporary directory is created on the host and mounted into a container to allow storing files
    that together exceed the default disk size of 10GB in a container. The path to the mounted
    directory can be accessed via the environment variable ``AIRFLOW_TMP_DIR``.

    :param image: Docker image from which to create the container.
    :type image: str
    :param api_version: Remote API version.
    :type api_version: str
    :param command: Command to be run in the container. A string whose first
        non-blank character is ``[`` is parsed as a Python list literal.
    :type command: str or list
    :param cpus: Number of CPUs to assign to the container.
        This value gets multiplied with 1024. See
        https://docs.docker.com/engine/reference/run/#cpu-share-constraint
    :type cpus: float
    :param docker_url: URL of the host running the docker daemon.
    :type docker_url: str
    :param environment: Environment variables to set in the container.
    :type environment: dict
    :param force_pull: Pull the docker image on every run.
    :type force_pull: bool
    :param mem_limit: Maximum amount of memory the container can use. Either a float value, which
        represents the limit in bytes, or a string like ``128m`` or ``1g``.
    :type mem_limit: float or str
    :param network_mode: Network mode for the container.
    :type network_mode: str
    :param tls_ca_cert: Path to a PEM-encoded certificate authority to secure the docker connection.
    :type tls_ca_cert: str
    :param tls_client_cert: Path to the PEM-encoded certificate used to authenticate docker client.
    :type tls_client_cert: str
    :param tls_client_key: Path to the PEM-encoded key used to authenticate docker client.
    :type tls_client_key: str
    :param tls_hostname: Hostname to match against the docker server certificate or False to
        disable the check.
    :type tls_hostname: str or bool
    :param tls_ssl_version: Version of SSL to use when communicating with docker daemon.
    :type tls_ssl_version: str
    :param tmp_dir: Mount point inside the container to a temporary directory created on the host by
        the operator. The path is also made available via the environment variable
        ``AIRFLOW_TMP_DIR`` inside the container.
    :type tmp_dir: str
    :param user: Default user inside the docker container.
    :type user: int or str
    :param volumes: List of volumes to mount into the container, e.g.
        ``['/host/path:/container/path', '/host/path2:/container/path2:ro']``.
    :type volumes: list
    :param xcom_push: Whether the container's stdout is pushed to the next step using XCom.
        The default is False.
    :type xcom_push: bool
    :param xcom_all: Push all the stdout or just the last line. The default is False (last line).
    :type xcom_all: bool
    :param destroy_on_finish: Remove the container after it exits with status 0. A container
        that exits with a non-zero status is left in place. The default is True.
    :type destroy_on_finish: bool
    """
    template_fields = ('command',)
    template_ext = ('.sh', '.bash',)

    @apply_defaults
    def __init__(
            self,
            image,
            api_version=None,
            command=None,
            cpus=1.0,
            docker_url='unix://var/run/docker.sock',
            environment=None,
            force_pull=False,
            mem_limit=None,
            network_mode=None,
            tls_ca_cert=None,
            tls_client_cert=None,
            tls_client_key=None,
            tls_hostname=None,
            tls_ssl_version=None,
            tmp_dir='/tmp/airflow',
            user=None,
            volumes=None,
            xcom_push=False,
            xcom_all=False,
            destroy_on_finish=True,
            *args,
            **kwargs):
        """Store the container settings; no connection to docker is made here."""
        super(DockerOperator, self).__init__(*args, **kwargs)
        self.api_version = api_version
        self.command = command
        self.cpus = cpus
        self.docker_url = docker_url
        self.environment = environment or {}
        self.force_pull = force_pull
        self.image = image
        self.mem_limit = mem_limit
        self.network_mode = network_mode
        self.tls_ca_cert = tls_ca_cert
        self.tls_client_cert = tls_client_cert
        self.tls_client_key = tls_client_key
        self.tls_hostname = tls_hostname
        self.tls_ssl_version = tls_ssl_version
        self.tmp_dir = tmp_dir
        self.user = user
        self.volumes = volumes or []
        self.xcom_push = xcom_push
        self.xcom_all = xcom_all
        self.destroy_on_finish = destroy_on_finish

        # Both are populated by execute(); on_kill() checks them before use.
        self.cli = None
        self.container = None

    def execute(self, context):
        """
        Run the configured command in a new container and wait for it to exit.

        The image is pulled when ``force_pull`` is set or when the daemon reports no
        matching local image. The container's log stream is forwarded to the logger
        line by line.

        :param context: Task context supplied by the caller; not used by this method.
        :return: ``None`` unless ``xcom_push`` is set; otherwise the full container log
            (``xcom_all``) or the last streamed log line.
        :raises AirflowException: If the image pull reports an error or the container
            exits with a non-zero status.
        """
        logging.info('Starting docker container from image %s', self.image)

        # TLS is only configured when the CA, client certificate and key are all given.
        tls_config = None
        if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key:
            tls_config = tls.TLSConfig(
                ca_cert=self.tls_ca_cert,
                client_cert=(self.tls_client_cert, self.tls_client_key),
                verify=True,
                ssl_version=self.tls_ssl_version,
                assert_hostname=self.tls_hostname
            )
            self.docker_url = self.docker_url.replace('tcp://', 'https://')

        self.cli = Client(base_url=self.docker_url, version=self.api_version, tls=tls_config)

        # An image name without ':' gets the ':latest' tag appended.
        image = self.image if ':' in self.image else self.image + ':latest'

        if self.force_pull or not self.cli.images(name=image):
            logging.info('Pulling docker image %s', image)
            for raw in self.cli.pull(image, stream=True):
                if isinstance(raw, bytes):
                    raw = raw.decode('utf-8')
                output = json.loads(raw)
                # Failures are reported inside the stream instead of being raised by
                # the client, and such entries carry no 'status' key.
                if 'error' in output:
                    raise AirflowException(
                        'Pulling docker image {0} failed: {1}'.format(image, output['error']))
                logging.info("{}".format(output.get('status', '')))

        cpu_shares = int(round(self.cpus * 1024))

        with TemporaryDirectory(prefix='airflowtmp') as host_tmp_dir:
            # Work on per-run copies: the temporary directory only exists for this
            # run, so its bind must not pile up in self.volumes across executions,
            # and the dict handed in by the caller must stay untouched.
            environment = dict(self.environment)
            environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
            binds = list(self.volumes)
            binds.append('{0}:{1}'.format(host_tmp_dir, self.tmp_dir))

            self.container = self.cli.create_container(
                command=self.get_command(),
                cpu_shares=cpu_shares,
                environment=environment,
                host_config=self.cli.create_host_config(binds=binds,
                                                        network_mode=self.network_mode),
                image=image,
                mem_limit=self.mem_limit,
                user=self.user
            )
            container_id = self.container['Id']
            self.cli.start(container_id)

            # 'line' keeps the last streamed log line for the xcom_push case.
            line = ''
            for line in self.cli.logs(container=container_id, stream=True):
                logging.info("{}".format(line.strip()))

            exit_code = self.cli.wait(container_id)
            if exit_code != 0:
                # The failed container is intentionally left in place.
                raise AirflowException('docker container failed')

            # The logs must be collected BEFORE the container is removed; once it is
            # gone the daemon can no longer serve them.
            result = None
            if self.xcom_push:
                if self.xcom_all:
                    result = self.cli.logs(container=container_id)
                else:
                    result = str(line.strip())

            if self.destroy_on_finish:
                self.cli.remove_container(container_id)

            return result

    def get_command(self):
        """
        Return the command in the form handed to ``create_container``.

        ``None``, lists and tuples are returned unchanged. A string whose first
        non-blank character is ``[`` is parsed with ``ast.literal_eval``; any other
        string is returned unchanged.
        """
        command = self.command
        if command is None or isinstance(command, (list, tuple)):
            return command
        stripped = command.strip()
        if stripped.startswith('['):
            # Parse the stripped text so leading whitespace cannot break the parse.
            return ast.literal_eval(stripped)
        return command

    def on_kill(self):
        """Stop the running container, if one has been created."""
        # self.container is still None when the kill arrives before the container
        # exists, e.g. while the image is being pulled.
        if self.cli is not None and self.container is not None:
            logging.info('Stopping docker container')
            self.cli.stop(self.container['Id'])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment