Skip to content

Instantly share code, notes, and snippets.

@shin1103
Created January 27, 2023 05:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shin1103/40e405aa5e8925b59b16d2ab110a3a70 to your computer and use it in GitHub Desktop.
Save shin1103/40e405aa5e8925b59b16d2ab110a3a70 to your computer and use it in GitHub Desktop.
import traceback
from typing import TYPE_CHECKING, Any, List, Optional
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
if TYPE_CHECKING:
from airflow.utils.context import Context
class EcrCredentialRegisterOperator(BaseOperator):
""" register ECR credential to Airflow connections.
"""
def __init__(self, *, conn_id_name: str, region_name: str, register_ids: List[str], **kwargs,) -> None:
""" Constructor. conn_id_name, region_name, registry_ids, task_id(this is BaseOperator' arg) is mandatory.
Args:
conn_id_name (str): connection id name to register Airflow connections
region_name (str): ECR's region name
registry_ids List[str]: AWS accountID(12-figure number). The argument is of type list, but the list is assumed to have a single element.
"""
super().__init__(**kwargs)
self.conn_id_name = conn_id_name
self.region_name = region_name
self.registry_ids = register_ids
def __set_ecr_credential_to_airflow(self) -> None:
""" register ECR credential to Airflow connections.
this methond refer the site below. In this site, some commandline options is obsolate, I edited some options.
https://www.lucidchart.com/techblog/2019/03/22/using-apache-airflows-docker-operator-with-amazons-container-repository/
"""
import base64
import subprocess
import boto3
# Get ECR Credentials
ecr = boto3.client('ecr', region_name=self.region_name)
response = ecr.get_authorization_token(registryIds=self.registry_ids)
username, password = base64.b64decode(
response['authorizationData'][0]['authorizationToken']
).decode('UTF-8').split(':')
registry_url = response['authorizationData'][0]['proxyEndpoint']
# Delete existing docker connection
airflow_del_cmd = 'airflow connections delete {}'.format(
self.conn_id_name)
process = subprocess.Popen(
airflow_del_cmd.split(), stdout=subprocess.PIPE)
del_output, del_error = process.communicate()
# Add existing docker connection
airflow_add_cmd = 'airflow connections add --conn-type docker --conn-host {} --conn-login {} --conn-password {} {}'.format(
registry_url, username, password, self.conn_id_name)
process = subprocess.Popen(
airflow_add_cmd.split(), stdout=subprocess.PIPE)
add_output, add_error = process.communicate()
def execute(self, context: 'Context') -> Optional[List[Any]]:
""" Override BaseOperator
"""
try:
self.__set_ecr_credential_to_airflow()
except Exception as e:
print(traceback.format_exc())
raise AirflowException(
'Fail to register ECR credential to Airflow connections.') from e
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment