Skip to content

Instantly share code, notes, and snippets.

@flolas
Last active July 20, 2021 18:39
Show Gist options
  • Save flolas/3e474c5da79401e54daf57e47f5e63fd to your computer and use it in GitHub Desktop.
Save flolas/3e474c5da79401e54daf57e47f5e63fd to your computer and use it in GitHub Desktop.
SFTP Sensor Airflow 1.9.0
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class SFTPSensor(BaseSensorOperator):
    """
    Waits for a file or directory to be present (or absent) on an SFTP
    server, using the SSH hook to open the connection.

    :param path: Remote file or directory path
    :type path: str
    :param ssh_conn_id: The connection to run the sensor against
    :type ssh_conn_id: str
    :param success_when_file_exists: When True (default) the sensor succeeds
        once ``path`` exists; when False it succeeds once ``path`` does not
        exist.
    :type success_when_file_exists: bool
    """
    template_fields = ('path',)
    ui_color = '#89a203'

    @apply_defaults
    def __init__(self, path, ssh_conn_id='ssh_default', success_when_file_exists=True, *args, **kwargs):
        super(SFTPSensor, self).__init__(*args, **kwargs)
        self.path = path
        self.success_when_file_exists = success_when_file_exists
        self.ssh_conn_id = ssh_conn_id
        # Result of the most recent poke: True / False, or None when the
        # check has not run or could not be completed.
        self.file_exists = None

    def _create_hook(self):
        """Return connection hook."""
        return SSHHook(ssh_conn_id=self.ssh_conn_id)

    def poke(self, context):
        """
        Check once whether ``self.path`` exists on the remote server.

        :param context: Task context passed in by the caller (unused here)
        :return: True when the observed existence of the path matches
            ``success_when_file_exists``, False otherwise
        :rtype: bool
        :raises Exception: when the existence check could not be completed
            (any error other than "no such file")
        """
        # Local import keeps this block self-contained.
        import errno

        # poke() can be called repeatedly on the same instance; clear the
        # previous result so a failed check is never masked by a stale value.
        self.file_exists = None

        with self._create_hook().get_conn() as ssh_client:
            sftp_client = ssh_client.open_sftp()
            try:
                self.log.info('Poking for %s', self.path)
                self.log.info('success_when_file_exists : %s', self.success_when_file_exists)
                sftp_client.stat(self.path)
                self.log.info('File exists.')
                self.file_exists = True
            except IOError as e:
                # Paramiko reports a missing path as IOError(ENOENT). Any
                # other IOError (permission denied, timeout, generic SFTP
                # failure) means we could not tell, so it is not "absent".
                if e.errno == errno.ENOENT:
                    self.file_exists = False
                    self.log.info('File does not exists.')
                else:
                    self.log.error(e)
            except Exception as e:
                self.log.error(e)
            finally:
                # Release the SFTP channel even when the check failed.
                sftp_client.close()

        if self.file_exists is None:
            raise Exception('Error checking file.')
        return self.file_exists == self.success_when_file_exists
@chenuratikah
Copy link

Hi, how do I use this code in my DAG? I'm unsure where to start.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment