Skip to content

Instantly share code, notes, and snippets.

@gmic
Created January 26, 2017 12:06
Show Gist options
  • Save gmic/3c2a0530f60243c1c766bc915eed1788 to your computer and use it in GitHub Desktop.
Save gmic/3c2a0530f60243c1c766bc915eed1788 to your computer and use it in GitHub Desktop.
Airflow ssh sensor
class SshSensorOperator(SSHExecuteOperator, BaseSensorOperator):
    """
    Wait for some ssh command to succeed.

    The command is run repeatedly through ``SSHExecuteOperator.execute``
    until it completes without raising ``AirflowException``, or until
    ``self.timeout`` seconds have elapsed.
    """

    # Not referenced anywhere in this class; kept so that any external code
    # reading the attribute keeps working.
    count = 0

    def poke(self, context):
        """
        Run the ssh command once and report whether it succeeded.

        :param context: task context, passed unchanged to
            ``SSHExecuteOperator.execute``.
        :return: ``True`` if the command ran without raising
            ``AirflowException``, ``False`` otherwise.
        """
        try:
            SSHExecuteOperator.execute(self, context)
        except AirflowException as err:
            # A failing command is the normal "not ready yet" signal for this
            # sensor, so record the reason and let the caller retry.
            logging.info("SSH command has not succeeded yet: %s", err)
            return False
        return True

    def execute(self, context):
        """
        Poll :meth:`poke` until it succeeds or the sensor times out.

        Sleeps ``self.poke_interval`` seconds between attempts. The timeout
        is only checked after a failed poke, so at least one attempt is
        always made.

        :param context: task context, forwarded to :meth:`poke`.
        :raises AirflowSkipException: on timeout when ``self.soft_fail`` is
            truthy.
        :raises AirflowSensorTimeout: on timeout otherwise.
        """
        # NOTE(review): this override presumably exists so the polling loop is
        # used instead of the execute() resolved via SSHExecuteOperator, which
        # comes first in the base list - confirm against the installed
        # Airflow version.
        started_at = datetime.now()
        while not self.poke(context):
            if (datetime.now() - started_at).total_seconds() > self.timeout:
                if self.soft_fail:
                    raise AirflowSkipException('Snap. Time is OUT.')
                raise AirflowSensorTimeout('Snap. Time is OUT.')
            sleep(self.poke_interval)
        logging.info("Success criteria met. Exiting.")
"""
Usage:
task_start = SshSensorOperator(
task_id='start_source',
ssh_hook=landing_ssh_hook,
bash_command='test -n "$(find ${DATA_LANDING_BASE}' +
'/ -maxdepth 1 -name \'*.sh\' -print -quit)"',
dag=dag)
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment