Skip to content

Instantly share code, notes, and snippets.

@bobpeers
Created June 14, 2022 10:44
Show Gist options
  • Save bobpeers/7a0bb2592f4c00a0c81daeaa28246e06 to your computer and use it in GitHub Desktop.
Save bobpeers/7a0bb2592f4c00a0c81daeaa28246e06 to your computer and use it in GitHub Desktop.
Script to monitor files left in FTP queue
import os
import requests
import json
from itertools import islice
from pathlib import Path
import time
import prefect
from prefect import task, Flow
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from prefect.storage import Local
import pendulum
PY_SCRIPT = 'F:/Analytics/Workflows_prefect/IT/FTP Monitor.py'
FLOW_NAME = 'FTP Monitor'
PROJECT = 'IT'
prime_cargo_dir: str = 'D:\\PrimeCargo\\Live\\Ud\\'
now: float = time.time()
age: int = 0
msg: str = ''
def post_teams(msg: str):
base_url = 'INBOUND_WEBHOOK'
payload = {
"@type": "MessageCard",
"@context": "https://schema.org/extensions",
"summary": "FTP Notification",
"themeColor": "001A4D",
"title": "⚠ FTP Server Notification",
"sections": [
{
"text": msg
}
]
}
headers = {
"Content-Type": "application/json",
"Accept": "application/json"
}
res = requests.request('POST', base_url, headers=headers, data=json.dumps(payload))
return res.text.encode('UTF-8')
@task(max_retries=3, retry_delay=pendulum.duration(seconds=30))
def check_files(folder: str):
paths = [f.path for f in islice(os.scandir(prime_cargo_dir),2000)]
for f in paths:
age = int((now - os.stat(f).st_ctime) / 60)
if age > 30:
post_teams(f"FTP Server has stopped, file {f} is {age} minutes old")
break
else:
pass
schedule = Schedule(clocks=[CronClock("0 */1 * * *",start_date=pendulum.now())])
with Flow(FLOW_NAME,schedule, storage=Local(path=PY_SCRIPT, stored_as_script=True)) as flow:
check_files(prime_cargo_dir)
flow.register(project_name=PROJECT)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment