Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save saumalya75/4a7e38cc2232f315821bdf001bcfd16c to your computer and use it in GitHub Desktop.
Save saumalya75/4a7e38cc2232f315821bdf001bcfd16c to your computer and use it in GitHub Desktop.
Custom operator code
from typing import Optional

from airflow.exceptions import AirflowException
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

from hooks.custom_s3_minio_hook import CustomS3MinioHook
# Demo application integration imports
import myscript as ms
from myscript.run import check
from myscript import run as r
class CustomFileProcessingOperator(BaseOperator):
    """
    Demo file-processing operator: read one object from the source bed and print it.

    Currently this operator does no real processing. It reads the source file
    and prints its content. Anyone can add processing logic based on their
    requirements; the configuration structure stays the same.

    :param xcom_task_id_key: stored as ``self.xcom_task_id_key``; not read
        anywhere in this class.
    :param xcom_source_task_id: upstream task id this operator is configured
        against (stored as ``self.xcom_task_id``). Mandatory, must be non-empty.
    :param xcom_key: XCom key configured for the upstream task. Mandatory,
        must be non-empty.
    :param source_bed_type: passed to ``CustomS3MinioHook`` as ``conn_type``.
    :param source_endpoint_url: passed to the hook as ``endpoint_url``.
    :param source_conn_id: passed to the hook as ``aws_conn_id``.
    :param source_verify: passed to the hook as ``verify``.
    :param source_bucket_name: keyword-only; bucket to read from. Required by
        ``execute``.
    :param source_key: keyword-only; object key to read. Required by ``execute``.
    :raises AirflowException: if ``xcom_source_task_id`` or ``xcom_key`` is
        empty or ``None``.
    """

    @apply_defaults
    def __init__(self,
                 xcom_task_id_key: str = 'data_processor_key',
                 xcom_source_task_id: str = '',
                 xcom_key: str = '',
                 source_bed_type: str = 'aws',
                 source_endpoint_url: str = '',
                 source_conn_id: str = 'aws_default',
                 source_verify: str = '',
                 *args,
                 # Keyword-only so existing callers (and positional order) are unaffected.
                 source_bucket_name: Optional[str] = None,
                 source_key: Optional[str] = None,
                 **kwargs):
        super().__init__(*args, **kwargs)
        # Truthiness check instead of len(): also rejects None without a TypeError.
        if not xcom_key or not xcom_source_task_id:
            raise AirflowException(
                "Providing XCOM source task name and key is mandatory for this processing operator."
            )
        self.xcom_task_id_key = xcom_task_id_key
        self.xcom_task_id = xcom_source_task_id
        self.xcom_key = xcom_key
        self.source_bed_type = source_bed_type
        self.source_endpoint_url = source_endpoint_url
        self.source_conn_id = source_conn_id
        self.source_verify = source_verify
        # These were read in execute() but never assigned before, which made
        # every run fail with AttributeError.
        self.source_bucket_name = source_bucket_name
        self.source_key = source_key

    def execute(self, context):
        """
        Read the configured source object through ``CustomS3MinioHook`` and print it.

        :param context: Airflow task context (not read by this method).
        :raises AirflowException: if ``source_bucket_name`` or ``source_key`` was
            not provided to the constructor.
        """
        # NOTE(review): xcom_source_task_id / xcom_key are validated as mandatory
        # but never used here; presumably bucket/key were meant to be pulled from
        # the upstream task's XCom. Confirm the payload format before wiring that in.
        if not self.source_bucket_name or not self.source_key:
            raise AirflowException(
                "source_bucket_name and source_key must be provided to read the source file."
            )

        # Demo application integration prints
        print("*" * 50)
        print(check)
        print(r.check)
        print(ms.check)
        print('~' * 50)

        # Hooking to the source bed
        source_hook = CustomS3MinioHook(
            conn_type=self.source_bed_type,
            endpoint_url=self.source_endpoint_url,
            aws_conn_id=self.source_conn_id,
            verify=self.source_verify,
        )
        print(f"Source Connection Type: {self.source_bed_type}")

        # Just read the data and consider the demo to be completed
        source_data = source_hook.read_key(self.source_key, self.source_bucket_name)
        print(source_data)
        print("Execution complete!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment