Last active
May 29, 2020 19:16
-
-
Save saumalya75/4a7e38cc2232f315821bdf001bcfd16c to your computer and use it in GitHub Desktop.
Custom operator code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.models.baseoperator import BaseOperator | |
from hooks.custom_s3_minio_hook import CustomS3MinioHook | |
from airflow.utils.decorators import apply_defaults | |
# Demo application integration imports | |
import myscript as ms | |
from myscript.run import check | |
from myscript import run as r | |
class CustomFileProcessingOperator(BaseOperator):
    """
    Demo operator that reads one source file and prints its content.

    Currently this operator is not doing anything practically: it only
    reads the source file through ``CustomS3MinioHook`` and prints the
    content. Anyone can add logic based on their requirement; the
    configuration structure will stay similar.

    The location of the file (object key and bucket name) is taken from the
    ``source_key`` / ``source_bucket_name`` arguments when they are given,
    otherwise from the XCom value published by the upstream task identified
    by ``xcom_source_task_id`` and ``xcom_key``.
    """

    @apply_defaults
    def __init__(self
                 , xcom_task_id_key: str = 'data_processor_key'
                 , xcom_source_task_id: str = ''
                 , xcom_key: str = ''
                 , source_bed_type: str = 'aws'
                 , source_endpoint_url: str = ''
                 , source_conn_id: str = 'aws_default'
                 , source_verify: str = ''
                 , *args
                 , source_key: str = ''
                 , source_bucket_name: str = ''
                 , **kwargs
                 ):
        """
        Store the operator configuration.

        :param xcom_task_id_key: stored on the instance; not read by this class.
        :param xcom_source_task_id: task id of the upstream task whose XCom
            value is pulled in ``execute`` (mandatory).
        :param xcom_key: key of the XCom value to pull (mandatory).
        :param source_bed_type: passed to the hook as ``conn_type``.
        :param source_endpoint_url: passed to the hook as ``endpoint_url``.
        :param source_conn_id: passed to the hook as ``aws_conn_id``.
        :param source_verify: passed to the hook as ``verify``.
        :param source_key: optional explicit key of the file to read
            (keyword-only).
        :param source_bucket_name: optional explicit bucket of the file to
            read (keyword-only).
        :raises AirflowException: if ``xcom_source_task_id`` or ``xcom_key``
            is empty.
        """
        super().__init__(*args, **kwargs)
        # Truthiness check (instead of ``len``) so that ``None`` is reported
        # with the same explicit error as an empty string.
        if not xcom_key or not xcom_source_task_id:
            # Imported here because the module never imports this name.
            from airflow.exceptions import AirflowException
            raise AirflowException("Providing XCOM source task name and key is mandatory for for this processing operator.")
        self.xcom_task_id_key = xcom_task_id_key
        self.xcom_task_id = xcom_source_task_id
        self.xcom_key = xcom_key
        self.source_bed_type = source_bed_type
        self.source_endpoint_url = source_endpoint_url
        self.source_conn_id = source_conn_id
        self.source_verify = source_verify
        # ``execute`` needs these two values; they were previously read
        # without ever being assigned.
        self.source_key = source_key
        self.source_bucket_name = source_bucket_name

    def _resolve_source_location(self, context):
        """
        Return the ``(key, bucket_name)`` pair of the file to read.

        Explicit constructor values win; any missing value is looked up in
        the XCom value pulled from the configured upstream task.

        :raises AirflowException: if either value is still unknown.
        """
        source_key = self.source_key
        source_bucket_name = self.source_bucket_name
        if not (source_key and source_bucket_name):
            # NOTE(review): assumes the upstream task pushes a dict holding
            # 'source_key' and 'source_bucket_name' entries - confirm against
            # the task that publishes this XCom and adjust the names if needed.
            payload = context['ti'].xcom_pull(
                task_ids=self.xcom_task_id,
                key=self.xcom_key,
            )
            if isinstance(payload, dict):
                source_key = source_key or payload.get('source_key', '')
                source_bucket_name = (
                    source_bucket_name or payload.get('source_bucket_name', '')
                )
        if not (source_key and source_bucket_name):
            from airflow.exceptions import AirflowException
            raise AirflowException(
                "Could not determine the source key and bucket name: pass "
                "source_key/source_bucket_name or publish them through XCOM."
            )
        return source_key, source_bucket_name

    def execute(self, context):
        """File processing is implemented"""
        # Demo application integration prints
        print("*" * 50)
        print(check)
        print(r.check)
        print(ms.check)
        print('~' * 50)

        # Hooking to the source bed
        source_hook = CustomS3MinioHook(
            conn_type=self.source_bed_type,
            endpoint_url=self.source_endpoint_url,
            aws_conn_id=self.source_conn_id,
            verify=self.source_verify,
        )
        print(f"Source Connection Type: {self.source_bed_type}")

        # Just read the data and consider the demo to be completed
        source_key, source_bucket_name = self._resolve_source_location(context)
        source_data = source_hook.read_key(source_key, source_bucket_name)
        print(source_data)
        print("Execution complete!")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment