Last active
July 20, 2021 18:39
-
-
Save flolas/3e474c5da79401e54daf57e47f5e63fd to your computer and use it in GitHub Desktop.
SFTP Sensor Airflow 1.9.0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.contrib.hooks.ssh_hook import SSHHook | |
from airflow.operators.sensors import BaseSensorOperator | |
from airflow.utils.decorators import apply_defaults | |
class SFTPSensor(BaseSensorOperator):
    """
    Waits for a file or directory to be present (or absent) on SFTP using SSH Hook

    :param path: Remote file or directory path
    :type path: str
    :param ssh_conn_id: The connection to run the sensor against
    :type ssh_conn_id: str
    :param success_when_file_exists: When True (default) the sensor succeeds
        once the path exists; when False it succeeds once the path is missing
    :type success_when_file_exists: bool
    """
    template_fields = ('path',)
    ui_color = '#89a203'

    @apply_defaults
    def __init__(self, path, ssh_conn_id='ssh_default', success_when_file_exists=True,
                 *args, **kwargs):
        super(SFTPSensor, self).__init__(*args, **kwargs)
        self.path = path
        self.success_when_file_exists = success_when_file_exists
        self.ssh_conn_id = ssh_conn_id
        # Outcome of the most recent poke: True / False, or None when unknown.
        self.file_exists = None

    def _create_hook(self):
        """Return connection hook."""
        return SSHHook(ssh_conn_id=self.ssh_conn_id)

    def poke(self, context):
        """
        Check once whether ``self.path`` exists on the remote host.

        :param context: Airflow task context (unused)
        :return: True when the observed state of the path matches
            ``success_when_file_exists``, False otherwise
        :rtype: bool
        :raises Exception: if the check fails for a reason other than IOError
        """
        # Forget the previous poke's outcome so that a failure in this poke can
        # never be masked by a stale True/False left on the instance.
        self.file_exists = None
        with self._create_hook().get_conn() as ssh_client:
            sftp_client = ssh_client.open_sftp()
            try:
                self.log.info('Poking for %s', self.path)
                self.log.info('success_when_file_exists : %s', self.success_when_file_exists)
                try:
                    sftp_client.stat(self.path)
                except IOError:
                    # stat() reports a missing path as IOError.
                    self.file_exists = False
                    self.log.info('File does not exists.')
                except Exception:
                    # Anything else is not an answer about the path: log the
                    # traceback and fail loudly instead of guessing a result.
                    self.log.exception('Unexpected error while checking %s', self.path)
                    raise Exception('Error checking file.')
                else:
                    self.log.info('File exists.')
                    self.file_exists = True
            finally:
                # Release the SFTP channel opened for this poke.
                sftp_client.close()
        return self.file_exists == self.success_when_file_exists
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi, how do I use this code in my DAG? I'm unsure where to start.