Last active
October 5, 2023 07:44
-
-
Save itsecworks/864cc17acd141c8313fcf38e343840f3 to your computer and use it in GitHub Desktop.
Palo Alto Monitoring scripts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import requests | |
import xml.etree.ElementTree as ET | |
import logging | |
import sys | |
import json | |
import datetime | |
import re | |
import socket | |
def get_api(device, params, log=False):
    """Send one XML-API request to the firewall and return the response body.

    Args:
        device: Hostname or IP address used to build ``https://<device>/api/``.
        params: Mapping of API parameters (the callers pass ``type``, ``key``
            and ``cmd``).
        log: When true, progress and errors are also written via ``logging``.

    Returns:
        The response body as text (only when the HTTP status is 200).

    Raises:
        SystemExit: On any ``requests`` transport error or a non-200 status.
    """
    request_timeout = 20
    url = 'https://' + device + '/api/'
    if log:
        logging.info('Trying to access device: %s with cmd: %s', device, params.get('cmd'))
    try:
        # The parameters travel in the POST body instead of the query string:
        # with ``params=`` the API key became part of the URL and ended up in
        # the log file (response.url) and in requests' exception messages.
        # NOTE(review): verify=False disables TLS certificate validation;
        # pass the path of the firewall's CA bundle once it is available.
        response = requests.post(url, data=params, timeout=request_timeout, verify=False)
    except requests.exceptions.RequestException as e:
        if log:
            logging.error('We run in that problem: %s', e)
        raise SystemExit(e)
    if response.status_code != 200:
        if log:
            logging.error('with the url: %s', response.url)
            logging.error('Cannot access the website, status code: %s', response.status_code)
            logging.error('reply from the website:\n%s\n', response.text)
        raise SystemExit(str(response.status_code) + " - " + response.text)
    if log:
        logging.info('response from website:\n\n%s\n', response.text)
    return response.text
def _counter_value(text):
    """Return *text* as int, the stripped text if not numeric, or None if empty."""
    if text is None or not text.strip():
        return None
    try:
        return int(text)
    except ValueError:
        return text.strip()


def output_parser(device, output, xpath, timestamp):
    """Flatten interface-counter XML into one record per matched element.

    Args:
        device: Device name stored in every record under ``device``.
        output: XML document text returned by the API.
        xpath: Iterable of ElementTree paths; every element they match
            becomes one record.
        timestamp: Value stored in every record under ``timestamp``.

    Returns:
        A list of dicts. Children of a matched element become keys; a child
        that itself has children is flattened one level. ``name`` and
        ``interface`` are kept as text, everything else is converted to int
        where possible.
    """
    data = []
    root = ET.fromstring(output)
    for path in xpath:
        for entry in root.findall(path):
            record = {}
            for child in entry:
                if len(child):
                    # Nested block: lift its children into the flat record.
                    # Empty counters are skipped instead of crashing int(None).
                    for subchild in child:
                        value = _counter_value(subchild.text)
                        if value is not None:
                            record[subchild.tag] = value
                elif child.tag in ('name', 'interface'):
                    record[child.tag] = child.text
                else:
                    value = _counter_value(child.text)
                    if value is not None:
                        record[child.tag] = value
            record['timestamp'] = timestamp
            record['device'] = device
            data.append(record)
    return data
def validate_ip_address(address):
    """Return True when *address* is a dotted-quad IPv4 address.

    Four groups of one to three ASCII digits are required, each in the range
    0-255. A message is printed when the shape is right but an octet is out
    of range.
    """
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, so "10.0.0.1\n" used to be accepted.
    if re.fullmatch(r"(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})", address, re.ASCII) is None:
        return False
    # The pattern only admits digits, so only the upper bound needs checking.
    if any(int(part) > 255 for part in address.split(".")):
        print("IP address {} is not valid".format(address))
        return False
    return True
def host_to_dns(device):
    """Map *device* to a DNS name, or return "not resolvable".

    An IPv4 address is reverse-resolved and the resulting name returned; any
    other value is forward-resolved and returned unchanged on success.
    """
    is_ip = validate_ip_address(device)
    try:
        if is_ip:
            return socket.gethostbyaddr(device)[0]
        socket.gethostbyname(device)
    except socket.error:
        return "not resolvable"
    return device
def main(argv):
    """Collect interface counters from one firewall and append them to a JSON file.

    Args:
        argv: Command-line arguments without the program name.

    Raises:
        SystemExit: When the device cannot be resolved, on argument errors,
            or when ``get_api`` aborts.
    """
    parser = argparse.ArgumentParser(description='Collect counter for interfaces.',
                                     epilog="And that's how you use rest api...")
    parser.add_argument('-l', '--log',
                        action=argparse.BooleanOptionalAction,
                        default=False,
                        dest='log',
                        help='switch logging on or off')
    parser.add_argument('-V', '--version',
                        action='version',
                        version='%(prog)s 1.0')
    parser.add_argument('-d', '--device',
                        dest='device',
                        help='IP or hostname of the Palo Alto Firewall',
                        required=True)
    parser.add_argument('-p', '--pwd',
                        dest='pwd',
                        help='Password hash for the logon on Palo Alto Firewall.',
                        required=True)
    # Parse the arguments handed in rather than silently re-reading sys.argv.
    args = parser.parse_args(argv)
    datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    key = 'counter_interface'
    data_collector = {
        'counter-interface': {
            'cmd': '<show><counter><interface>all</interface></counter></show>',
            'xpath': ['./result/ifnet/ifnet/', './result/hw/'],
        },
    }

    # Resolve once. The former bare ``exit`` statement was a no-op, so an
    # unresolvable host continued and failed later with a NameError.
    device = host_to_dns(args.device)
    if device == "not resolvable":
        raise SystemExit('Device ' + args.device + ' is not resolvable')

    if args.log:
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_global.log'
        logging.basicConfig(filename=filename, filemode='w',
                            format='%(asctime)s - %(process)d - %(levelname)s - %(message)s',
                            level=logging.INFO)

    for key in data_collector:
        cmd = data_collector[key]['cmd']
        xpath = data_collector[key]['xpath']
        params = {
            'type': 'op',
            'key': args.pwd,
            'cmd': cmd,
        }
        xml_response: str = get_api(device, params, args.log)
        if key == 'counter-interface':
            data = output_parser(device, xml_response, xpath, datenow)
            with open('D:\\User data\\admin\\dev\\' + device + '_' + key + '.json', 'a') as fp:
                json.dump(data, fp)
                # One document per line keeps the append-only file parseable.
                fp.write('\n')


if __name__ == "__main__":
    main(sys.argv[1:])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import requests | |
import xml.etree.ElementTree as ET | |
import logging | |
import sys | |
import json | |
import datetime | |
import re | |
import socket | |
def get_api(device, params, log=False):
    """Send one XML-API request to the firewall and return the response body.

    Args:
        device: Hostname or IP address used to build ``https://<device>/api/``.
        params: Mapping of API parameters (the callers pass ``type``, ``key``
            and ``cmd``).
        log: When true, progress and errors are also written via ``logging``.

    Returns:
        The response body as text (only when the HTTP status is 200).

    Raises:
        SystemExit: On any ``requests`` transport error or a non-200 status.
    """
    request_timeout = 20
    url = 'https://' + device + '/api/'
    if log:
        logging.info('Trying to access device: %s with cmd: %s', device, params.get('cmd'))
    try:
        # The parameters travel in the POST body instead of the query string:
        # with ``params=`` the API key became part of the URL and ended up in
        # the log file (response.url) and in requests' exception messages.
        # NOTE(review): verify=False disables TLS certificate validation;
        # pass the path of the firewall's CA bundle once it is available.
        response = requests.post(url, data=params, timeout=request_timeout, verify=False)
    except requests.exceptions.RequestException as e:
        if log:
            logging.error('We run in that problem: %s', e)
        raise SystemExit(e)
    if response.status_code != 200:
        if log:
            logging.error('with the url: %s', response.url)
            logging.error('Cannot access the website, status code: %s', response.status_code)
            logging.error('reply from the website:\n%s\n', response.text)
        raise SystemExit(str(response.status_code) + " - " + response.text)
    if log:
        logging.info('response from website:\n\n%s\n', response.text)
    return response.text
def _csv_field(text):
    """Render one element text as a CSV field, quoting it when necessary."""
    if text is None:
        # Empty element: emit an empty field instead of failing on None.
        return ''
    text = str(text)
    if any(ch in text for ch in ' ,"\r\n'):
        return '"' + text.replace('"', '""') + '"'
    return text


def output_parser(device, output, xpath, timestamp):
    """Convert the matched XML entries into CSV lines.

    Args:
        device: Device name written as the second column of every line.
        output: XML document text returned by the API.
        xpath: ElementTree path selecting the entries to export.
        timestamp: Value written as the first column of every line.

    Returns:
        One string containing a newline-terminated CSV line per entry:
        timestamp, device, then the text of each child element in document
        order. Fields containing spaces, commas, quotes or line breaks are
        double-quoted, with embedded quotes doubled.
    """
    # fields: timestamp, device, category, name, value, rate, aspect, desc, id, severity
    root = ET.fromstring(output)
    lines = []
    for entry in root.findall(xpath):
        fields = [str(timestamp), device]
        fields.extend(_csv_field(child.text) for child in entry)
        lines.append(','.join(fields) + '\n')
    return ''.join(lines)
def validate_ip_address(address):
    """Return True when *address* is a dotted-quad IPv4 address.

    Four groups of one to three ASCII digits are required, each in the range
    0-255. A message is printed when the shape is right but an octet is out
    of range.
    """
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, so "10.0.0.1\n" used to be accepted.
    if re.fullmatch(r"(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})", address, re.ASCII) is None:
        return False
    # The pattern only admits digits, so only the upper bound needs checking.
    if any(int(part) > 255 for part in address.split(".")):
        print("IP address {} is not valid".format(address))
        return False
    return True
def host_to_dns(device):
    """Map *device* to a DNS name, or return "not resolvable".

    An IPv4 address is reverse-resolved and the resulting name returned; any
    other value is forward-resolved and returned unchanged on success.
    """
    is_ip = validate_ip_address(device)
    try:
        if is_ip:
            return socket.gethostbyaddr(device)[0]
        socket.gethostbyname(device)
    except socket.error:
        return "not resolvable"
    return device
def main(argv):
    """Collect the global counters of one firewall and append them to a CSV file.

    Args:
        argv: Command-line arguments without the program name.

    Raises:
        SystemExit: When the device cannot be resolved, on argument errors,
            or when ``get_api`` aborts.
    """
    parser = argparse.ArgumentParser(description='Collect counter for interfaces.',
                                     epilog="And that's how you use rest api...")
    parser.add_argument('-l', '--log',
                        action=argparse.BooleanOptionalAction,
                        default=False,
                        dest='log',
                        help='switch logging on or off')
    parser.add_argument('-V', '--version',
                        action='version',
                        version='%(prog)s 1.0')
    parser.add_argument('-d', '--device',
                        dest='device',
                        help='IP or hostname of the Palo Alto Firewall',
                        required=True)
    parser.add_argument('-p', '--pwd',
                        dest='pwd',
                        help='Password hash for the logon on Palo Alto Firewall.',
                        required=True)
    # Parse the arguments handed in rather than silently re-reading sys.argv.
    args = parser.parse_args(argv)
    datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    key = 'globalcounters'
    data_collector = {
        'globalcounters': {
            'cmd': '<show><counter><global><filter><delta>yes</delta></filter></global></counter></show>',
            'xpath': './result/global/counters/entry',
        },
    }

    # Resolve once. The former bare ``exit`` statement was a no-op, so an
    # unresolvable host continued and failed later with a NameError.
    device = host_to_dns(args.device)
    if device == "not resolvable":
        raise SystemExit('Device ' + args.device + ' is not resolvable')

    if args.log:
        logtimestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + logtimestamp + '_global.log'
        logging.basicConfig(filename=filename, filemode='w',
                            format='%(asctime)s - %(process)d - %(levelname)s - %(message)s',
                            level=logging.INFO)

    for key in data_collector:
        cmd = data_collector[key]['cmd']
        xpath = data_collector[key]['xpath']
        params = {
            'type': 'op',
            'key': args.pwd,
            'cmd': cmd,
        }
        xml_response: str = get_api(device, params, args.log)
        if key == 'globalcounters':
            data = output_parser(device, xml_response, xpath, datenow)
            with open('D:\\User data\\admin\\dev\\' + device + '_' + key + '.csv', 'a') as fp:
                # ``data`` is a single string, so write it in one call.
                fp.write(data)


if __name__ == "__main__":
    main(sys.argv[1:])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
from netmiko import ConnectHandler | |
import sys | |
import re | |
import json | |
import datetime | |
import logging | |
import socket | |
def get_cmd(device, username, pwd, command, key, timestamp, log=False):
    """Run one CLI command on the firewall over SSH and return its output.

    Args:
        device: Hostname or IP address to connect to.
        username: SSH login name.
        pwd: SSH password.
        command: CLI command to execute.
        key: Collector name, used in the session-log file name.
        timestamp: Timestamp string, used in the session-log file name.
        log: Accepted for interface compatibility; currently unused, the
            netmiko session log is always written.

    Returns:
        The command output as returned by netmiko's ``send_command``.
    """
    filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_netmiko.log'
    # netmiko input dictionary
    paloalto = {
        'device_type': 'paloalto_panos',
        'ip': device,
        'username': username,
        'password': pwd,
        'session_log': filename,
    }
    ssh = ConnectHandler(**paloalto)
    try:
        # Disable paging so long outputs are not cut at the first screen.
        ssh.send_command('set cli pager off\n', expect_string=r">")
        return ssh.send_command(command, expect_string=r">")
    finally:
        # Always release the SSH session; it used to stay open.
        ssh.disconnect()
def output_parser(output, device, timestamp):
    """Parse the raw-log forwarding statistics into flat records.

    The text is scanned line by line: a line containing ``LFP`` starts a new
    LFP section, a line containing ``Logs`` starts a new log stage, and every
    other ``name: <integer>`` line is a counter of the current section/stage.

    Args:
        output: CLI output text.
        device: Device name stored in every record under ``device``.
        timestamp: Value stored in every record under ``timestamp``.

    Returns:
        A list with one dict per (LFP, log stage) pair holding ``lfpid``,
        ``logstage``, the counters, ``timestamp`` and ``device``. ``'NA'`` is
        used when no LFP or stage header preceded a counter.
    """
    stats = {}
    # Defaults replace the former ``locals()`` probe and also cover counters
    # that appear before any stage header (previously a NameError).
    lfpid = 'NA'
    logstage = 'NA'
    for line in output.splitlines():
        if 'LFP' in line:
            lfpid = line.strip().replace(':', '')
        elif 'rate' in line:
            # Rate lines were parsed but never exported; they are skipped so
            # they are not mistaken for counters.
            continue
        elif 'Logs' in line:
            logstage = line.strip().replace(' ', '_').lower()
        elif ':' in line:
            parts = line.split(':')
            try:
                value = int(parts[1].strip())
            except ValueError:
                # Not a counter line (text after the colon is not an integer).
                continue
            counter = parts[0].strip().replace(' ', '_').lower()
            stats.setdefault(lfpid, {}).setdefault(logstage, {})[counter] = value

    data = []
    for lfp, stages in stats.items():
        for stage, counters in stages.items():
            record = {'lfpid': lfp, 'logstage': stage}
            record.update(counters)
            record['timestamp'] = timestamp
            record['device'] = device
            data.append(record)
    return data
def validate_ip_address(address):
    """Return True when *address* is a dotted-quad IPv4 address.

    Four groups of one to three ASCII digits are required, each in the range
    0-255. A message is printed when the shape is right but an octet is out
    of range.
    """
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, so "10.0.0.1\n" used to be accepted.
    if re.fullmatch(r"(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})", address, re.ASCII) is None:
        return False
    # The pattern only admits digits, so only the upper bound needs checking.
    if any(int(part) > 255 for part in address.split(".")):
        print("IP address {} is not valid".format(address))
        return False
    return True
def host_to_dns(device):
    """Map *device* to a DNS name, or return "not resolvable".

    An IPv4 address is reverse-resolved and the resulting name returned; any
    other value is forward-resolved and returned unchanged on success.
    """
    is_ip = validate_ip_address(device)
    try:
        if is_ip:
            return socket.gethostbyaddr(device)[0]
        socket.gethostbyname(device)
    except socket.error:
        return "not resolvable"
    return device
def main(argv):
    """Collect raw-log forwarding statistics over SSH and append them to a JSON file.

    Args:
        argv: Command-line arguments without the program name.

    Raises:
        SystemExit: When the device cannot be resolved or on argument errors.
    """
    parser = argparse.ArgumentParser(description='Collect counter for interfaces.',
                                     epilog="And that's how you use rest api...")
    parser.add_argument('-l', '--log',
                        action=argparse.BooleanOptionalAction,
                        default=False,
                        dest='log',
                        help='switch logging on or off')
    parser.add_argument('-V', '--version',
                        action='version',
                        version='%(prog)s 1.0')
    parser.add_argument('-d', '--device',
                        dest='device',
                        help='IP or hostname of the Palo Alto Firewall',
                        required=True)
    parser.add_argument('-u', '--username',
                        dest='username',
                        help='username for the logon on Palo Alto Firewall.',
                        required=True)
    parser.add_argument('-p', '--pwd',
                        dest='pwd',
                        help='Password hash for the logon on Palo Alto Firewall.',
                        required=True)
    # Parse the arguments handed in rather than silently re-reading sys.argv.
    args = parser.parse_args(argv)
    datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    key = 'rawlog_fwd_stats'

    # Resolve once. The former bare ``exit`` statement was a no-op, so an
    # unresolvable host continued and failed later with a NameError.
    device = host_to_dns(args.device)
    if device == "not resolvable":
        raise SystemExit('Device ' + args.device + ' is not resolvable')

    logtimestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    if args.log:
        filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + logtimestamp + '_global.log'
        logging.basicConfig(filename=filename, filemode='w',
                            format='%(asctime)s - %(process)d - %(levelname)s - %(message)s',
                            level=logging.INFO)

    data_collector = {
        'rawlog_fwd_stats': {
            'cmd': 'debug log-receiver rawlog_fwd stats global show\n',
        },
    }
    for key in data_collector:
        command = data_collector[key]['cmd']
        output = get_cmd(device, args.username, args.pwd, command, key, logtimestamp, args.log)
        if key == 'rawlog_fwd_stats':
            data = output_parser(output, device, datenow)
            with open('D:\\User data\\admin\\dev\\' + device + '_' + key + '.json', 'a') as fp:
                json.dump(data, fp)
                # One document per line keeps the append-only file parseable.
                fp.write('\n')


if __name__ == "__main__":
    main(sys.argv[1:])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import requests | |
import xml.etree.ElementTree as ET | |
import logging | |
import sys | |
import json | |
import datetime | |
import re | |
import socket | |
def get_api(device, params, log=False):
    """Send one XML-API request to the firewall and return the response body.

    Args:
        device: Hostname or IP address used to build ``https://<device>/api/``.
        params: Mapping of API parameters (the callers pass ``type``, ``key``
            and ``cmd``).
        log: When true, progress and errors are also written via ``logging``.

    Returns:
        The response body as text (only when the HTTP status is 200).

    Raises:
        SystemExit: On any ``requests`` transport error or a non-200 status.
    """
    request_timeout = 20
    url = 'https://' + device + '/api/'
    if log:
        logging.info('Trying to access device: %s with cmd: %s', device, params.get('cmd'))
    try:
        # The parameters travel in the POST body instead of the query string:
        # with ``params=`` the API key became part of the URL and ended up in
        # the log file (response.url) and in requests' exception messages.
        # NOTE(review): verify=False disables TLS certificate validation;
        # pass the path of the firewall's CA bundle once it is available.
        response = requests.post(url, data=params, timeout=request_timeout, verify=False)
    except requests.exceptions.RequestException as e:
        if log:
            logging.error('We run in that problem: %s', e)
        raise SystemExit(e)
    if response.status_code != 200:
        if log:
            logging.error('with the url: %s', response.url)
            logging.error('Cannot access the website, status code: %s', response.status_code)
            logging.error('reply from the website:\n%s\n', response.text)
        raise SystemExit(str(response.status_code) + " - " + response.text)
    if log:
        logging.info('response from website:\n\n%s\n', response.text)
    return response.text
def shortname(name):
    """Abbreviate *name*.

    Parentheses are removed first. A name containing spaces is reduced to the
    initials of its space-separated words; otherwise a name containing dashes
    is reduced to the initials of its dash-separated parts; any other name is
    cut to its first three characters.
    """
    name = name.replace('(', '').replace(')', '')
    if ' ' in name:
        parts = name.split(' ')
    elif '-' in name:
        parts = name.split('-')
    else:
        return name[:3]
    # Empty parts (double separators, leading/trailing separator) are skipped;
    # indexing them used to raise IndexError.
    return ''.join(part[0] for part in parts if part)
def output_parser(device, output, xpath, timestamp):
    """Extract per-connection log-forwarding counters from logging-status XML.

    Args:
        device: Device name stored in every record under ``device``.
        output: XML document text returned by the API.
        xpath: Iterable of ElementTree paths; only matched elements whose tag
            is ``entry`` are evaluated.
        timestamp: Value stored in every record under ``timestamp``.

    Returns:
        A list with one dict per entry whose ``conn-status`` text does not
        contain ``Inactive``. Each dict holds ``lcname`` (the entry's ``name``
        attribute), the forwarded-log total per log type, ``rate`` (``None``
        when the entry has no parseable ``Rate`` element), ``timestamp`` and
        ``device``.
    """
    log_types = ['traffic', 'threat', 'hipmatch', 'gtp', 'auth', 'iptag', 'userid',
                 'sctp', 'config', 'system', 'globalprotect']
    data = []
    root = ET.fromstring(output)
    for path in xpath:
        for entry in root.findall(path):
            if entry.tag != 'entry':
                continue
            record = {'lcname': entry.attrib['name']}
            active = False
            # Reset per entry: the rate used to leak from the previous entry
            # (or raise NameError) when an entry had no usable Rate element.
            rate = None
            for child in entry:
                if child.text is None:
                    # Empty element: nothing to evaluate.
                    continue
                if child.tag == 'conn-status' and 'Inactive' not in child.text:
                    active = True
                elif child.tag == 'Rate':
                    # Example: "Rate : 0 logs/sec\n"
                    mrate = re.match(r"Rate\s+:\s+(?P<rate>\S+) logs.*", child.text)
                    if mrate is not None:
                        rate = int(mrate.group('rate'))
                elif child.tag in log_types:
                    text = child.text.strip()
                    m = re.match(r"\S+\s+\d{4}\/\d{2}/\d{2} \d{2}:\d{2}:\d{2}\s+\d{4}\/\d{2}/\d{2} \d{2}:\d{2}:\d{2}\s+(?P<Last_Seq_Num_Fwded>\S+)\s+(?P<Last_Seq_Num_Acked>\S+)\s+(?P<total_logs_fwded>\S+)", text)
                    if m is not None:
                        record[child.tag] = int(m.group('total_logs_fwded'))
            if active:
                record['rate'] = rate
                record['timestamp'] = timestamp
                record['device'] = device
                data.append(record)
    return data
def validate_ip_address(address):
    """Return True when *address* is a dotted-quad IPv4 address.

    Four groups of one to three ASCII digits are required, each in the range
    0-255. A message is printed when the shape is right but an octet is out
    of range.
    """
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, so "10.0.0.1\n" used to be accepted.
    if re.fullmatch(r"(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})", address, re.ASCII) is None:
        return False
    # The pattern only admits digits, so only the upper bound needs checking.
    if any(int(part) > 255 for part in address.split(".")):
        print("IP address {} is not valid".format(address))
        return False
    return True
def host_to_dns(device):
    """Map *device* to a DNS name, or return "not resolvable".

    An IPv4 address is reverse-resolved and the resulting name returned; any
    other value is forward-resolved and returned unchanged on success.
    """
    is_ip = validate_ip_address(device)
    try:
        if is_ip:
            return socket.gethostbyaddr(device)[0]
        socket.gethostbyname(device)
    except socket.error:
        return "not resolvable"
    return device
def main(argv):
    """Collect the logging status of one firewall and append it to a JSON file.

    Args:
        argv: Command-line arguments without the program name.

    Raises:
        SystemExit: When the device cannot be resolved, on argument errors,
            or when ``get_api`` aborts.
    """
    parser = argparse.ArgumentParser(description='Collect session count per vsys.',
                                     epilog="And that's how you use rest api...")
    parser.add_argument('-l', '--log',
                        action=argparse.BooleanOptionalAction,
                        default=False,
                        dest='log',
                        help='switch logging on or off')
    parser.add_argument('-V', '--version',
                        action='version',
                        version='%(prog)s 1.0')
    parser.add_argument('-d', '--device',
                        dest='device',
                        help='IP or hostname of the Palo Alto Firewall',
                        required=True)
    parser.add_argument('-p', '--pwd',
                        dest='pwd',
                        help='Password hash for the logon on Palo Alto Firewall.',
                        required=True)
    # Parse the arguments handed in rather than silently re-reading sys.argv.
    args = parser.parse_args(argv)
    datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    key = 'logging-status'
    data_collector = {
        'logging-status': {
            'cmd': '<show><logging-status><verbose>yes</verbose></logging-status></show>',
            'xpath': ['./result/show-logging-status/Conn-Info/'],
        },
    }

    # Resolve once. The former bare ``exit`` statement was a no-op, so an
    # unresolvable host continued and failed later with a NameError.
    device = host_to_dns(args.device)
    if device == "not resolvable":
        raise SystemExit('Device ' + args.device + ' is not resolvable')

    if args.log:
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_global.log'
        logging.basicConfig(filename=filename, filemode='w',
                            format='%(asctime)s - %(process)d - %(levelname)s - %(message)s',
                            level=logging.INFO)

    for key in data_collector:
        cmd = data_collector[key]['cmd']
        xpath = data_collector[key]['xpath']
        params = {
            'type': 'op',
            'key': args.pwd,
            'cmd': cmd,
        }
        xml_response: str = get_api(device, params, args.log)
        if key == 'logging-status':
            data = output_parser(device, xml_response, xpath, datenow)
            with open('D:\\User data\\admin\\dev\\' + device + '_' + key + '.json', 'a') as fp:
                json.dump(data, fp)
                # One document per line keeps the append-only file parseable.
                fp.write('\n')


if __name__ == "__main__":
    main(sys.argv[1:])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
from netmiko import ConnectHandler | |
import sys | |
import re | |
import json | |
import datetime | |
import logging | |
import socket | |
def get_cmd(device, username, pwd, command, key, timestamp, log=False):
    """Run one CLI command on the firewall over SSH and return its output.

    Args:
        device: Hostname or IP address to connect to.
        username: SSH login name.
        pwd: SSH password.
        command: CLI command to execute.
        key: Collector name, used in the session-log file name.
        timestamp: Timestamp string, used in the session-log file name.
        log: Accepted for interface compatibility; currently unused, the
            netmiko session log is always written.

    Returns:
        The command output as returned by netmiko's ``send_command``.
    """
    filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_netmiko.log'
    # netmiko input dictionary
    paloalto = {
        'device_type': 'paloalto_panos',
        'ip': device,
        'username': username,
        'password': pwd,
        'session_log': filename,
    }
    ssh = ConnectHandler(**paloalto)
    try:
        # Disable paging so long outputs are not cut at the first screen.
        ssh.send_command('set cli pager off\n', expect_string=r">")
        return ssh.send_command(command, expect_string=r">")
    finally:
        # Always release the SSH session; it used to stay open.
        ssh.disconnect()
def output_parser(output, device, timestamp):
    """Parse dataplane pool statistics into one record per pool line.

    ``DP <id>:`` lines select the current dataplane, ``Hardware Pools`` and
    ``Software Pools`` lines select the current pool type, and every line
    matching the hardware or the software pool pattern yields a record.

    Args:
        output: CLI output text.
        device: Device name stored in every record under ``device``.
        timestamp: Value stored in every record under ``timestamp``.

    Returns:
        A list of dicts with ``name``, ``pa`` (available), ``ps`` (size),
        ``pu`` (usage in percent, rounded), ``timestamp``, ``dpid``, ``type``
        and ``device``. ``dpid`` and ``type`` are ``'NA'`` when no matching
        header line preceded the pool line.
    """
    dp_line = re.compile(r"DP\s+(?P<dpid>\S+):")
    pool_lines = (
        # hw pool lines
        re.compile(r"\[\d+\]\s+(?P<poolname>.*)\s+:\s+(?P<poolavail>\S+)/(?P<poolsize>\S+)\s+.*"),
        # sw pool lines
        re.compile(r"\[.*\]\s+(?P<poolname>.*)\(.*\):\s+(?P<poolavail>\S+)\/(?P<poolsize>\S+)\s+.*"),
    )
    data = []
    # Defaults for output without a "DP" or pool-type header; previously only
    # the software branch had a dpid fallback and hardware lines could raise
    # NameError.
    dpid = 'NA'
    mtype = 'NA'
    for line in output.splitlines():
        m = dp_line.match(line)
        if m is not None:
            dpid = m.group('dpid')
            continue
        if 'Hardware Pools' in line:
            mtype = 'hw_pool'
            continue
        if 'Software Pools' in line:
            mtype = 'sw_pool'
            continue
        m = pool_lines[0].match(line) or pool_lines[1].match(line)
        if m is None:
            continue
        try:
            avail = int(m.group('poolavail'))
            size = int(m.group('poolsize'))
        except ValueError:
            # Looks like a pool line but the figures are not integers.
            continue
        # pool usage in percent; a pool of size 0 is reported as 0 % used
        # instead of dividing by zero
        usage = round(100 - (avail * 100 / size), 0) if size else 0.0
        data.append({
            'name': m.group('poolname').strip().replace(' ', '_'),
            'pa': avail,
            'ps': size,
            'pu': usage,
            'timestamp': timestamp,
            'dpid': dpid,
            'type': mtype,
            'device': device,
        })
    return data
def validate_ip_address(address):
    """Return True when *address* is a dotted-quad IPv4 address.

    Four groups of one to three ASCII digits are required, each in the range
    0-255. A message is printed when the shape is right but an octet is out
    of range.
    """
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, so "10.0.0.1\n" used to be accepted.
    if re.fullmatch(r"(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})", address, re.ASCII) is None:
        return False
    # The pattern only admits digits, so only the upper bound needs checking.
    if any(int(part) > 255 for part in address.split(".")):
        print("IP address {} is not valid".format(address))
        return False
    return True
def host_to_dns(device):
    """Map *device* to a DNS name, or return "not resolvable".

    An IPv4 address is reverse-resolved and the resulting name returned; any
    other value is forward-resolved and returned unchanged on success.
    """
    is_ip = validate_ip_address(device)
    try:
        if is_ip:
            return socket.gethostbyaddr(device)[0]
        socket.gethostbyname(device)
    except socket.error:
        return "not resolvable"
    return device
def main(argv):
    """Collect dataplane pool statistics over SSH and append them to a JSON file.

    Args:
        argv: Command-line arguments without the program name.

    Raises:
        SystemExit: When the device cannot be resolved or on argument errors.
    """
    parser = argparse.ArgumentParser(description='Collect counter for interfaces.',
                                     epilog="And that's how you use rest api...")
    parser.add_argument('-l', '--log',
                        action=argparse.BooleanOptionalAction,
                        default=False,
                        dest='log',
                        help='switch logging on or off')
    parser.add_argument('-V', '--version',
                        action='version',
                        version='%(prog)s 1.0')
    parser.add_argument('-d', '--device',
                        dest='device',
                        help='IP or hostname of the Palo Alto Firewall',
                        required=True)
    parser.add_argument('-u', '--username',
                        dest='username',
                        help='username for the logon on Palo Alto Firewall.',
                        required=True)
    parser.add_argument('-p', '--pwd',
                        dest='pwd',
                        help='Password hash for the logon on Palo Alto Firewall.',
                        required=True)
    # Parse the arguments handed in rather than silently re-reading sys.argv.
    args = parser.parse_args(argv)
    datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    key = 'dataplane_pool_statistics'

    # Resolve once. The former bare ``exit`` statement was a no-op, so an
    # unresolvable host continued and failed later with a NameError.
    device = host_to_dns(args.device)
    if device == "not resolvable":
        raise SystemExit('Device ' + args.device + ' is not resolvable')

    logtimestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    if args.log:
        filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + logtimestamp + '_global.log'
        logging.basicConfig(filename=filename, filemode='w',
                            format='%(asctime)s - %(process)d - %(levelname)s - %(message)s',
                            level=logging.INFO)

    data_collector = {
        'dataplane_pool_statistics': {
            'cmd': 'debug dataplane pool statistics\n',
        },
    }
    for key in data_collector:
        command = data_collector[key]['cmd']
        output = get_cmd(device, args.username, args.pwd, command, key, logtimestamp, args.log)
        if key == 'dataplane_pool_statistics':
            data = output_parser(output, device, datenow)
            with open('D:\\User data\\admin\\dev\\' + device + '_' + key + '.json', 'a') as fp:
                json.dump(data, fp)
                # One document per line keeps the append-only file parseable.
                fp.write('\n')


if __name__ == "__main__":
    main(sys.argv[1:])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
from netmiko import ConnectHandler | |
import sys | |
import re | |
import json | |
import datetime | |
import logging | |
import socket | |
def get_cmd(device, username, pwd, command, key, timestamp, log=False):
    """Run one CLI command on the firewall over SSH and return its output.

    Args:
        device: Hostname or IP address to connect to.
        username: SSH login name.
        pwd: SSH password.
        command: CLI command to execute.
        key: Collector name, used in the session-log file name.
        timestamp: Timestamp string, used in the session-log file name.
        log: Accepted for interface compatibility; currently unused, the
            netmiko session log is always written.

    Returns:
        The command output as returned by netmiko's ``send_command``.
    """
    filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_netmiko.log'
    # netmiko input dictionary
    paloalto = {
        'device_type': 'paloalto_panos',
        'ip': device,
        'username': username,
        'password': pwd,
        'session_log': filename,
    }
    ssh = ConnectHandler(**paloalto)
    try:
        # Disable paging so long outputs are not cut at the first screen.
        ssh.send_command('set cli pager off\n', expect_string=r">")
        return ssh.send_command(command, expect_string=r">")
    finally:
        # Always release the SSH session; it used to stay open.
        ssh.disconnect()
def output_parser(output, device, timestamp):
    """Parse the dataplane performance table into one record per active row.

    ``DP <id>:`` lines select the current dataplane, a header line whose
    second column is ``max-us`` selects the current table type (its first
    column), and every nine-column row whose ``max-us`` value is neither the
    header text nor ``0`` yields a record.

    Args:
        output: CLI output text.
        device: Device name stored in every record under ``device``.
        timestamp: Value stored in every record under ``timestamp``.

    Returns:
        A list of dicts with ``name``, ``maxus``, ``avgus``, ``count``,
        ``totalus``, ``acmaxus``, ``acavgus``, ``account``, ``timestamp``,
        ``device``, ``dpid`` and ``type``. ``dpid`` and ``type`` are ``'NA'``
        when no matching header line preceded the row.
    """
    dp_line = re.compile(r"DP\s+(?P<dpid>\S+):")
    header_line = re.compile(r"(?P<type>\S+)\s+max-us\s+.*")
    row_line = re.compile(r"(?P<name>\S+)\s+(?P<maxus>\S+)\s+(?P<avgus>\S+)\s+(?P<count>\S+)\s+(?P<totalus>\S+)\s+(?P<acmaxus>\S+)\s+(?P<acavgus>\S+)\s+(?P<account>\S+)\s+(?P<actotalus>\S+)")
    data = []
    # The caller's timestamp is used as given; it used to be overwritten with
    # the current time, making the parameter meaningless.
    dpid = 'NA'
    mtype = 'NA'
    for line in output.splitlines():
        m = dp_line.match(line)
        if m is not None:
            dpid = m.group('dpid')
            continue
        m = header_line.match(line)
        if m is not None:
            mtype = m.group('type')
            continue
        m = row_line.match(line)
        if m is None or m.group('maxus') in ('max-us', '0'):
            continue
        try:
            # NOTE(review): the ninth column (actotalus) is matched but not
            # exported, unlike totalus - confirm whether that is intended.
            record = {
                'name': m.group('name'),
                'maxus': int(m.group('maxus')),
                'avgus': float(m.group('avgus')),
                'count': int(m.group('count')),
                'totalus': int(m.group('totalus')),
                'acmaxus': int(m.group('acmaxus')),
                'acavgus': float(m.group('acavgus')),
                'account': int(m.group('account')),
            }
        except ValueError:
            # Any text line with nine or more words matches the row pattern;
            # skip it instead of aborting the whole parse.
            continue
        record['timestamp'] = timestamp
        record['device'] = device
        record['dpid'] = dpid
        record['type'] = mtype
        data.append(record)
    return data
def validate_ip_address(address):
    """Return True when *address* is a dotted-quad IPv4 address.

    Four groups of one to three ASCII digits are required, each in the range
    0-255. A message is printed when the shape is right but an octet is out
    of range.
    """
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, so "10.0.0.1\n" used to be accepted.
    if re.fullmatch(r"(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})", address, re.ASCII) is None:
        return False
    # The pattern only admits digits, so only the upper bound needs checking.
    if any(int(part) > 255 for part in address.split(".")):
        print("IP address {} is not valid".format(address))
        return False
    return True
def host_to_dns(device):
    """Map *device* to a DNS name, or return "not resolvable".

    An IPv4 address is reverse-resolved and the resulting name returned; any
    other value is forward-resolved and returned unchanged on success.
    """
    is_ip = validate_ip_address(device)
    try:
        if is_ip:
            return socket.gethostbyaddr(device)[0]
        socket.gethostbyname(device)
    except socket.error:
        return "not resolvable"
    return device
def main(argv):
    """Collect dataplane performance figures over SSH and append them to a JSON file.

    Args:
        argv: Command-line arguments without the program name.

    Raises:
        SystemExit: When the device cannot be resolved or on argument errors.
    """
    parser = argparse.ArgumentParser(description='Collect counter for interfaces.',
                                     epilog="And that's how you use rest api...")
    parser.add_argument('-l', '--log',
                        action=argparse.BooleanOptionalAction,
                        default=False,
                        dest='log',
                        help='switch logging on or off')
    parser.add_argument('-V', '--version',
                        action='version',
                        version='%(prog)s 1.0')
    parser.add_argument('-d', '--device',
                        dest='device',
                        help='IP or hostname of the Palo Alto Firewall',
                        required=True)
    parser.add_argument('-u', '--username',
                        dest='username',
                        help='username for the logon on Palo Alto Firewall.',
                        required=True)
    parser.add_argument('-p', '--pwd',
                        dest='pwd',
                        help='Password hash for the logon on Palo Alto Firewall.',
                        required=True)
    # Parse the arguments handed in rather than silently re-reading sys.argv.
    args = parser.parse_args(argv)
    datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    key = 'dataplane_pow_performance'

    # Resolve once. The former bare ``exit`` statement was a no-op, so an
    # unresolvable host continued and failed later with a NameError.
    device = host_to_dns(args.device)
    if device == "not resolvable":
        raise SystemExit('Device ' + args.device + ' is not resolvable')

    logtimestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    if args.log:
        filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + logtimestamp + '_global.log'
        logging.basicConfig(filename=filename, filemode='w',
                            format='%(asctime)s - %(process)d - %(levelname)s - %(message)s',
                            level=logging.INFO)

    data_collector = {
        'dataplane_pow_performance': {
            'cmd': 'debug dataplane pow performance all\n',
        },
    }
    for key in data_collector:
        command = data_collector[key]['cmd']
        output = get_cmd(device, args.username, args.pwd, command, key, logtimestamp, args.log)
        if key == 'dataplane_pow_performance':
            data = output_parser(output, device, datenow)
            with open('D:\\User data\\admin\\dev\\' + device + '_' + key + '.json', 'a') as fp:
                json.dump(data, fp)
                # One document per line keeps the append-only file parseable.
                fp.write('\n')


if __name__ == "__main__":
    main(sys.argv[1:])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import requests | |
import xml.etree.ElementTree as ET | |
import logging | |
import sys | |
import json | |
import datetime | |
import re | |
import socket | |
def get_api(device, params, log=False):
    """Send one XML-API request to the firewall and return the response body.

    Args:
        device: Hostname or IP address used to build ``https://<device>/api/``.
        params: Mapping of API parameters; ``cmd`` is read for logging.
        log: When true, progress and errors are also written via ``logging``.

    Returns:
        The response body as text (only when the HTTP status is 200).

    Raises:
        SystemExit: On any ``requests`` transport error or a non-200 status.
    """
    request_timeout = 20
    url = 'https://' + device + '/api/'
    if log:
        logging.info('Trying to access device: %s with cmd: %s', device, params.get('cmd'))
    try:
        # The parameters travel in the POST body instead of the query string:
        # with ``params=`` they became part of the URL, so any credential
        # among them ended up in the log file (response.url) and in
        # requests' exception messages.
        # NOTE(review): verify=False disables TLS certificate validation;
        # pass the path of the firewall's CA bundle once it is available.
        response = requests.post(url, data=params, timeout=request_timeout, verify=False)
    except requests.exceptions.RequestException as e:
        if log:
            logging.error('We run in that problem: %s', e)
        raise SystemExit(e)
    if response.status_code != 200:
        if log:
            logging.error('with the url: %s', response.url)
            logging.error('Cannot access the website, status code: %s', response.status_code)
            logging.error('reply from the website:\n%s\n', response.text)
        raise SystemExit(str(response.status_code) + " - " + response.text)
    if log:
        logging.info('response from website:\n\n%s\n', response.text)
    return response.text
def shortname(name):
    """Abbreviate *name* to the first letter of each of its words.

    Parentheses are removed first. The name is split on spaces when it
    contains one, otherwise on hyphens. Empty segments (double separators,
    leading/trailing separators, an empty name) are ignored instead of
    raising ``IndexError``.

    Args:
        name: The text to abbreviate.

    Returns:
        The concatenated initials; an empty string when there are none.
    """
    cleaned = name.replace('(', '').replace(')', '')
    separator = ' ' if ' ' in cleaned else '-'
    return ''.join(part[0] for part in cleaned.split(separator) if part)
def output_parser(device, output, xpath, pertime):
    """Flatten a resource-monitor XML reply into a list of sample records.

    Every element matched by *xpath* is treated as one data processor; its
    tag becomes ``dpid``. The children of that element's first child are the
    metric groups. Groups whose tag contains ``cpu`` yield one series per
    sub-element, labelled ``'core' + <coreid>``; groups whose tag contains
    ``resource`` yield one series labelled with the abbreviated ``<name>``.
    Each ``<value>`` is a comma separated list: the first number is stamped
    with the current time and every following number one period earlier.

    Args:
        device: Device name copied into every record.
        output: XML document as text.
        xpath: ElementTree path selecting the data-processor elements.
        pertime: Period name; must contain one of ``second``, ``minute``,
            ``hour``, ``day`` or ``week``.

    Returns:
        A list of dicts with the keys ``dpid``, ``type``, ``name``,
        ``value``, ``device`` and ``timestamp``.

    Raises:
        ValueError: If *pertime* names no known period (the original failed
            later with an unbound local variable).
    """
    # Checked in this order, exactly like the original if/elif chain.
    period_seconds = (
        ('second', 1),
        ('minute', 60),
        ('hour', 3600),
        ('day', 86400),
        ('week', 604800),
    )
    for unit, seconds in period_seconds:
        if unit in pertime:
            step = seconds
            break
    else:
        raise ValueError('unsupported period: %r' % (pertime,))

    now = datetime.datetime.now()

    def series_rows(dpid, kind, label, values_text):
        """Build one record per comma separated value, newest first."""
        return [
            {
                'dpid': dpid,
                'type': kind,
                'name': label,
                'value': int(raw),
                'device': device,
                'timestamp': (now - datetime.timedelta(seconds=age * step)).strftime("%Y-%m-%d %H:%M:%S"),
            }
            for age, raw in enumerate(values_text.split(','))
        ]

    data = []
    root = ET.fromstring(output)
    for entry in root.findall(xpath):
        if len(entry) == 0:
            # Nothing reported for this data processor.
            continue
        dpid = entry.tag
        for child in entry[0]:
            kind = shortname(child.tag)
            # The two tests are deliberately independent, as in the original.
            if 'cpu' in child.tag:
                for subchild in child:
                    label = 'core' + subchild.find('./coreid').text
                    data.extend(series_rows(dpid, kind, label, subchild.find('./value').text))
            if 'resource' in child.tag:
                for subchild in child:
                    label = shortname(subchild.find('./name').text)
                    data.extend(series_rows(dpid, kind, label, subchild.find('./value').text))
    return data
def validate_ip_address(address):
    """Tell whether *address* is a dotted-quad IPv4 address.

    The whole string must consist of four groups of one to three ASCII
    digits separated by dots (a trailing newline is no longer accepted), and
    every group must be at most 255.

    Args:
        address: The text to check.

    Returns:
        True for a valid address, otherwise False. A message is printed when
        the shape is right but a group exceeds 255.
    """
    shape = r"(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})"
    if re.fullmatch(shape, address, flags=re.ASCII) is None:
        return False
    # Digits only, so a group can never be negative; only the upper bound matters.
    if any(int(part) > 255 for part in address.split(".")):
        print("IP address {} is not valid".format(address))
        return False
    return True
def host_to_dns(device):
    """Resolve *device* and return the name to use for it.

    An IPv4 address is reverse-resolved and the resulting host name is
    returned. Anything else is forward-resolved only as a check and returned
    unchanged.

    Args:
        device: IPv4 address or host name.

    Returns:
        The host name, or the string ``"not resolvable"`` when the lookup
        fails.
    """
    # socket.error is OSError. UnicodeError is raised before any lookup when
    # the name cannot be idna-encoded (for example an empty label "a..b").
    lookup_errors = (OSError, UnicodeError)
    if validate_ip_address(device):
        try:
            return socket.gethostbyaddr(device)[0]
        except lookup_errors:
            return "not resolvable"
    try:
        socket.gethostbyname(device)
    except lookup_errors:
        return "not resolvable"
    return device
def main(argv):
    """Collect resource-monitor data from one firewall and append it as JSON.

    Args:
        argv: Command line arguments without the program name.

    Raises:
        SystemExit: On argument errors, when the device cannot be resolved,
            or when the API request fails.
    """
    parser = argparse.ArgumentParser(description='Collect resource-monitor data.',epilog="And that's how you use rest api...")
    parser.add_argument('-l', '--log',
                        action=argparse.BooleanOptionalAction,
                        default=False,
                        dest='log',
                        help='switch logging on or off')
    parser.add_argument('-V', '--version',
                        action='version',
                        version='%(prog)s 1.0')
    parser.add_argument('-d', '--device',
                        dest='device',
                        help='IP or hostname of the Palo Alto Firewall',
                        required=True)
    parser.add_argument('-p', '--pwd',
                        dest='pwd',
                        help='Password hash for the logon on Palo Alto Firewall.',
                        required=True)
    # Parse the list that was handed in instead of silently using sys.argv.
    args = parser.parse_args(argv)

    key = 'resource-monitor'
    value = '5'
    per_time = 'minute'
    data_collector = {
        'resource-monitor': {
            'cmd': '<show><running><resource-monitor><' + per_time + '><last>' + value + '</last></' + per_time + '></resource-monitor></running></show>',
            'xpath': './result/resource-monitor/data-processors/'
        }
    }

    # Resolve once. The original used a bare `exit`, which does nothing, and
    # then failed on the unbound `device`.
    device = host_to_dns(args.device)
    if device == "not resolvable":
        sys.exit('Device ' + str(args.device) + ' is not resolvable')

    if args.log:
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_global.log'
        logging.basicConfig(filename=filename, filemode='w',
                            format='%(asctime)s - %(process)d - %(levelname)s - %(message)s', level=logging.INFO)

    for name, spec in data_collector.items():
        params = {
            'type': 'op',
            'key': args.pwd,
            'cmd': spec['cmd']
        }
        xml_response: str = get_api(device, params, args.log)
        if name == 'resource-monitor':
            data = output_parser(device, xml_response, spec['xpath'], per_time)
            # Appended on every run; the file holds one JSON array per run.
            with open('D:\\User data\\admin\\dev\\' + device + '_' + name + '.json', 'a') as fp:
                json.dump(data, fp)


if __name__ == "__main__":
    main(sys.argv[1:])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import requests | |
import xml.etree.ElementTree as ET | |
import logging | |
import sys | |
import json | |
import datetime | |
import re | |
import socket | |
def get_api(device, params, log=False):
    """Send one API call to ``https://<device>/api/`` and return its text.

    Args:
        device: Host name or IP address of the target.
        params: Query parameters for the POST request; ``params['cmd']`` is
            read for logging when ``log`` is true.
        log: Enables ``logging`` output for the request, errors and reply.

    Returns:
        The body of a reply whose status code is 200.

    Raises:
        SystemExit: When the request raises or the status code differs
            from 200.
    """
    request_timeout = 20
    url: str = 'https://' + device + '/api/'

    if log:
        logging.info('Trying to access device: %s with cmd: %s', device, params['cmd'])

    try:
        # NOTE(review): certificate verification is switched off here.
        response = requests.post(url, params=params, timeout=request_timeout, verify=False)
    except requests.exceptions.RequestException as exc:
        if log:
            logging.error('We run in that problem: %s', exc)
        raise SystemExit(exc)

    status = response.status_code
    body = response.text

    if status != 200:
        if log:
            for template, detail in (
                ('with the url: %s', response.url),
                ('Cannot access the website, status code: %s', status),
                ('reply from the website:\n%s\n', body),
            ):
                logging.error(template, detail)
        raise SystemExit(str(status) + " - " + body)

    if log:
        logging.info('response from website:\n\n%s\n', body)
    return body
def shortname(name):
    """Abbreviate *name*.

    Parentheses are removed first. A name containing spaces (or, failing
    that, hyphens) is reduced to the initials of its parts; a name with
    neither is cut to its first three characters. Empty parts produced by
    doubled, leading or trailing separators are skipped instead of raising
    ``IndexError``.

    Args:
        name: The text to abbreviate.

    Returns:
        The abbreviation as a string.
    """
    cleaned = name.replace('(', '').replace(')', '')
    for separator in (' ', '-'):
        if separator in cleaned:
            return ''.join(part[0] for part in cleaned.split(separator) if part)
    return cleaned[:3]
def output_parser(device, output, xpath, timestamp):
    """Turn the elements matched by *xpath* into one record per element.

    Each matched element contributes a dict holding its tag as ``dpid``, one
    float per child (keyed by the abbreviated child tag), plus the given
    timestamp and device. Children without text are skipped instead of
    failing in ``float(None)``.

    Args:
        device: Device name stored in every record.
        output: XML document as text.
        xpath: Iterable of ElementTree paths to evaluate against the root.
        timestamp: Value stored under ``timestamp`` in every record.

    Returns:
        A list of dicts, one per matched element.
    """
    records = []
    root = ET.fromstring(output)
    for pattern in xpath:
        for entry in root.findall(pattern):
            record = {'dpid': entry.tag}
            for child in entry:
                text = child.text
                if text is None or not text.strip():
                    # Empty or container element: there is no number to store.
                    continue
                record[shortname(child.tag)] = float(text)
            record['timestamp'] = timestamp
            record['device'] = device
            records.append(record)
    return records
def validate_ip_address(address):
    """Return True when *address* is a dotted-quad IPv4 address.

    The complete string has to be four dot separated groups of one to three
    ASCII digits, each at most 255. Unlike the previous ``re.match`` with
    ``$``, a trailing newline is rejected.

    Args:
        address: The text to check.

    Returns:
        True or False. A message is printed when a group exceeds 255.
    """
    match = re.fullmatch(r"(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})", address, flags=re.ASCII)
    if match is None:
        return False
    for octet in match.groups():
        # Groups are digits only, so there is no negative case to test.
        if int(octet) > 255:
            print("IP address {} is not valid".format(address))
            return False
    return True
def host_to_dns(device):
    """Return a resolvable name for *device*, or ``"not resolvable"``.

    IPv4 addresses are reverse-resolved to their host name. Other values are
    forward-resolved as a check and returned as given.

    Args:
        device: IPv4 address or host name.

    Returns:
        The host name, or the string ``"not resolvable"`` when the lookup
        fails.
    """
    try:
        if validate_ip_address(device):
            return socket.gethostbyaddr(device)[0]
        socket.gethostbyname(device)
    except (OSError, UnicodeError):
        # OSError covers socket.error/herror/gaierror; UnicodeError is raised
        # for names that cannot be idna-encoded, e.g. an empty label.
        return "not resolvable"
    return device
def main(argv):
    """Collect session distribution statistics and append them as JSON.

    Args:
        argv: Command line arguments without the program name.

    Raises:
        SystemExit: On argument errors, when the device cannot be resolved,
            or when the API request fails.
    """
    parser = argparse.ArgumentParser(description='Collect session distribution on .',epilog="And that's how you use rest api...")
    parser.add_argument('-l', '--log',
                        action=argparse.BooleanOptionalAction,
                        default=False,
                        dest='log',
                        help='switch logging on or off')
    parser.add_argument('-V', '--version',
                        action='version',
                        version='%(prog)s 1.0')
    parser.add_argument('-d', '--device',
                        dest='device',
                        help='IP or hostname of the Palo Alto Firewall',
                        required=True)
    parser.add_argument('-p', '--pwd',
                        dest='pwd',
                        help='Password hash for the logon on Palo Alto Firewall.',
                        required=True)
    # Use the arguments passed by the caller rather than re-reading sys.argv.
    args = parser.parse_args(argv)

    datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    key = 'session-distribution'
    data_collector = {
        'session-distribution': {
            'cmd': '<show><session><distribution><statistics></statistics></distribution></session></show>',
            'xpath': ['./result/']
        }
    }

    # A bare `exit` (as before) is a no-op and left `device` unbound.
    device = host_to_dns(args.device)
    if device == "not resolvable":
        sys.exit('Device ' + str(args.device) + ' is not resolvable')

    if args.log:
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_global.log'
        logging.basicConfig(filename=filename, filemode='w',
                            format='%(asctime)s - %(process)d - %(levelname)s - %(message)s', level=logging.INFO)

    for name, spec in data_collector.items():
        params = {
            'type': 'op',
            'key': args.pwd,
            'cmd': spec['cmd']
        }
        xml_response: str = get_api(device, params, args.log)
        if name == 'session-distribution':
            data = output_parser(device, xml_response, spec['xpath'], datenow)
            # Appended on every run; the file holds one JSON array per run.
            with open('D:\\User data\\admin\\dev\\' + device + '_' + name + '.json', 'a') as fp:
                json.dump(data, fp)


if __name__ == "__main__":
    main(sys.argv[1:])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import requests | |
import xml.etree.ElementTree as ET | |
import logging | |
import sys | |
import json | |
import datetime | |
import re | |
import socket | |
def get_api(device, params, log=False):
    """Call the device API with a POST request and hand back the reply text.

    Args:
        device: Host name or IP address; used to build
            ``https://<device>/api/``.
        params: Request query parameters. ``params['cmd']`` is logged when
            ``log`` is true.
        log: Write request, error and reply details through ``logging``.

    Returns:
        The reply body for a status code of 200.

    Raises:
        SystemExit: For request exceptions and for any other status code.
    """
    target: str = 'https://' + device + '/api/'
    wait_limit = 20

    def note(level, template, detail, *more):
        """Forward to the logging function only when logging is enabled."""
        if log:
            level(template, detail, *more)

    note(logging.info, 'Trying to access device: %s with cmd: %s', device, params['cmd']) if log else None

    try:
        # NOTE(review): TLS verification is disabled by verify=False.
        answer = requests.post(target, params=params, timeout=wait_limit, verify=False)
    except requests.exceptions.RequestException as problem:
        note(logging.error, 'We run in that problem: %s', problem)
        raise SystemExit(problem)

    succeeded = answer.status_code == 200
    if not succeeded:
        note(logging.error, 'with the url: %s', answer.url)
        note(logging.error, 'Cannot access the website, status code: %s', answer.status_code)
        note(logging.error, 'reply from the website:\n%s\n', answer.text)
        raise SystemExit(str(answer.status_code) + " - " + answer.text)

    note(logging.info, 'response from website:\n\n%s\n', answer.text)
    return answer.text
def shortname(name):
    """Build a short key from *name*.

    After dropping parentheses, a name with spaces is reduced to the first
    letter of every space separated word; otherwise a name with hyphens is
    reduced to the first letter of every hyphen separated part; otherwise
    the first three characters are used. Empty parts are ignored, so doubled
    or trailing separators no longer raise ``IndexError``.

    Args:
        name: The text to abbreviate.

    Returns:
        The abbreviation as a string.
    """
    text = name.replace('(', '').replace(')', '')
    if ' ' in text:
        words = text.split(' ')
    elif '-' in text:
        words = text.split('-')
    else:
        return text[:3]
    return ''.join(word[0] for word in words if word)
def output_parser(device, output, xpath, timestamp):
    """Convert the elements matched by *xpath* into one record each.

    For every matched element a dict is built from its children: ``vsys`` is
    stored as text under its own tag, every other child with text is stored
    as an int under the abbreviated tag. Children whose text is missing or
    only whitespace are skipped (whitespace used to raise ``ValueError``).

    Args:
        device: Device name stored in every record.
        output: XML document as text.
        xpath: Iterable of ElementTree paths to evaluate against the root.
        timestamp: Value stored under ``timestamp`` in every record.

    Returns:
        A list of dicts, one per matched element.
    """
    records = []
    root = ET.fromstring(output)
    for pattern in xpath:
        for entry in root.findall(pattern):
            record = {}
            for child in entry:
                if child.tag in ['vsys']:
                    record[child.tag] = child.text
                elif child.text is not None and child.text.strip():
                    record[shortname(child.tag)] = int(child.text)
            record['timestamp'] = timestamp
            record['device'] = device
            records.append(record)
    return records
def validate_ip_address(address):
    """Check that *address* is a dotted-quad IPv4 address.

    The entire string must be four dot separated groups of one to three
    ASCII digits with a value of at most 255 each. ``re.fullmatch`` is used
    so that a trailing newline no longer slips through.

    Args:
        address: The text to check.

    Returns:
        True when valid, else False. A message is printed when a group is
        larger than 255.
    """
    quad = re.fullmatch(r"(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})", address, flags=re.ASCII)
    if quad is None:
        return False
    # max() over the four groups: one out-of-range group invalidates all.
    if max(int(group) for group in quad.groups()) > 255:
        print("IP address {} is not valid".format(address))
        return False
    return True
def host_to_dns(device):
    """Map *device* to a host name, or report that it cannot be resolved.

    Args:
        device: IPv4 address (reverse-resolved to its host name) or host
            name (forward-resolved as a check and returned unchanged).

    Returns:
        The host name, or the string ``"not resolvable"`` when the lookup
        fails.
    """
    is_address = validate_ip_address(device)
    try:
        resolved = socket.gethostbyaddr(device)[0] if is_address else socket.gethostbyname(device)
    except (OSError, UnicodeError):
        # UnicodeError: the name could not be idna-encoded (e.g. "a..b").
        return "not resolvable"
    # For names the lookup result is only a check; the name itself is kept.
    return resolved if is_address else device
def main(argv):
    """Collect the session count per vsys and append it as JSON.

    Args:
        argv: Command line arguments without the program name.

    Raises:
        SystemExit: On argument errors, when the device cannot be resolved,
            or when the API request fails.
    """
    parser = argparse.ArgumentParser(description='Collect session count per vsys.',epilog="And that's how you use rest api...")
    parser.add_argument('-l', '--log',
                        action=argparse.BooleanOptionalAction,
                        default=False,
                        dest='log',
                        help='switch logging on or off')
    parser.add_argument('-V', '--version',
                        action='version',
                        version='%(prog)s 1.0')
    parser.add_argument('-d', '--device',
                        dest='device',
                        help='IP or hostname of the Palo Alto Firewall',
                        required=True)
    parser.add_argument('-p', '--pwd',
                        dest='pwd',
                        help='Password hash for the logon on Palo Alto Firewall.',
                        required=True)
    # Honour the argument list given by the caller.
    args = parser.parse_args(argv)

    datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    key = 'session-meter'
    data_collector = {
        'session-meter': {
            'cmd': '<show><session><meter></meter></session></show>',
            'xpath': ['./result/']
        }
    }

    # The former bare `exit` did nothing and `device` stayed unbound.
    device = host_to_dns(args.device)
    if device == "not resolvable":
        sys.exit('Device ' + str(args.device) + ' is not resolvable')

    if args.log:
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_global.log'
        logging.basicConfig(filename=filename, filemode='w',
                            format='%(asctime)s - %(process)d - %(levelname)s - %(message)s', level=logging.INFO)

    for name, spec in data_collector.items():
        params = {
            'type': 'op',
            'key': args.pwd,
            'cmd': spec['cmd']
        }
        xml_response: str = get_api(device, params, args.log)
        if name == 'session-meter':
            data = output_parser(device, xml_response, spec['xpath'], datenow)
            # NOTE(review): this directory differs from the log directory
            # above ('dkx8blr' vs 'admin') - confirm which one is intended.
            with open('D:\\User data\\dkx8blr\\dev\\' + device + '_' + name + '.json', 'a') as fp:
                json.dump(data, fp)


if __name__ == "__main__":
    main(sys.argv[1:])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from netmiko import ConnectHandler | |
import sys | |
def main(argv):
    """Open an SSH session to a fixed firewall and print 'show system info'.

    Args:
        argv: Command line arguments; not used by this script.
    """
    host = '10.33.245.211'
    username = '....'
    pwd = '....'
    paloalto = {
        'device_type': 'paloalto_panos',
        'ip': host,
        'username': username,
        'password': pwd,
        'session_log': 'C:\\Users\\admin\\Downloads\\output.txt',
    }
    ssh = ConnectHandler(**paloalto)
    try:
        # Disable paging first so the following output is not cut into pages.
        ssh.send_command('set cli pager off')
        print(ssh.send_command('show system info'))
    finally:
        # Always close the SSH session, even when a command fails.
        ssh.disconnect()


if __name__ == "__main__":
    main(sys.argv[1:])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
# Author: Ist wurst... | |
# get key first: | |
# https://pa-panorama/api/?type=keygen&user=de-netmgmt&password=<ASKFORIT> | |
# | |
# https://IPADDRESS/api/?type=config&action=set&xpath=/config/shared/reports/entry[@name='TrafSumCustTime']&element=<type><trsum><sortby>bytes</sortby><group-by>src</group-by><aggregate-by><member>from</member><member>app</member></aggregate-by><values><member>bytes</member><member>bytes_sent</member><member>bytes_received</member></values></trsum></type><topn>10</topn><topm>5</topm><caption>TrafSumCustTime</caption><start-time>DATE+TIMESTAMP</start-time><end-time>DATE+TIMESTAMP</end-time>&key=KEY | |
import requests | |
import argparse | |
import json | |
import logging | |
import ssl | |
import sys, getopt | |
import time | |
import xml.etree.ElementTree as ET | |
import datetime | |
import pdb | |
def get_api(host, params, log=False):
    """POST an API request to ``https://<host>/api/`` and return the body.

    Args:
        host: Host name or IP address of the target.
        params: Mapping sent as query parameters. A ``cmd`` entry is
            optional; report and config requests do not carry one.
        log: When true, progress and errors are written through ``logging``.

    Returns:
        The response body as text when the HTTP status code is 200.

    Raises:
        SystemExit: If the request fails or the status code is not 200.
    """
    request_timeout = 20
    url: str = 'https://' + host + '/api/'
    if log:
        # .get(): params['cmd'] raised KeyError for the report/config calls
        # made by get_report, which have no 'cmd' entry.
        logging.info('Trying to access host: %s with cmd: %s', host, params.get('cmd'))
    try:
        # NOTE(review): verify=False disables TLS certificate verification.
        response = requests.post(url, params=params, timeout=request_timeout, verify=False)
    except requests.exceptions.RequestException as e:
        if log:
            logging.error('We run in that problem: %s', e)
        raise SystemExit(e)
    if response.status_code != 200:
        if log:
            logging.error('Cannot access the website, status code: %s', response.status_code)
            logging.error('reply from the website:\n%s\n', response.text)
        raise SystemExit(str(response.status_code) + " - " + response.text)
    if log:
        logging.info('response from website:\n\n%s\n', response.text)
    return response.text
def get_report( host, hash, reportname, report, starttime, endtime, log=False ):
    """Create a custom report, run it, wait for it and return its entries.

    The function performs three API calls through ``get_api``: it stores the
    report definition, starts the report job, and then polls the job until
    its status is ``FIN``.

    Args:
        host: Host name or IP address passed to ``get_api``.
        hash: API key sent as ``key`` with every request.
        reportname: Name of the report entry to create and run.
        report: XML definition stored as the report's ``element``.
        starttime: Text inserted as a ``<date>`` element at the front of
            every returned entry.
        endtime: Accepted for interface compatibility; not used.
        log: When true, replies are written through ``logging``.

    Returns:
        One line per report entry (serialized XML without line breaks), each
        terminated by a newline; an empty string when there are no entries.

    Raises:
        SystemExit: If no job id can be found in the start reply.
        Exception: If the start reply is empty or a poll reply carries no
            job status.
    """
    def as_text(reply):
        """get_api returns str; tolerate bytes so old callers keep working."""
        return reply.decode("utf-8") if isinstance(reply, bytes) else reply

    argssetrep = {
        'type':'config',
        'action':'set',
        'xpath':'/config/shared/reports/entry[@name=\''+reportname+'\']',
        'element':report,
        'key':hash
    }
    argsinitrep = {
        'type':'report',
        'async':'yes',
        'key':hash,
        'reporttype':'custom',
        'reportname':reportname
    }
    argsgetrep = {
        'type':'report',
        'action':'get',
        'job-id':'1',
        'key':hash
    }

    # 1. create the report
    xml_response = as_text(get_api(host, argssetrep, log))
    if log:
        logging.info("api xml output Original: \n" + xml_response)

    # 2. start the report
    xml_response = as_text(get_api(host, argsinitrep, log))
    if log:
        logging.info("api xml output Original: \n" + xml_response)
    if not xml_response:
        raise Exception("The root cause of the problem is there is no jobid or something...")

    time.sleep(7)
    root = ET.fromstring(xml_response)
    # The job id is the last word of the first message line mentioning it.
    jobid = None
    for entry in root.findall('./result/msg/line'):
        line_text = entry.text or ''
        if 'jobid' in line_text:
            jobid = line_text.split()[-1]
            break
    if jobid is None:
        # Exit with the message instead of a silent, successful sys.exit().
        raise SystemExit("I cannot find the jobid for the report. Contact the Firewall Administrator!")
    argsgetrep['job-id'] = jobid

    # 3. loop till the report is ready. The status changes from ACT to FIN.
    while True:
        rootreport = ET.fromstring(as_text(get_api(host, argsgetrep, log)))
        status = rootreport.findtext('./result/job/status')
        if status is None:
            raise Exception("The report job reply contains no status for job " + jobid)
        if status == "FIN":
            break
        time.sleep(10)

    lines = []
    for entry2 in rootreport.findall('./result/report/entry'):
        dateelement = ET.Element("date")
        dateelement.text = starttime
        entry2.insert(0, dateelement)
        serialized = ET.tostring(entry2).decode()
        serialized = serialized.replace('\r', '').replace('\n', '')
        lines.append(serialized.strip() + "\n")
    return ''.join(lines)
def main(argv):
    """Run the built-in traffic summary report and write it to a file.

    The report definition is assembled in this function; the value of
    ``--report`` is parsed but not used. The result is written to a file
    named after ``--reportname``.

    Args:
        argv: Command line arguments without the program name.

    Raises:
        SystemExit: On argument errors or when an API request fails.
    """
    #1. check the arguments
    today = datetime.date.today()
    todaystr = today.strftime('%Y/%m/%d') + ' 00:05:00'
    yesterday = today - datetime.timedelta(days = 1)
    yesterdaystr = yesterday.strftime('%Y/%m/%d') + ' 00:05:00'

    parser = argparse.ArgumentParser(description='Palo Alto Report Collector .',epilog="And that's how you collect custom reports to ELK...")
    parser.add_argument('-v','--version',
                        action='version', version='%(prog)s 1.0')
    parser.add_argument('-l','--log',
                        action='store_true',
                        default=False,
                        help='switch logging on',
                        dest='log')
    # The next three values are used unconditionally below, so ask for them
    # up front instead of failing later on a None value.
    parser.add_argument('-p','--panorama',
                        help='IP or hostname of the Palo Alto Panorama',
                        dest='panorama',
                        required=True)
    parser.add_argument('-k','--hashkey',
                        help='Password hash for the logon on Palo Alto Panorama',
                        dest='hashkey',
                        required=True)
    parser.add_argument('-n','--reportname',
                        help='The Name of the report in Panorama',
                        dest='reportname',
                        required=True)
    parser.add_argument('-r','--report',
                        help='palo alto xml report',
                        dest='report')
    parser.add_argument('-s','--starttime',
                        nargs='?',
                        help='start time for the report time selection. default is yesterday.',
                        dest='starttime',
                        default=yesterdaystr)
    parser.add_argument('-e','--endtime',
                        nargs='?',
                        help='end time for the report time selection. default is today.',
                        dest='endtime',
                        default=todaystr)
    args = parser.parse_args(argv)

    # logging
    if args.log:
        logging.basicConfig(level=logging.INFO,
                            filename='prtg-info.log',  # log to this file
                            format='%(asctime)s %(message)s')  # include timestamp
        logging.info("Start Logging...")

    ipnet = '10.34.160.0/19'
    report_query = '(addr.dst in \'{ipnetwork}\') and (action eq allow)'.format(ipnetwork=ipnet)
    report = '''\
<type>
<panorama-trsum>
<sortby>bytes</sortby>
<aggregate-by>
<member>app</member>
<member>category-of-app</member>
<member>subcategory-of-app</member>
<member>technology-of-app</member>
<member>container-of-app</member>
</aggregate-by>
<values>
<member>bytes_received</member>
<member>bytes_sent</member>
<member>bytes</member>
<member>sessions</member>
</values>
</panorama-trsum>
</type>
<period>last-7-days</period>
<topn>5</topn>
<topm>10</topm>
<caption>net-report</caption>
<query>{rep_query}</query>\
'''.format(rep_query=report_query)
    # str.replace returns a new string; the result used to be thrown away.
    report = report.replace('\n','')

    rpt = get_report(args.panorama, args.hashkey, args.reportname, report, args.starttime, args.endtime, args.log)
    with open(args.reportname, "w") as report_file:
        report_file.write(rpt)


if __name__ == "__main__":
    main(sys.argv[1:])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
import sys | |
import os | |
import requests | |
from requests.utils import requote_uri | |
import json | |
import argparse | |
import datetime | |
import time | |
import pdb | |
import xml.etree.ElementTree as ET | |
import re | |
from netmiko import ConnectHandler | |
import socket | |
def get_cmd(host, username, pwd, cmd, log=False):
    """Run one CLI command on a device over SSH and return its output.

    Args:
        host: Host name or IP address to connect to.
        username: Login name.
        pwd: Login password.
        cmd: Command to send.
        log: Accepted for interface compatibility; not used.

    Returns:
        The command output as returned by ``send_command``.
    """
    paloalto = {
        'device_type': 'paloalto_panos',
        'ip': host,
        'username': username,
        'password': pwd,
        'session_log': 'output.txt',
    }
    ssh = ConnectHandler(**paloalto)
    try:
        return ssh.send_command(cmd)
    finally:
        # Close the session on every path; it used to be left open.
        ssh.disconnect()
def json_stream(output, elasticindex, device):
    """Convert tabular command output into a bulk-upload JSON stream.

    Every data line of *output* becomes two JSON lines: an ``index`` action
    naming *elasticindex*, followed by the document. Lines that are at most
    four characters long after stripping, or that contain ``---`` or
    ``Blocks``, are skipped. Lines with fewer than seven whitespace
    separated columns are skipped as well (they used to raise
    ``IndexError``).

    Args:
        output: Raw command output, one record per line.
        elasticindex: Index name written into every action line.
        device: Device name stored in every document.

    Returns:
        A string that starts with a newline and contains the action and
        document lines, each terminated by a newline.
    """
    issuedate = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    action_line = json.dumps({
        'index' : {
            '_index' : elasticindex,
            '_type' : '_doc'
        }
    })

    chunks = ['\n']
    for line in output.splitlines():
        if len(line.strip()) <= 4 or "---" in line or "Blocks" in line:
            continue
        values = line.split()
        if len(values) < 7:
            # Not a complete record (e.g. a summary line).
            continue
        document = {
            "issuedate": issuedate,
            "device": device,
            "servername": values[0],
            "hostname": values[1],
            "serverport": values[2],
            "vsysname": values[3],
            "serverstatus": values[4],
            "serverversion": values[5],
            "users": values[6],
            # 'blocks' used to repeat the users column (index 6). Read the
            # following column when the line has one; otherwise keep the
            # previous value so existing documents do not change shape.
            "blocks": values[7] if len(values) > 7 else values[6],
        }
        # convert to stream of jsons with new line separation
        chunks.append(action_line + "\n")
        chunks.append(json.dumps(document) + "\n")
    return ''.join(chunks)
def uploader(url, jsdata, username, password, log=False):
    """POST *jsdata* to *url* with basic authentication.

    Args:
        url: Target URL.
        jsdata: A dict (sent as a JSON body) or pre-serialized data (sent
            as-is).
        username: User name for basic authentication.
        password: Password for basic authentication.
        log: When true, the URL is written through ``logging``.

    Returns:
        ``"<status code> - <response text>"`` for a status code of 200.

    Raises:
        SystemExit: If the request fails or the status code is not 200.
    """
    # This script has no module-level `import logging`, so log=True used to
    # end in a NameError; import it where it is needed.
    import logging

    headers = {'Content-Type': 'application/json'}
    request_timeout = 60
    if log:
        logging.info("URL: " + url)
    # A dict is serialized by requests (json=); anything else is sent raw.
    body_argument = 'json' if isinstance(jsdata, dict) else 'data'
    try:
        response = requests.post(url, headers=headers, auth=(username,password),
                                 timeout=request_timeout, **{body_argument: jsdata})
    except requests.exceptions.RequestException as e:
        raise SystemExit(e)
    if response.status_code != 200:
        raise SystemExit(str(response.status_code) + " - " + response.text)
    return str(response.status_code) + " - " + response.text
def validate_ip_address(address):
    """Decide whether *address* is written as a dotted-quad IPv4 address.

    The whole string must be four dot separated groups of one to three
    ASCII digits, none larger than 255. A trailing newline, which
    ``re.match`` with ``$`` tolerated, is rejected.

    Args:
        address: The text to check.

    Returns:
        True when valid, else False. A message is printed when the shape
        matches but a group is larger than 255.
    """
    if not re.fullmatch(r"(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})", address, flags=re.ASCII):
        return False
    # Only the upper bound needs checking; the groups are plain digits.
    in_range = all(int(part) <= 255 for part in address.split("."))
    if not in_range:
        print("IP address {} is not valid".format(address))
    return in_range
def host_to_dns(device):
    """Resolve *device* to the host name used for the connection.

    Args:
        device: IPv4 address (reverse-resolved to its host name) or host
            name (forward-resolved as a check and returned unchanged).

    Returns:
        The host name, or the string ``"not resolvable"`` when the lookup
        fails.
    """
    unresolved = "not resolvable"
    if validate_ip_address(device):
        try:
            dnsname = socket.gethostbyaddr(device)[0]
        except (OSError, UnicodeError):
            return unresolved
        return dnsname
    try:
        socket.gethostbyname(device)
    except (OSError, UnicodeError):
        # UnicodeError: the name cannot be idna-encoded (e.g. empty label).
        return unresolved
    return device
def main(argv):
    """Collect TS-agent statistics from one or many devices and upload them.

    ``--device`` is either a single host or the path of a file with one host
    per line. For every resolvable host the statistics command is run over
    SSH, converted with ``json_stream`` and posted to the bulk endpoint. In
    single-host mode the stream is additionally written to a local file.

    Args:
        argv: Command line arguments without the program name.

    Raises:
        SystemExit: On argument errors, when the single host cannot be
            resolved, or when the upload fails.
    """
    # This script has no module-level `import logging`; --log used to end in
    # a NameError.
    import logging

    parser = argparse.ArgumentParser(description='Collect TSA statistics.',epilog="And that's how you use rest api...")
    parser.add_argument('--log',
                        action='store_true',
                        default=False,
                        dest='log_switch',
                        help='switch logging on')
    parser.add_argument('-V', '--version',
                        action='version',
                        version='%(prog)s 1.0')
    parser.add_argument("-v", "--verbosity",
                        action="count",
                        default=0,
                        help='increase output verbosity',
                        required=False)
    parser.add_argument('-d', '--device',
                        dest='host',
                        help='IP or hostname of the Palo Alto Firewall',
                        required=True)
    parser.add_argument('-u', '--username',
                        dest='username',
                        help='username for the logon on Palo Alto Firewall.',
                        required=True)
    parser.add_argument('-p', '--pwd',
                        dest='pwd',
                        help='Password for the logon on Palo Alto Firewall.',
                        required=True)
    args = parser.parse_args(argv)

    command = 'show user ts-agent statistics'
    elastic_url = "https://elasticsearch-test:443"
    elastic_index = "tsa-statistics"
    username = '...'
    password = '...'
    url = elastic_url + "/_bulk?pretty"

    # logging
    if args.log_switch:
        logging.basicConfig(level=logging.INFO,
                            filename='paloalto_tapstate_check.log',  # log to this file
                            format='%(asctime)s %(message)s')  # include timestamp
        logging.info("Start Logging...")

    def collect(host):
        """Fetch the statistics of one host; return the JSON stream or None."""
        raw_output = get_cmd(host, args.username, args.pwd, command, args.log_switch)
        if not raw_output:
            return None
        return json_stream(raw_output, elastic_index, host)

    #2. check if file with host list or a simple host
    if os.path.isfile(args.host):
        with open(args.host, "r") as hostlist:
            for line in hostlist:
                candidate = line.strip()
                if not candidate:
                    continue
                host = host_to_dns(candidate)
                if host == "not resolvable":
                    # A bare `exit` did nothing here and the unresolved name
                    # was still contacted; skip it and carry on.
                    print('Skipping ' + candidate + ': not resolvable', file=sys.stderr)
                    continue
                jsdata = collect(host)
                if jsdata:
                    #upload the docs to elastic
                    uploader(url, jsdata, username, password, log=False)
    else:
        host = host_to_dns(args.host)
        if host == "not resolvable":
            sys.exit('Device ' + str(args.host) + ' is not resolvable')
        jsdata = collect(host)
        if jsdata:
            filename = "jsdata" + str(args.host) + "txt"
            with open(filename, "w") as change_file1:
                change_file1.write(jsdata)
            uploader(url, jsdata, username, password, log=False)


if __name__ == "__main__":
    main(sys.argv[1:])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment