Last active
October 28, 2022 13:58
-
-
Save Harduim/e586ff0affe8e37cf3138d3a0921f9f9 to your computer and use it in GitHub Desktop.
Templated Docker Taskflow Operator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pendulum import datetime | |
from airflow.decorators import dag, task | |
from platform import node | |
from typing import Optional, Callable, Sequence | |
from airflow.providers.docker.decorators.docker import _DockerDecoratedOperator | |
from airflow.decorators.base import task_decorator_factory | |
class DockerDecoratedOperator(_DockerDecoratedOperator): | |
template_fields: Sequence[str] = ("op_args", "op_kwargs", "environment") | |
docker_defaults = { | |
"api_version": "auto", | |
"auto_remove": True, | |
"network_mode": "bridge", | |
"docker_url": "tcp://docker:2376", | |
"mount_tmp_dir": False, | |
"tls_ca_cert": "/home/airflow/.docker/ca.pem", | |
"tls_client_cert": "/home/airflow/.docker/cert.pem", | |
"tls_client_key": "/home/airflow/.docker/key.pem", | |
"docker_conn_id": "dockergitlab", | |
"working_dir": "/home/airflow", | |
} | |
def docker_task_decorator( | |
python_callable: Optional[Callable] = None, | |
multiple_outputs: Optional[bool] = None, | |
**kwargs, | |
) -> TaskDecorator: | |
""" | |
Python operator decorator. Wraps a function into an Airflow operator. | |
Also accepts any argument that DockerOperator will via ``kwargs``. Can be reused in a single DAG. | |
:param python_callable: Function to decorate | |
:param multiple_outputs: If set, function return value will be unrolled to multiple XCom values. | |
Dict will unroll to XCom values with keys as XCom keys. Defaults to False. | |
""" | |
return task_decorator_factory( | |
python_callable=python_callable, | |
multiple_outputs=multiple_outputs, | |
decorated_operator_class=DockerDecoratedOperator, | |
**{**docker_defaults, **kwargs}, | |
) | |
task.docker = docker_task |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment