Skip to content

Instantly share code, notes, and snippets.

@geoHeil
Created July 25, 2022 20:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save geoHeil/a7cc0b70e31f3e946a9ffed14d84a3ed to your computer and use it in GitHub Desktop.
Save geoHeil/a7cc0b70e31f3e946a9ffed14d84a3ed to your computer and use it in GitHub Desktop.
sensor not resetting example
def make_date_file_sensor_for_asset(j: JobDefinition, resource_defs_for_ssh):
    """Create a sensor that requests a run of ``j`` once the next dated file exists.

    The sensor keeps the last processed date in its cursor. On each tick it
    derives the next date (``START_DATE`` when there is no cursor yet,
    otherwise the cursor date plus seven days), checks over SFTP whether
    ``upload/myfile<date>.zip`` exists, and if so requests a run for that
    partition and advances the cursor.

    Args:
        j: Job to launch; its name is also used to derive the sensor name
            (``<job name>_sensor``).
        resource_defs_for_ssh: Resource definitions passed to
            ``build_resources``; the built resources must expose an ``ssh``
            attribute providing ``open_sftp()``.

    Returns:
        The decorated sensor function.
    """

    @sensor(
        job=j,
        name=j.name + "_sensor",
        default_status=DefaultSensorStatus.RUNNING,
    )
    def date_file_sensor(context):
        """Return a run request for the next dated file, or a ``SkipReason``."""
        # Do the pure date arithmetic first so that a malformed cursor fails
        # before any connection has been opened.
        last_processed_date = context.cursor
        if last_processed_date is None:
            next_dt = pd.to_datetime(START_DATE)
        else:
            next_dt = datetime.strptime(
                last_processed_date, DATE_FORMAT
            ) + timedelta(days=7)

        # Format both strings from the same datetime object. Re-parsing the
        # DATE_FORMAT string with pd.to_datetime would rely on format
        # inference, which can mis-read ambiguous formats (e.g. day-first).
        next_date = next_dt.strftime(DATE_FORMAT)
        next_date_formatted = next_dt.strftime(DATE_FORMAT_PARTITION)
        path = f"upload/myfile{next_date_formatted}.zip"

        with build_resources(resource_defs_for_ssh) as resources:
            ssh = resources.ssh
            sftp = ssh.open_sftp()
            try:
                file_found = sftp_exists(sftp, path)
            finally:
                # Always release the connection, including when the
                # existence check raises.
                close(sftp, ssh)

        if not file_found:
            return SkipReason(f"Did not find file {path}")

        # Build the request before touching the cursor so a failure here
        # (e.g. an unknown partition) does not follow a cursor update.
        run_request = j.run_request_for_partition(next_date, run_key=path)
        context.update_cursor(next_date)
        return run_request

    return date_file_sensor
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment