Skip to content

Instantly share code, notes, and snippets.

@tinogis
Last active June 4, 2024 06:59
Show Gist options
  • Save tinogis/105a053ab2295c243e7e5b43e8c3c68b to your computer and use it in GitHub Desktop.
Save tinogis/105a053ab2295c243e7e5b43e8c3c68b to your computer and use it in GitHub Desktop.
FTP_MON: Monitors the STG FTP server
# -*- coding: utf-8 -*-
from ftplib import FTP
import click
from datetime import datetime
try:
from urllib.parse import urlparse
except:
from urlparse import urlparse
import os
# Every message passed to log() is appended to this file.
log_file = '/tmp/dup_cleaner.log'


def log(text):
    """Print *text* and append a timestamped copy of it to ``log_file``.

    Each appended line has the form
    ``YYYY-mm-dd HH:MM:SS : [<pid>] <text>``.
    """
    print(text)
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    entry = '{} : [{}] {}\n'.format(stamp, os.getpid(), text)
    with open(log_file, 'a') as handle:
        handle.write(entry)
def connect(url):
    """Open an FTP session described by *url* and change into its path.

    Args:
        url: FTP URL of the form ``ftp://user:pass@server:port/path``.
            The port falls back to 21 when the URL carries none.

    Returns:
        The connected and logged-in ``ftplib.FTP`` instance. The caller is
        responsible for closing it.

    Raises:
        Whatever ``ftplib`` raises on a failed connect, login or cwd. The
        socket is closed before the error is re-raised so that a failed
        attempt does not leak the connection.
    """
    ftp_dict = urlparse(url)
    ftp_port = 21 if ftp_dict.port is None else ftp_dict.port
    ftp = FTP()
    try:
        ftp.connect(ftp_dict.hostname, ftp_port)
        ftp.login(ftp_dict.username, ftp_dict.password)
        log('Connected to {}'.format(ftp_dict.hostname))
        ftp.cwd(ftp_dict.path)
        log('* current dir: {}'.format(ftp_dict.path))
    except Exception:
        # Release the socket if anything after FTP() fails, then let the
        # original error propagate unchanged.
        ftp.close()
        raise
    return ftp
@click.command()
@click.option('-u', '--url', required=True,
              help='URL to connect to FTP ( ftp://user:pass@server:port/path/to/dirs )',
              type=str, default='ftp://localhost', show_default=True)
def delete_dups(url):
    """Delete every file whose name ends in ``_dup`` from the FTP directory.

    Connects through ``connect(url)``, lists the current directory, logs the
    totals and removes each matching file one by one.

    Args:
        url: FTP URL handed to ``connect``.
    """
    ftp = connect(url)
    try:
        files = ftp.nlst()
        log('* Found {} files'.format(len(files)))
        dup_files = [f for f in files if f.endswith('_dup')]
        log('* Cleaning {} _dup files'.format(len(dup_files)))
        for dup_file in dup_files:
            log(' - Deleting {} file ...'.format(dup_file))
            ftp.delete(dup_file)
    finally:
        # Always release the connection, even when listing or a delete fails.
        ftp.close()


# Run the click command only when the file is executed as a script.
if __name__ == '__main__':
    delete_dups()
# -*- coding: utf-8 -*-
from ftplib import FTP
import re
from datetime import datetime
# TODO: load these connection settings from the database instead of
# hard-coding them here.
ftp_server = 'server'
ftp_port = 21
ftp_user = 'user'
ftp_pwd = 'top_secret'
ftp_path = 'upload'

# Groups summarised by default; each entry must be a key of REGEXPS.
groups = ['report', 'date']

# One pattern per group. Capture group 1 is the value files are counted by.
REGEXPS = dict(
    # 'S' followed by two digits, delimited by underscores.
    report='_(S[0-9][0-9])_',
    # Eight digits after an underscore, followed by more digits up to the
    # end of the name.
    date='_([0-9]{8})[0-9]+$',
    # First run of characters other than '/' and '_' that is followed by
    # an underscore, skipping any leading '/' or '_'.
    cnc='^[_/]*([^/_]+)_',
)
def get_ftp_info(ftp_server, ftp_port, ftp_user, ftp_pwd, groups=None):
    """List the module-level ``ftp_path`` on an FTP server and count its files.

    Note that the directory listed comes from the module-level ``ftp_path``
    variable, not from a parameter.

    Args:
        ftp_server: Host name of the FTP server.
        ftp_port: Port to connect to.
        ftp_user: Login user.
        ftp_pwd: Login password.
        groups: Iterable of ``REGEXPS`` keys to summarise by. Defaults to
            ``['report', 'date']``.

    Returns:
        A dict with three keys:
          * ``'timestamp'``: ``datetime.now()`` taken after the listing.
          * ``'remaining'``: number of names returned by the listing.
          * ``'groups'``: for every requested group, a dict mapping each
            value captured by that group's pattern to the number of names
            that produced it, plus ``'notValid'`` for names that did not
            match.
    """
    if groups is None:
        groups = ['report', 'date']

    # FTP() without a host does not connect. Passing the host to the
    # constructor would already open a connection on the default port, which
    # the explicit connect() below would then replace without closing it.
    ftp = FTP()
    try:
        ftp.connect(ftp_server, ftp_port)
        ftp.login(ftp_user, ftp_pwd)
        ftp_res = ftp.nlst(ftp_path)
    finally:
        # Release the socket whether or not the listing succeeded.
        ftp.close()

    res = {
        'timestamp': datetime.now(),
        'remaining': len(ftp_res),
        'groups': {},
    }
    for group in groups:
        report_stats = {'notValid': 0}
        for filename in ftp_res:
            filetype_match = re.search(REGEXPS[group], filename)
            key = filetype_match.group(1) if filetype_match else 'notValid'
            report_stats[key] = report_stats.get(key, 0) + 1
        # Insert in sorted key order so that iteration over the result is
        # alphabetical on interpreters whose dicts keep insertion order.
        res['groups'][group] = dict(
            (report, report_stats[report]) for report in sorted(report_stats)
        )
    return res
# Fetch the statistics once, then print them: a header with the timestamp
# and the file total, one section per group, and the header again at the
# bottom.
res = get_ftp_info(ftp_server, ftp_port, ftp_user, ftp_pwd, groups)

print("{}".format(res['timestamp']))
print("{} files remaining\n".format(res['remaining']))

for group in res['groups']:
    print('# {}'.format(group))
    for key, value in res['groups'][group].items():
        print(' * {}: {}'.format(key, value))
    # Blank line after each group section.
    print("")

# Repeat the header so it is still visible below a long listing.
print("{}".format(res['timestamp']))
print("{} files remaining\n".format(res['remaining']))
# -*- coding: utf-8 -*-
from erppeek import Client
import click
import os
try:
from urllib.parse import urlparse
except:
from urlparse import urlparse
import sys
import json
import time
import logging
from socket import error as socket_error
# Configure the root logger at INFO level, then create the logger this
# script reports through.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('FTP MON')
# Due: https://www.python.org/dev/peps/pep-0476
# NOTE(review): this replaces the ssl module's default HTTPS context factory
# with the unverified one, i.e. HTTPS certificates are no longer checked for
# code that relies on that default. Confirm this is still intended.
try:
    import ssl
except ImportError:
    pass
else:
    if hasattr(ssl, '_create_unverified_context'):
        # Handle target environment that doesn't support HTTPS verification
        _create_unverified_https_context = ssl._create_unverified_context
        ssl._create_default_https_context = _create_unverified_https_context
    # Otherwise: legacy Python that doesn't verify HTTPS certificates by
    # default, nothing to override.
def get_connection(url):
    """Build an ERP Peek ``Client`` from a single connection URL.

    The URL is split into the positional arguments passed to ``Client``:
    ``scheme://netloc``, the URL path with every ``/`` removed, the user
    name and, only when the URL carries one, the password.

    Args:
        url: Connection URL, e.g. ``http://user:pass@server:port/DATABASE``.

    Returns:
        The ``Client`` instance created from those arguments.
    """
    parsed = urlparse(url)
    server = '{}://{}'.format(parsed.scheme, parsed.netloc)
    database = parsed.path.replace('/', '')
    client_args = [server, database, parsed.username]
    if parsed.password:
        client_args.append(parsed.password)
    return Client(*client_args)
def ftp_mon(c, cnc, debug, cls=False):
    """Fetch the aggregated FTP info through *c* and log a summary of it.

    Args:
        c: Connection object exposing ``TgFtp.get_aggregated_ftp_info`` and
            a ``_db`` attribute (presumably the ERP Peek client returned by
            ``get_connection``; confirm against the caller).
        cnc: Passed through to ``get_aggregated_ftp_info``.
        debug: When true, print the raw result as indented, key-sorted JSON
            inside a fenced block.
        cls: When true, print the ANSI clear-screen sequence before output.

    Returns:
        Always ``None``. Any exception raised while fetching or printing is
        logged as an error and the summary is skipped.
    """
    try:
        info = c.TgFtp.get_aggregated_ftp_info(cnc)
        if cls:
            print(chr(27) + "[2J")
        if debug:
            print('```json\n')
            print(json.dumps(info, indent=4, sort_keys=True))
            print('```\n')
    except Exception as err:
        logger.error("ERROR getting ftp_info: {}\n".format(err))
    else:
        logger.info('# FTP Servers ({}):'.format(c._db))
        for server_name in info['summary']['servers']:
            logger.info(' * {}'.format(server_name))
        logger.info('')
        logger.info('\nINFO:\n' + info['info'])
    return None
@click.command()
@click.option('-u', '--url',
              help='URL to connect to via ERP Peek ( http://user:pass@server:port/DATABASE )',
              type=str, default='http://localhost', show_default=True)
@click.option('-c', '--cnc', is_flag=True,
              help='Gets CNC stat',
              type=bool, default=False)
@click.option('-d', '--debug', is_flag=True,
              help='Shows debug info (ie, struct data in json)',
              type=bool, default=False)
@click.option('-i', '--interval', help='Time interval', type=int, default=0)
def connect_host(url, cnc, debug, interval):
    """Report the FTP status once, or repeatedly every *interval* seconds.

    Args:
        url: ERP Peek connection URL handed to ``get_connection``.
        cnc: Forwarded to ``ftp_mon``.
        debug: Forwarded to ``ftp_mon``.
        interval: Seconds to sleep between reports. ``0`` (the default)
            produces a single report and returns.
    """
    # The host name used to be sliced out of the URL here with
    # index('@') / rindex(':'). The result was unused, and the slicing raised
    # ValueError for any URL without credentials or an explicit port,
    # including the default one, so it has been removed.
    c = get_connection(url)

    if not interval:
        ftp_mon(c, cnc, debug)
        return

    while True:
        try:
            # In interval mode the screen is cleared before every report.
            ftp_mon(c, cnc, debug, cls=True)
        except socket_error:
            print("Connection error, trying to reconnect")
            try:
                c = get_connection(url)
            except socket_error as err:
                # Keep monitoring: the next iteration retries with the
                # previous client and reconnects again if it still fails.
                print("Reconnect failed: {}".format(err))
        time.sleep(interval)


# Run the click command only when the file is executed as a script.
if __name__ == '__main__':
    connect_host()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment