Last active
June 4, 2024 06:59
-
-
Save tinogis/105a053ab2295c243e7e5b43e8c3c68b to your computer and use it in GitHub Desktop.
FTP_MON: Monitorizes STG ftp server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from ftplib import FTP | |
import click | |
from datetime import datetime | |
try: | |
from urllib.parse import urlparse | |
except: | |
from urlparse import urlparse | |
import os | |
log_file = '/tmp/dup_cleaner.log' | |
def log(text): | |
print(text) | |
tm_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
with open(log_file, 'a') as lfile: | |
lfile.write('{} : [{}] {}\n'.format(tm_str, os.getpid(), text)) | |
def connect(url): | |
ftp_dict = urlparse(url) | |
ftp = FTP() | |
ftp_port = 21 if ftp_dict.port is None else ftp_dict.port | |
ftp.connect(ftp_dict.hostname, ftp_port) | |
ftp.login(ftp_dict.username, ftp_dict.password) | |
log('Connected to {}'.format(ftp_dict.hostname)) | |
ftp.cwd(ftp_dict.path) | |
log('* current dir: {}'.format(ftp_dict.path)) | |
return ftp | |
@click.command() | |
@click.option('-u', '--url', required=True, | |
help='URL to connect to FTP ( ftp://user:pass@server:port/path/to/dirs )', | |
type=str, default='ftp://localhost', show_default=True) | |
def delete_dups(url): | |
ftp = connect(url) | |
files = ftp.nlst() | |
log('* Found {} files'.format(len(files))) | |
dup_files = [f for f in files if f.endswith('_dup')] | |
log('* Cleaning {} _dup files'.format(len(dup_files))) | |
for dup_file in dup_files: | |
log(' - Deleting {} file ...'.format(dup_file)) | |
ftp.delete(dup_file) | |
if __name__ == '__main__': | |
delete_dups() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from ftplib import FTP | |
import re | |
from datetime import datetime | |
# TODO data from bd | |
ftp_server = 'server' | |
ftp_port = 21 | |
ftp_user = 'user' | |
ftp_pwd = 'top_secret' | |
ftp_path = 'upload' | |
groups = ['report' ,'date'] | |
REGEXPS = { | |
'report': '_(S[0-9][0-9])_', | |
'date': '_([0-9]{8})[0-9]+$', | |
'cnc': '^[_/]*([^/_]+)_' | |
} | |
def get_ftp_info(ftp_server, ftp_port, ftp_user, ftp_pwd, groups=['report', 'date']): | |
ftp = FTP(ftp_server) | |
ftp.connect(ftp_server, ftp_port) | |
ftp.login(ftp_user, ftp_pwd) | |
ftp_res = ftp.nlst(ftp_path) | |
ftp.close() | |
now = datetime.now() | |
res = {} | |
res['timestamp'] = now | |
res['remaining'] = len(ftp_res) | |
res['groups'] = {} | |
for group in groups: | |
report_stats = {'notValid': 0} | |
for filename in ftp_res: | |
regexp = REGEXPS[group] | |
filetype_match = re.search(regexp, filename) | |
if filetype_match: | |
filetype = filetype_match.group(1) | |
num = report_stats.get(filetype, 0) | |
report_stats[filetype] = num + 1 | |
else: | |
report_stats['notValid'] += 1 | |
for report in sorted(report_stats.keys()): | |
if group not in res['groups']: | |
res['groups'][group] = {} | |
res['groups'][group][report] = report_stats[report] | |
return res | |
res = get_ftp_info(ftp_server, ftp_port, ftp_user, ftp_pwd, groups) | |
print("{}".format(res['timestamp'])) | |
print("{} files remaining\n".format(res['remaining'])) | |
for group, values in res['groups'].items(): | |
print('# {}'.format(group)) | |
for key, value in values.items(): | |
print(' * {}: {}'.format(key, value)) | |
print("") | |
print("{}".format(res['timestamp'])) | |
print("{} files remaining\n".format(res['remaining'])) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from erppeek import Client | |
import click | |
import os | |
try: | |
from urllib.parse import urlparse | |
except: | |
from urlparse import urlparse | |
import sys | |
import json | |
import time | |
import logging | |
from socket import error as socket_error | |
logger = logging.getLogger('FTP MON') | |
logging.basicConfig(level=logging.INFO) | |
try: | |
# Due: https://www.python.org/dev/peps/pep-0476 | |
import ssl | |
try: | |
_create_unverified_https_context = ssl._create_unverified_context | |
except AttributeError: | |
# Legacy Python that doesn't verify HTTPS certificates by default | |
pass | |
else: | |
# Handle target environment that doesn't support HTTPS verification | |
ssl._create_default_https_context = _create_unverified_https_context | |
except ImportError: | |
pass | |
def get_connection(url): | |
url = urlparse(url) | |
conn = [ | |
'{}://{}'.format(url.scheme, url.netloc), | |
url.path.replace('/', ''), | |
url.username | |
] | |
if url.password: | |
conn.append(url.password) | |
c = Client(*conn) | |
return c | |
def ftp_mon(c, cnc, debug, cls=False): | |
try: | |
info = c.TgFtp.get_aggregated_ftp_info(cnc) | |
if cls: | |
print(chr(27) + "[2J") | |
if debug: | |
print('```json\n') | |
print(json.dumps(info, indent=4, sort_keys=True)) | |
print('```\n') | |
except Exception as e: | |
logger.error("ERROR getting ftp_info: {}\n".format(e)) | |
return None | |
logger.info('# FTP Servers ({}):'.format(c._db)) | |
for server in info['summary']['servers']: | |
logger.info(' * {}'.format(server)) | |
logger.info('') | |
logger.info('\nINFO:\n' + info['info']) | |
@click.command() | |
@click.option('-u', '--url', | |
help='URL to connect to via ERP Peek ( http://user:pass@server:port/DATABASE )', | |
type=str, default='http://localhost', show_default=True) | |
@click.option('-c', '--cnc', is_flag=True, | |
help='Gets CNC stat', | |
type=bool, default=False) | |
@click.option('-d', '--debug', is_flag=True, | |
help='Shows debug info (ie, struct data in json)', | |
type=bool, default=False) | |
@click.option('-i', '--interval', help='Time interval', type=int, default=0) | |
def connect_host(url, cnc, debug, interval): | |
server_url = urlparse(url).netloc | |
server_url = server_url[server_url.index('@')+1: server_url.rindex(':')] | |
#sys.stdout.write("Connecting to server {} ...\n".format(server_url)) | |
c = get_connection(url) | |
if interval: | |
while True: | |
try: | |
ftp_mon(c, cnc, debug, interval) | |
except socket_error: | |
print("Connection error, trying to reconnect") | |
c = get_connection(url) | |
time.sleep(interval) | |
else: | |
ftp_mon(c, cnc, debug) | |
if __name__ == '__main__': | |
connect_host() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment