Created
January 27, 2023 05:03
-
-
Save shin1103/40e405aa5e8925b59b16d2ab110a3a70 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import traceback | |
from typing import TYPE_CHECKING, Any, List, Optional | |
from airflow.exceptions import AirflowException | |
from airflow.models import BaseOperator | |
if TYPE_CHECKING: | |
from airflow.utils.context import Context | |
class EcrCredentialRegisterOperator(BaseOperator):
    """Register an Amazon ECR login as an Airflow ``docker`` connection.

    On execution the operator requests an ECR authorization token through
    boto3, deletes any Airflow connection already stored under
    ``conn_id_name`` and re-creates it with the fresh registry URL, user
    name and password by invoking the ``airflow connections`` CLI.
    """

    def __init__(self, *, conn_id_name: str, region_name: str, register_ids: List[str], **kwargs,) -> None:
        """Store the operator configuration.

        ``conn_id_name``, ``region_name`` and ``register_ids`` are mandatory
        keyword arguments; every other keyword argument (``task_id`` etc.)
        is forwarded to ``BaseOperator``.

        Args:
            conn_id_name (str): Id of the Airflow connection to (re)create.
            region_name (str): Region name used to build the boto3 ECR client.
            register_ids (List[str]): AWS account IDs (12-digit numbers)
                passed as ``registryIds`` to ``get_authorization_token``.
                Only the first entry of the returned authorization data is
                used, so the list is expected to hold a single element.
                Stored on the instance as ``self.registry_ids``.
        """
        super().__init__(**kwargs)
        self.conn_id_name = conn_id_name
        self.region_name = region_name
        self.registry_ids = register_ids

    def __set_ecr_credential_to_airflow(self) -> None:
        """Fetch an ECR token and store it as an Airflow docker connection.

        Based on the approach described in
        https://www.lucidchart.com/techblog/2019/03/22/using-apache-airflows-docker-operator-with-amazons-container-repository/
        with the obsolete command-line options replaced.

        Raises:
            RuntimeError: If ``airflow connections add`` exits with a
                non-zero status.
        """
        import base64
        import subprocess

        import boto3

        # Get ECR credentials; only the first authorization entry is used.
        ecr = boto3.client('ecr', region_name=self.region_name)
        response = ecr.get_authorization_token(registryIds=self.registry_ids)
        auth_data = response['authorizationData'][0]
        decoded_token = base64.b64decode(
            auth_data['authorizationToken']).decode('UTF-8')
        # The token has the form "user:password"; split only on the first
        # colon so a colon inside the secret cannot break the unpacking.
        username, password = decoded_token.split(':', 1)
        registry_url = auth_data['proxyEndpoint']

        # Delete the existing docker connection. This is best effort: the
        # connection does not exist on the first run, so a non-zero exit
        # status is tolerated here.
        subprocess.run(
            ['airflow', 'connections', 'delete', self.conn_id_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )

        # Add the refreshed docker connection. The command is passed as an
        # argument list so that no value is re-split on whitespace.
        # NOTE(review): the password is still handed over on the command
        # line and is therefore visible in the process list while the CLI
        # runs; consider writing the connection through Airflow's API.
        add_result = subprocess.run(
            [
                'airflow', 'connections', 'add',
                '--conn-type', 'docker',
                '--conn-host', registry_url,
                '--conn-login', username,
                '--conn-password', password,
                self.conn_id_name,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            check=False,
        )
        if add_result.returncode != 0:
            # Build the message by hand instead of using check=True:
            # CalledProcessError would embed the full command line,
            # including the password, in the task log.
            raise RuntimeError(
                "'airflow connections add' exited with status {}: {}".format(
                    add_result.returncode, (add_result.stderr or '').strip()))

    def execute(self, context: 'Context') -> Optional[List[Any]]:
        """Override BaseOperator: refresh the ECR docker connection.

        Args:
            context: Airflow task context (unused).

        Returns:
            Always ``None``.

        Raises:
            AirflowException: If fetching the token or registering the
                connection fails; the original error is chained as the cause.
        """
        try:
            self.__set_ecr_credential_to_airflow()
        except Exception as e:
            # Record the traceback through the operator's logger before
            # converting the failure into an Airflow-level error.
            self.log.exception(
                'Registering the ECR credential as connection %s failed.',
                self.conn_id_name)
            raise AirflowException(
                'Fail to register ECR credential to Airflow connections.') from e
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment