Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save saumalya75/f4068e3a91169d18c5371388ae452587 to your computer and use it in GitHub Desktop.
Save saumalya75/f4068e3a91169d18c5371388ae452587 to your computer and use it in GitHub Desktop.
Custom S3 Sensor
class S3SensorFromProvidedValue(BaseSensorOperator):
@apply_defaults
def __init__(self,
conn_type:str = '',
endpoint_url:str = 'http://127.0.0.1:9000',
bucket_name:str = 'default_bucket',
identifier:str = 'default_ind',
wildcard_match:bool = False,
aws_conn_id='',
verify=None,
remove_on_detection=True,
xcom_task_id_key:str = '1234',
refresh_xcom:bool = True,
*args,
**kwargs
):
self.conn_type = conn_type
self.endpoint_url = endpoint_url
self.bucket_name = bucket_name
self.identifier = identifier
self.wildcard_match = wildcard_match
self.aws_conn_id = aws_conn_id
self.verify = verify
self.remove_on_detection = remove_on_detection
self.xcom_task_id_key = xcom_task_id_key
self.refresh_xcom = refresh_xcom
super().__init__(*args, **kwargs)
def poke(self, context):
try:
"""
This sensor reads the trigger file.
It also puts the values provided in trigger file on xcomm.
"""
task_instance = context['task_instance']
hook = CustomS3MinioHook(conn_type=self.conn_type, endpoint_url=self.endpoint_url, aws_conn_id=self.aws_conn_id, verify=self.verify)
print(f"Connection Type: {self.conn_type}")
print("Check for :" + self.bucket_name + "/" + self.identifier)
if self.wildcard_match:
presence = hook.check_for_wildcard_key(self.identifier, self.bucket_name)
if presence:
trigger_file = hook.get_wildcard_key(self.identifier, self.bucket_name)
if type(trigger_file) == list:
trigger_file = trigger_file[0].key
else:
trigger_file = trigger_file.key
return presence
else:
presence = hook.check_for_key(self.identifier, self.bucket_name)
if presence:
trigger_file = hook.get_wildcard_key(self.identifier, self.bucket_name).key
return presence
except Exception as e:
print("Something went wrong while poking: " + str(e))
print('~' * 100)
print(traceback.print_exc(file=sys.stdout))
print('~' * 100)
raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment