Skip to content

Instantly share code, notes, and snippets.

@itsecworks
Last active October 5, 2023 07:44
Show Gist options
  • Save itsecworks/864cc17acd141c8313fcf38e343840f3 to your computer and use it in GitHub Desktop.
Save itsecworks/864cc17acd141c8313fcf38e343840f3 to your computer and use it in GitHub Desktop.
Palo Alto Monitoring scripts
import argparse
import requests
import xml.etree.ElementTree as ET
import logging
import sys
import json
import datetime
import re
import socket
def get_api(device, params, log=False):
# script_path = os.path.dirname(__file__)
# CA_file = "Palo_Alto_Networks_Inc-Root-CA_G1.pem"
# CA_path = os.path.join(script_path, CA_file)
request_timeout = 20
url: str = 'https://' + device + '/api/'
if log:
logging.info('Trying to access device: %s with cmd: %s', device, params['cmd'])
try:
# response = requests.post(url, timeout=request_timeout, verify=CA_path)
response = requests.post(url, params=params, timeout=request_timeout, verify=False)
except requests.exceptions.RequestException as e:
if log:
logging.error('We run in that problem: %s', e)
raise SystemExit(e)
if response.status_code != 200:
if log:
logging.error('with the url: %s', response.url)
logging.error('Cannot access the website, status code: %s', response.status_code)
logging.error('reply from the website:\n%s\n', response.text)
raise SystemExit(str(response.status_code) + " - " + response.text)
else:
if log:
logging.info('response from website:\n\n%s\n', response.text)
return response.text
def output_parser(device, output, xpath, timestamp):
data = []
dict = {}
root = ET.fromstring(output)
for i in xpath:
for entry in root.findall(i):
for child in entry:
if len(child):
for subchild in child:
dict[subchild.tag] = int(subchild.text)
elif child.tag in ['name', 'interface']:
dict[child.tag] = child.text
elif child.text is not None:
dict[child.tag] = int(child.text)
dict['timestamp'] = timestamp
dict['device'] = device
data.append(dict)
dict = {}
return data
def validate_ip_address(address):
match = re.match(r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$", address)
if bool(match) is False:
#print("IP address {} is not valid".format(address))
return False
for part in address.split("."):
if int(part) < 0 or int(part) > 255:
print("IP address {} is not valid".format(address))
return False
#print("IP address {} is valid".format(address))
return True
def host_to_dns(device):
if validate_ip_address(device):
try:
dnsname = socket.gethostbyaddr(device)[0]
return dnsname
except socket.error:
return "not resolvable"
else:
try:
socket.gethostbyname(device)
return device
except socket.error:
return "not resolvable"
def main(argv):
parser = argparse.ArgumentParser(description='Collect counter for interfaces.',epilog="And that's how you use rest api...")
parser.add_argument('-l', '--log',
action = argparse.BooleanOptionalAction,
default = False,
dest = 'log',
help = 'switch logging on or off')
parser.add_argument('-V','--version',
action = 'version',
version = '%(prog)s 1.0')
parser.add_argument('-d', '--device',
dest = 'device',
help = 'IP or hostname of the Palo Alto Firewall',
required = True)
parser.add_argument('-p', '--pwd',
dest = 'pwd',
help = 'Password hash for the logon on Palo Alto Firewall.',
required = True)
args = parser.parse_args()
datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
key = 'counter_interface'
data_collector = {
'counter-interface': {
'cmd': '<show><counter><interface>all</interface></counter></show>',
'xpath': ['./result/ifnet/ifnet/', './result/hw/']
}
}
if host_to_dns(args.device) == "not resolvable":
exit
else:
device = host_to_dns(args.device)
if args.log:
timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_global.log'
logging.basicConfig(filename=filename, filemode='w',
format='%(asctime)s - %(process)d - %(levelname)s - %(message)s', level=logging.INFO)
for key in data_collector:
cmd = data_collector[key]['cmd']
xpath = data_collector[key]['xpath']
params = {
'type': 'op',
'key': args.pwd,
'cmd': cmd
}
xml_response: str = get_api(device, params, args.log)
if key == 'counter-interface':
data = output_parser(device, xml_response, xpath, datenow)
with open('D:\\User data\\admin\\dev\\' + device + '_' + key + '.json', 'a') as fp:
json.dump(data,fp)
if __name__ == "__main__":
main(sys.argv[1:])
import argparse
import requests
import xml.etree.ElementTree as ET
import logging
import sys
import json
import datetime
import re
import socket
def get_api(device, params, log=False):
# script_path = os.path.dirname(__file__)
# CA_file = "Palo_Alto_Networks_Inc-Root-CA_G1.pem"
# CA_path = os.path.join(script_path, CA_file)
request_timeout = 20
url: str = 'https://' + device + '/api/'
if log:
logging.info('Trying to access device: %s with cmd: %s', device, params['cmd'])
try:
# response = requests.post(url, timeout=request_timeout, verify=CA_path)
response = requests.post(url, params=params, timeout=request_timeout, verify=False)
except requests.exceptions.RequestException as e:
if log:
logging.error('We run in that problem: %s', e)
raise SystemExit(e)
if response.status_code != 200:
if log:
logging.error('with the url: %s', response.url)
logging.error('Cannot access the website, status code: %s', response.status_code)
logging.error('reply from the website:\n%s\n', response.text)
raise SystemExit(str(response.status_code) + " - " + response.text)
else:
if log:
logging.info('response from website:\n\n%s\n', response.text)
return response.text
def output_parser(device, output, xpath, timestamp):
gclist = []
gclines = ''
# fields:
# timestamp, device, category, name, value, rate, aspect, desc, id, severity
root = ET.fromstring(output)
for entry in root.findall(xpath):
gclist.append(str(timestamp))
gclist.append(device)
for child in entry:
if ' ' in child.text:
gclist.append("\"" + str(child.text) + "\"")
else:
gclist.append(str(child.text))
gclines += ','.join(gclist) + '\n'
gclist.clear()
return(gclines)
def validate_ip_address(address):
match = re.match(r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$", address)
if bool(match) is False:
#print("IP address {} is not valid".format(address))
return False
for part in address.split("."):
if int(part) < 0 or int(part) > 255:
print("IP address {} is not valid".format(address))
return False
#print("IP address {} is valid".format(address))
return True
def host_to_dns(device):
if validate_ip_address(device):
try:
dnsname = socket.gethostbyaddr(device)[0]
return dnsname
except socket.error:
return "not resolvable"
else:
try:
socket.gethostbyname(device)
return device
except socket.error:
return "not resolvable"
def main(argv):
parser = argparse.ArgumentParser(description='Collect counter for interfaces.',epilog="And that's how you use rest api...")
parser.add_argument('-l', '--log',
action = argparse.BooleanOptionalAction,
default = False,
dest = 'log',
help = 'switch logging on or off')
parser.add_argument('-V','--version',
action = 'version',
version = '%(prog)s 1.0')
parser.add_argument('-d', '--device',
dest = 'device',
help = 'IP or hostname of the Palo Alto Firewall',
required = True)
parser.add_argument('-p', '--pwd',
dest = 'pwd',
help = 'Password hash for the logon on Palo Alto Firewall.',
required = True)
args = parser.parse_args()
datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
key = 'globalcounters'
data_collector = {
'globalcounters': {
'cmd': '<show><counter><global><filter><delta>yes</delta></filter></global></counter></show>',
'xpath': './result/global/counters/entry'
}
}
if host_to_dns(args.device) == "not resolvable":
exit
else:
device = host_to_dns(args.device)
if args.log:
logtimestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + logtimestamp + '_global.log'
logging.basicConfig(filename=filename, filemode='w',
format='%(asctime)s - %(process)d - %(levelname)s - %(message)s', level=logging.INFO)
for key in data_collector:
cmd = data_collector[key]['cmd']
xpath = data_collector[key]['xpath']
params = {
'type': 'op',
'key': args.pwd,
'cmd': cmd
}
xml_response: str = get_api(device, params, args.log)
if key == 'globalcounters':
data = output_parser(device, xml_response, xpath, datenow)
with open('D:\\User data\\admin\\dev\\' + device + '_' + key + '.csv', 'a') as fp:
fp.writelines(data)
if __name__ == "__main__":
main(sys.argv[1:])
import argparse
from netmiko import ConnectHandler
import sys
import re
import json
import datetime
import logging
import socket
def get_cmd(device, username, pwd, command, key, timestamp, log=False):
filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_netmiko.log'
#netmiko input dictionary
paloalto = {
'device_type': 'paloalto_panos',
'ip': device,
'username': username,
'password': pwd,
'session_log': filename,
}
ssh = ConnectHandler(**paloalto)
pageroffcmd = 'set cli pager off\n'
ssh.send_command(pageroffcmd, expect_string=r">")
output = ssh.send_command(command, expect_string=r">")
return output
def output_parser(output, device, timestamp):
data = []
dict = {}
maindict = {}
for line in output.splitlines():
if 'LFP' in line:
lfpid = line.strip().replace(':','')
elif 'rate' in line:
rate = int(line.split(':')[1].strip().split(' ')[0])
elif 'Logs' in line:
logstage = line.strip().replace(' ','_')
logstage = logstage.lower()
elif ':' in line:
key = line.split(':')[0].strip().replace(' ','_')
key = key.lower()
val = int(line.split(':')[1].strip())
if 'lfpid' not in locals():
lfpid = 'NA'
if lfpid not in dict:
dict[lfpid] = {}
if logstage not in dict[lfpid]:
dict[lfpid][logstage] = {}
dict[lfpid][logstage][key] = val
for key in dict:
for subkey in dict[key]:
maindict['lfpid'] = key
maindict['logstage'] = subkey
for ssubkey in dict[key][subkey]:
maindict[ssubkey] = dict[key][subkey][ssubkey]
maindict['timestamp'] = timestamp
maindict['device'] = device
data.append(maindict)
maindict = {}
return data
def validate_ip_address(address):
match = re.match(r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$", address)
if bool(match) is False:
#print("IP address {} is not valid".format(address))
return False
for part in address.split("."):
if int(part) < 0 or int(part) > 255:
print("IP address {} is not valid".format(address))
return False
#print("IP address {} is valid".format(address))
return True
def host_to_dns(device):
if validate_ip_address(device):
try:
dnsname = socket.gethostbyaddr(device)[0]
return dnsname
except socket.error:
return "not resolvable"
else:
try:
socket.gethostbyname(device)
return device
except socket.error:
return "not resolvable"
def main(argv):
parser = argparse.ArgumentParser(description='Collect counter for interfaces.',epilog="And that's how you use rest api...")
parser.add_argument('-l', '--log',
action = argparse.BooleanOptionalAction,
default = False,
dest = 'log',
help = 'switch logging on or off')
parser.add_argument('-V','--version',
action = 'version',
version = '%(prog)s 1.0')
parser.add_argument('-d', '--device',
dest = 'device',
help = 'IP or hostname of the Palo Alto Firewall',
required = True)
parser.add_argument('-u', '--username',
dest = 'username',
help = 'username for the logon on Palo Alto Firewall.',
required = True)
parser.add_argument('-p', '--pwd',
dest = 'pwd',
help = 'Password hash for the logon on Palo Alto Firewall.',
required = True)
args = parser.parse_args()
datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
key = 'rawlog_fwd_stats'
if host_to_dns(args.device) == "not resolvable":
exit
else:
device = host_to_dns(args.device)
logtimestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
if args.log:
filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + logtimestamp + '_global.log'
logging.basicConfig(filename=filename, filemode='w',
format='%(asctime)s - %(process)d - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger('netmiko')
data_collector = {
'rawlog_fwd_stats': {
'cmd': 'debug log-receiver rawlog_fwd stats global show\n'
}
}
for key in data_collector:
command = data_collector[key]['cmd']
output = get_cmd(device, args.username, args.pwd, command, key, logtimestamp, args.log)
if key == 'rawlog_fwd_stats':
data = output_parser(output, device, datenow)
with open('D:\\User data\\admin\\dev\\' + device + '_' + key + '.json', 'a') as fp:
json.dump(data,fp)
if __name__ == "__main__":
main(sys.argv[1:])
import argparse
import requests
import xml.etree.ElementTree as ET
import logging
import sys
import json
import datetime
import re
import socket
def get_api(device, params, log=False):
# script_path = os.path.dirname(__file__)
# CA_file = "Palo_Alto_Networks_Inc-Root-CA_G1.pem"
# CA_path = os.path.join(script_path, CA_file)
request_timeout = 20
url: str = 'https://' + device + '/api/'
if log:
logging.info('Trying to access device: %s with cmd: %s', device, params['cmd'])
try:
# response = requests.post(url, timeout=request_timeout, verify=CA_path)
response = requests.post(url, params=params, timeout=request_timeout, verify=False)
except requests.exceptions.RequestException as e:
if log:
logging.error('We run in that problem: %s', e)
raise SystemExit(e)
if response.status_code != 200:
if log:
logging.error('with the url: %s', response.url)
logging.error('Cannot access the website, status code: %s', response.status_code)
logging.error('reply from the website:\n%s\n', response.text)
raise SystemExit(str(response.status_code) + " - " + response.text)
else:
if log:
logging.info('response from website:\n\n%s\n', response.text)
return response.text
def shortname(name):
sname = []
name = name.replace('(','').replace(')','')
if ' ' in name:
for i in name.split(' '):
sname.append(i[0])
elif '-' in name:
for i in name.split('-'):
sname.append(i[0])
else:
sname.append(name[:3])
return ''.join(sname)
def output_parser(device, output, xpath, timestamp):
match = False
log_types = ['traffic', 'threat', 'hipmatch', 'gtp', 'auth', 'iptag', 'userid', 'sctp', 'sctp', 'config', 'system', 'globalprotect' ]
data = []
dict = {}
root = ET.fromstring(output)
for i in xpath:
for entry in root.findall(i):
if entry.tag == 'entry':
dict['lcname'] = entry.attrib['name']
for child in entry:
if child.tag == 'conn-status' and 'Inactive' not in child.text:
match = True
elif child.tag == 'Rate':
# Example: "Rate : 0 logs/sec\n"
mrate = re.match(r"Rate\s+:\s+(?P<rate>\S+) logs.*", child.text)
if mrate is not None:
rate = int(mrate.group('rate'))
elif child.tag in log_types:
text = child.text.strip()
m = re.match(r"\S+\s+\d{4}\/\d{2}/\d{2} \d{2}:\d{2}:\d{2}\s+\d{4}\/\d{2}/\d{2} \d{2}:\d{2}:\d{2}\s+(?P<Last_Seq_Num_Fwded>\S+)\s+(?P<Last_Seq_Num_Acked>\S+)\s+(?P<total_logs_fwded>\S+)", text)
if m is not None:
logtype = child.tag
logcount = int(m.group('total_logs_fwded'))
dict[logtype] = logcount
if match:
dict['rate'] = rate
dict['timestamp'] = timestamp
dict['device'] = device
data.append(dict)
dict = {}
match = False
return data
def validate_ip_address(address):
match = re.match(r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$", address)
if bool(match) is False:
#print("IP address {} is not valid".format(address))
return False
for part in address.split("."):
if int(part) < 0 or int(part) > 255:
print("IP address {} is not valid".format(address))
return False
#print("IP address {} is valid".format(address))
return True
def host_to_dns(device):
if validate_ip_address(device):
try:
dnsname = socket.gethostbyaddr(device)[0]
return dnsname
except socket.error:
return "not resolvable"
else:
try:
socket.gethostbyname(device)
return device
except socket.error:
return "not resolvable"
def main(argv):
parser = argparse.ArgumentParser(description='Collect session count per vsys.',epilog="And that's how you use rest api...")
parser.add_argument('-l', '--log',
action = argparse.BooleanOptionalAction,
default = False,
dest = 'log',
help = 'switch logging on or off')
parser.add_argument('-V','--version',
action = 'version',
version = '%(prog)s 1.0')
parser.add_argument('-d', '--device',
dest = 'device',
help = 'IP or hostname of the Palo Alto Firewall',
required = True)
parser.add_argument('-p', '--pwd',
dest = 'pwd',
help = 'Password hash for the logon on Palo Alto Firewall.',
required = True)
args = parser.parse_args()
datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
key = 'logging-status'
data_collector = {
'logging-status': {
'cmd': '<show><logging-status><verbose>yes</verbose></logging-status></show>',
'xpath': ['./result/show-logging-status/Conn-Info/']
}
}
if host_to_dns(args.device) == "not resolvable":
exit
else:
device = host_to_dns(args.device)
if args.log:
timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_global.log'
logging.basicConfig(filename=filename, filemode='w',
format='%(asctime)s - %(process)d - %(levelname)s - %(message)s', level=logging.INFO)
for key in data_collector:
cmd = data_collector[key]['cmd']
xpath = data_collector[key]['xpath']
params = {
'type': 'op',
'key': args.pwd,
'cmd': cmd
}
xml_response: str = get_api(device, params, args.log)
if key == 'logging-status':
data = output_parser(device, xml_response, xpath, datenow)
with open('D:\\User data\\admin\\dev\\' + device + '_' + key + '.json', 'a') as fp:
json.dump(data,fp)
if __name__ == "__main__":
main(sys.argv[1:])
import argparse
from netmiko import ConnectHandler
import sys
import re
import json
import datetime
import logging
import socket
def get_cmd(device, username, pwd, command, key, timestamp, log=False):
filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_netmiko.log'
#netmiko input dictionary
paloalto = {
'device_type': 'paloalto_panos',
'ip': device,
'username': username,
'password': pwd,
'session_log': filename,
}
ssh = ConnectHandler(**paloalto)
pageroffcmd = 'set cli pager off\n'
ssh.send_command(pageroffcmd, expect_string=r">")
output = ssh.send_command(command, expect_string=r">")
return output
def output_parser(output, device, timestamp):
data = []
dict = {}
for line in output.splitlines():
m = re.match(r"DP\s+(?P<dpid>\S+):", line)
if m is not None:
dpid = m.group('dpid')
elif 'Hardware Pools' in line:
mtype = 'hw_pool'
elif 'Software Pools' in line:
mtype = 'sw_pool'
else:
# hw pool lines
m = re.match(r"\[\d+\]\s+(?P<poolname>.*)\s+:\s+(?P<poolavail>\S+)/(?P<poolsize>\S+)\s+.*", line)
if m is not None:
dict['name'] = m.group('poolname').strip().replace(' ','_')
dict['pa'] = int(m.group('poolavail'))
dict['ps'] = int(m.group('poolsize'))
#pool usage in percent
dict['pu'] = round(100 - (int(m.group('poolavail')) * 100 / int(m.group('poolsize'))), 0)
dict['timestamp'] = timestamp
dict['dpid'] = dpid
dict['type'] = mtype
dict['device'] = device
data.append(dict)
dict = {}
else:
# sw pool lines
m = re.match(r"\[.*\]\s+(?P<poolname>.*)\(.*\):\s+(?P<poolavail>\S+)\/(?P<poolsize>\S+)\s+.*", line)
if m is not None:
dict['name'] = m.group('poolname').strip().replace(' ','_')
dict['pa'] = int(m.group('poolavail'))
dict['ps'] = int(m.group('poolsize'))
#pool usage in percent
dict['pu'] = round(100 - (int(m.group('poolavail')) * 100 / int(m.group('poolsize'))), 0)
dict['timestamp'] = timestamp
if 'dpid' not in locals():
dpid = 'NA'
dict['dpid'] = dpid
dict['type'] = mtype
dict['device'] = device
data.append(dict)
dict = {}
return data
def validate_ip_address(address):
match = re.match(r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$", address)
if bool(match) is False:
#print("IP address {} is not valid".format(address))
return False
for part in address.split("."):
if int(part) < 0 or int(part) > 255:
print("IP address {} is not valid".format(address))
return False
#print("IP address {} is valid".format(address))
return True
def host_to_dns(device):
if validate_ip_address(device):
try:
dnsname = socket.gethostbyaddr(device)[0]
return dnsname
except socket.error:
return "not resolvable"
else:
try:
socket.gethostbyname(device)
return device
except socket.error:
return "not resolvable"
def main(argv):
parser = argparse.ArgumentParser(description='Collect counter for interfaces.',epilog="And that's how you use rest api...")
parser.add_argument('-l', '--log',
action = argparse.BooleanOptionalAction,
default = False,
dest = 'log',
help = 'switch logging on or off')
parser.add_argument('-V','--version',
action = 'version',
version = '%(prog)s 1.0')
parser.add_argument('-d', '--device',
dest = 'device',
help = 'IP or hostname of the Palo Alto Firewall',
required = True)
parser.add_argument('-u', '--username',
dest = 'username',
help = 'username for the logon on Palo Alto Firewall.',
required = True)
parser.add_argument('-p', '--pwd',
dest = 'pwd',
help = 'Password hash for the logon on Palo Alto Firewall.',
required = True)
args = parser.parse_args()
datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
key = 'dataplane_pool_statistics'
if host_to_dns(args.device) == "not resolvable":
exit
else:
device = host_to_dns(args.device)
logtimestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
if args.log:
filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + logtimestamp + '_global.log'
logging.basicConfig(filename=filename, filemode='w',
format='%(asctime)s - %(process)d - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger('netmiko')
data_collector = {
'dataplane_pool_statistics': {
'cmd': 'debug dataplane pool statistics\n'
}
}
for key in data_collector:
command = data_collector[key]['cmd']
output = get_cmd(device, args.username, args.pwd, command, key, logtimestamp, args.log)
if key == 'dataplane_pool_statistics':
data = output_parser(output, device, datenow)
with open('D:\\User data\\admin\\dev\\' + device + '_' + key + '.json', 'a') as fp:
json.dump(data,fp)
if __name__ == "__main__":
main(sys.argv[1:])
import argparse
from netmiko import ConnectHandler
import sys
import re
import json
import datetime
import logging
import socket
def get_cmd(device, username, pwd, command, key, timestamp, log=False):
filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_netmiko.log'
#netmiko input dictionary
paloalto = {
'device_type': 'paloalto_panos',
'ip': device,
'username': username,
'password': pwd,
'session_log': filename,
}
ssh = ConnectHandler(**paloalto)
pageroffcmd = 'set cli pager off\n'
ssh.send_command(pageroffcmd, expect_string=r">")
output = ssh.send_command(command, expect_string=r">")
return output
def output_parser(output, device, timestamp):
data = []
dict = {}
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
for line in output.splitlines():
m = re.match(r"DP\s+(?P<dpid>\S+):", line)
if m is not None:
dpid = m.group('dpid')
else:
m = re.match(r"(?P<type>\S+)\s+max-us\s+.*", line)
if m is not None:
mtype = m.group('type')
else:
m = re.match(r"(?P<name>\S+)\s+(?P<maxus>\S+)\s+(?P<avgus>\S+)\s+(?P<count>\S+)\s+(?P<totalus>\S+)\s+(?P<acmaxus>\S+)\s+(?P<acavgus>\S+)\s+(?P<account>\S+)\s+(?P<actotalus>\S+)", line)
if m is not None and m.group('maxus') != 'max-us' and m.group('maxus') != '0':
dict['name'] = m.group('name')
dict['maxus'] = int(m.group('maxus'))
dict['avgus'] = float(m.group('avgus'))
dict['count'] = int(m.group('count'))
dict['totalus'] = int(m.group('totalus'))
dict['acmaxus'] = int(m.group('acmaxus'))
dict['acavgus'] = float(m.group('acavgus'))
dict['account'] = int(m.group('account'))
dict['timestamp'] = timestamp
dict['device'] = device
if 'dpid' not in locals():
dpid = 'NA'
dict['dpid'] = dpid
dict['type'] = mtype
data.append(dict)
dict = {}
return data
def validate_ip_address(address):
match = re.match(r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$", address)
if bool(match) is False:
#print("IP address {} is not valid".format(address))
return False
for part in address.split("."):
if int(part) < 0 or int(part) > 255:
print("IP address {} is not valid".format(address))
return False
#print("IP address {} is valid".format(address))
return True
def host_to_dns(device):
if validate_ip_address(device):
try:
dnsname = socket.gethostbyaddr(device)[0]
return dnsname
except socket.error:
return "not resolvable"
else:
try:
socket.gethostbyname(device)
return device
except socket.error:
return "not resolvable"
def main(argv):
parser = argparse.ArgumentParser(description='Collect counter for interfaces.',epilog="And that's how you use rest api...")
parser.add_argument('-l', '--log',
action = argparse.BooleanOptionalAction,
default = False,
dest = 'log',
help = 'switch logging on or off')
parser.add_argument('-V','--version',
action = 'version',
version = '%(prog)s 1.0')
parser.add_argument('-d', '--device',
dest = 'device',
help = 'IP or hostname of the Palo Alto Firewall',
required = True)
parser.add_argument('-u', '--username',
dest = 'username',
help = 'username for the logon on Palo Alto Firewall.',
required = True)
parser.add_argument('-p', '--pwd',
dest = 'pwd',
help = 'Password hash for the logon on Palo Alto Firewall.',
required = True)
args = parser.parse_args()
datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
key = 'dataplane_pow_performance'
if host_to_dns(args.device) == "not resolvable":
exit
else:
device = host_to_dns(args.device)
logtimestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
if args.log:
filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + logtimestamp + '_global.log'
logging.basicConfig(filename=filename, filemode='w',
format='%(asctime)s - %(process)d - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger('netmiko')
data_collector = {
'dataplane_pow_performance': {
'cmd': 'debug dataplane pow performance all\n'
}
}
for key in data_collector:
command = data_collector[key]['cmd']
output = get_cmd(device, args.username, args.pwd, command, key, logtimestamp, args.log)
if key == 'dataplane_pow_performance':
data = output_parser(output, device, datenow)
with open('D:\\User data\\admin\\dev\\' + device + '_' + key + '.json', 'a') as fp:
json.dump(data,fp)
if __name__ == "__main__":
main(sys.argv[1:])
import argparse
import requests
import xml.etree.ElementTree as ET
import logging
import sys
import json
import datetime
import re
import socket
def get_api(device, params, log=False):
# script_path = os.path.dirname(__file__)
# CA_file = "Palo_Alto_Networks_Inc-Root-CA_G1.pem"
# CA_path = os.path.join(script_path, CA_file)
request_timeout = 20
url: str = 'https://' + device + '/api/'
if log:
logging.info('Trying to access device: %s with cmd: %s', device, params['cmd'])
try:
# response = requests.post(url, timeout=request_timeout, verify=CA_path)
response = requests.post(url, params=params, timeout=request_timeout, verify=False)
except requests.exceptions.RequestException as e:
if log:
logging.error('We run in that problem: %s', e)
raise SystemExit(e)
if response.status_code != 200:
if log:
logging.error('with the url: %s', response.url)
logging.error('Cannot access the website, status code: %s', response.status_code)
logging.error('reply from the website:\n%s\n', response.text)
raise SystemExit(str(response.status_code) + " - " + response.text)
else:
if log:
logging.info('response from website:\n\n%s\n', response.text)
return response.text
def shortname(name):
sname = []
name = name.replace('(','').replace(')','')
if ' ' in name:
for i in name.split(' '):
sname.append(i[0])
else:
for i in name.split('-'):
sname.append(i[0])
return ''.join(sname)
def output_parser(device, output, xpath, pertime):
timestamp_orig = datetime.datetime.now()
timestamp = timestamp_orig
if 'second' in pertime:
delta = 1
elif 'minute' in pertime:
delta = 60
elif 'hour' in pertime:
delta = 3600
elif 'day' in pertime:
delta = 86400
elif 'week' in pertime:
delta = 604800
ddelta = delta
data = []
dict = {}
root = ET.fromstring(output)
for entry in root.findall(xpath):
dpid = entry.tag
for child in entry[0]:
name = shortname(child.tag)
if 'cpu' in child.tag:
for subchild in child:
cid = subchild.find('./coreid').text
vlist = subchild.find('./value').text
for value in vlist.split(','):
dict['dpid'] = dpid
dict['type'] = name
dict['name'] = 'core' + cid
dict['value'] = int(value)
dict['device'] = device
dict['timestamp'] = timestamp.strftime("%Y-%m-%d %H:%M:%S")
data.append(dict)
timestamp = (timestamp_orig - datetime.timedelta(seconds=ddelta))
ddelta = ddelta + delta
dict = {}
timestamp = timestamp_orig
ddelta = delta
if 'resource' in child.tag:
for subchild in child:
data_name = shortname(subchild.find('./name').text)
vlist = subchild.find('./value').text
for value in vlist.split(','):
dict['dpid'] = dpid
dict['type'] = name
dict['name'] = data_name
dict['value'] = int(value)
dict['device'] = device
dict['timestamp'] = timestamp.strftime("%Y-%m-%d %H:%M:%S")
data.append(dict)
timestamp = (timestamp_orig - datetime.timedelta(seconds=ddelta))
ddelta = ddelta + delta
dict = {}
timestamp = timestamp_orig
ddelta = delta
return data
def validate_ip_address(address):
match = re.match(r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$", address)
if bool(match) is False:
#print("IP address {} is not valid".format(address))
return False
for part in address.split("."):
if int(part) < 0 or int(part) > 255:
print("IP address {} is not valid".format(address))
return False
#print("IP address {} is valid".format(address))
return True
def host_to_dns(device):
if validate_ip_address(device):
try:
dnsname = socket.gethostbyaddr(device)[0]
return dnsname
except socket.error:
return "not resolvable"
else:
try:
socket.gethostbyname(device)
return device
except socket.error:
return "not resolvable"
def main(argv):
parser = argparse.ArgumentParser(description='Collect resource-monitor data.',epilog="And that's how you use rest api...")
parser.add_argument('-l', '--log',
action = argparse.BooleanOptionalAction,
default = False,
dest = 'log',
help = 'switch logging on or off')
parser.add_argument('-V','--version',
action = 'version',
version = '%(prog)s 1.0')
parser.add_argument('-d', '--device',
dest = 'device',
help = 'IP or hostname of the Palo Alto Firewall',
required = True)
parser.add_argument('-p', '--pwd',
dest = 'pwd',
help = 'Password hash for the logon on Palo Alto Firewall.',
required = True)
args = parser.parse_args()
key = 'resource-monitor'
value = '5'
per_time = 'minute'
data_collector = {
'resource-monitor': {
'cmd': '<show><running><resource-monitor><' + per_time + '><last>' + value + '</last></' + per_time + '></resource-monitor></running></show>',
'xpath': './result/resource-monitor/data-processors/'
}
}
if host_to_dns(args.device) == "not resolvable":
exit
else:
device = host_to_dns(args.device)
if args.log:
timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_global.log'
logging.basicConfig(filename=filename, filemode='w',
format='%(asctime)s - %(process)d - %(levelname)s - %(message)s', level=logging.INFO)
for key in data_collector:
cmd = data_collector[key]['cmd']
xpath = data_collector[key]['xpath']
params = {
'type': 'op',
'key': args.pwd,
'cmd': cmd
}
xml_response: str = get_api(device, params, args.log)
if key == 'resource-monitor':
data = output_parser(device, xml_response, xpath, per_time)
with open('D:\\User data\\admin\\dev\\' + device + '_' + key + '.json', 'a') as fp:
json.dump(data,fp)
if __name__ == "__main__":
main(sys.argv[1:])
import argparse
import requests
import xml.etree.ElementTree as ET
import logging
import sys
import json
import datetime
import re
import socket
def get_api(device, params, log=False):
# script_path = os.path.dirname(__file__)
# CA_file = "Palo_Alto_Networks_Inc-Root-CA_G1.pem"
# CA_path = os.path.join(script_path, CA_file)
request_timeout = 20
url: str = 'https://' + device + '/api/'
if log:
logging.info('Trying to access device: %s with cmd: %s', device, params['cmd'])
try:
# response = requests.post(url, timeout=request_timeout, verify=CA_path)
response = requests.post(url, params=params, timeout=request_timeout, verify=False)
except requests.exceptions.RequestException as e:
if log:
logging.error('We run in that problem: %s', e)
raise SystemExit(e)
if response.status_code != 200:
if log:
logging.error('with the url: %s', response.url)
logging.error('Cannot access the website, status code: %s', response.status_code)
logging.error('reply from the website:\n%s\n', response.text)
raise SystemExit(str(response.status_code) + " - " + response.text)
else:
if log:
logging.info('response from website:\n\n%s\n', response.text)
return response.text
def shortname(name):
sname = []
name = name.replace('(','').replace(')','')
if ' ' in name:
for i in name.split(' '):
sname.append(i[0])
elif '-' in name:
for i in name.split('-'):
sname.append(i[0])
else:
sname.append(name[:3])
return ''.join(sname)
def output_parser(device, output, xpath, timestamp):
data = []
dict = {}
root = ET.fromstring(output)
for i in xpath:
for entry in root.findall(i):
dict['dpid'] = entry.tag
for child in entry:
dict[shortname(child.tag)] = float(child.text)
dict['timestamp'] = timestamp
dict['device'] = device
data.append(dict)
dict = {}
return data
def validate_ip_address(address):
match = re.match(r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$", address)
if bool(match) is False:
#print("IP address {} is not valid".format(address))
return False
for part in address.split("."):
if int(part) < 0 or int(part) > 255:
print("IP address {} is not valid".format(address))
return False
#print("IP address {} is valid".format(address))
return True
def host_to_dns(device):
if validate_ip_address(device):
try:
dnsname = socket.gethostbyaddr(device)[0]
return dnsname
except socket.error:
return "not resolvable"
else:
try:
socket.gethostbyname(device)
return device
except socket.error:
return "not resolvable"
def main(argv):
parser = argparse.ArgumentParser(description='Collect session distribution on .',epilog="And that's how you use rest api...")
parser.add_argument('-l', '--log',
action = argparse.BooleanOptionalAction,
default = False,
dest = 'log',
help = 'switch logging on or off')
parser.add_argument('-V','--version',
action = 'version',
version = '%(prog)s 1.0')
parser.add_argument('-d', '--device',
dest = 'device',
help = 'IP or hostname of the Palo Alto Firewall',
required = True)
parser.add_argument('-p', '--pwd',
dest = 'pwd',
help = 'Password hash for the logon on Palo Alto Firewall.',
required = True)
args = parser.parse_args()
datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
key = 'session-distribution'
data_collector = {
'session-distribution': {
'cmd': '<show><session><distribution><statistics></statistics></distribution></session></show>',
'xpath': ['./result/']
}
}
if host_to_dns(args.device) == "not resolvable":
exit
else:
device = host_to_dns(args.device)
if args.log:
timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_global.log'
logging.basicConfig(filename=filename, filemode='w',
format='%(asctime)s - %(process)d - %(levelname)s - %(message)s', level=logging.INFO)
for key in data_collector:
cmd = data_collector[key]['cmd']
xpath = data_collector[key]['xpath']
params = {
'type': 'op',
'key': args.pwd,
'cmd': cmd
}
xml_response: str = get_api(device, params, args.log)
if key == 'session-distribution':
data = output_parser(device, xml_response, xpath, datenow)
with open('D:\\User data\\admin\\dev\\' + device + '_' + key + '.json', 'a') as fp:
json.dump(data,fp)
if __name__ == "__main__":
main(sys.argv[1:])
import argparse
import requests
import xml.etree.ElementTree as ET
import logging
import sys
import json
import datetime
import re
import socket
def get_api(device, params, log=False):
# script_path = os.path.dirname(__file__)
# CA_file = "Palo_Alto_Networks_Inc-Root-CA_G1.pem"
# CA_path = os.path.join(script_path, CA_file)
request_timeout = 20
url: str = 'https://' + device + '/api/'
if log:
logging.info('Trying to access device: %s with cmd: %s', device, params['cmd'])
try:
# response = requests.post(url, timeout=request_timeout, verify=CA_path)
response = requests.post(url, params=params, timeout=request_timeout, verify=False)
except requests.exceptions.RequestException as e:
if log:
logging.error('We run in that problem: %s', e)
raise SystemExit(e)
if response.status_code != 200:
if log:
logging.error('with the url: %s', response.url)
logging.error('Cannot access the website, status code: %s', response.status_code)
logging.error('reply from the website:\n%s\n', response.text)
raise SystemExit(str(response.status_code) + " - " + response.text)
else:
if log:
logging.info('response from website:\n\n%s\n', response.text)
return response.text
def shortname(name):
sname = []
name = name.replace('(','').replace(')','')
if ' ' in name:
for i in name.split(' '):
sname.append(i[0])
elif '-' in name:
for i in name.split('-'):
sname.append(i[0])
else:
sname.append(name[:3])
return ''.join(sname)
def output_parser(device, output, xpath, timestamp):
data = []
dict = {}
root = ET.fromstring(output)
for i in xpath:
for entry in root.findall(i):
for child in entry:
if child.tag in ['vsys']:
dict[child.tag] = child.text
elif child.text is not None:
dict[shortname(child.tag)] = int(child.text)
dict['timestamp'] = timestamp
dict['device'] = device
data.append(dict)
dict = {}
return data
def validate_ip_address(address):
match = re.match(r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$", address)
if bool(match) is False:
#print("IP address {} is not valid".format(address))
return False
for part in address.split("."):
if int(part) < 0 or int(part) > 255:
print("IP address {} is not valid".format(address))
return False
#print("IP address {} is valid".format(address))
return True
def host_to_dns(device):
if validate_ip_address(device):
try:
dnsname = socket.gethostbyaddr(device)[0]
return dnsname
except socket.error:
return "not resolvable"
else:
try:
socket.gethostbyname(device)
return device
except socket.error:
return "not resolvable"
def main(argv):
parser = argparse.ArgumentParser(description='Collect session count per vsys.',epilog="And that's how you use rest api...")
parser.add_argument('-l', '--log',
action = argparse.BooleanOptionalAction,
default = False,
dest = 'log',
help = 'switch logging on or off')
parser.add_argument('-V','--version',
action = 'version',
version = '%(prog)s 1.0')
parser.add_argument('-d', '--device',
dest = 'device',
help = 'IP or hostname of the Palo Alto Firewall',
required = True)
parser.add_argument('-p', '--pwd',
dest = 'pwd',
help = 'Password hash for the logon on Palo Alto Firewall.',
required = True)
args = parser.parse_args()
datenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
key = 'session-meter'
data_collector = {
'session-meter': {
'cmd': '<show><session><meter></meter></session></show>',
'xpath': ['./result/']
}
}
if host_to_dns(args.device) == "not resolvable":
exit
else:
device = host_to_dns(args.device)
if args.log:
timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
filename = 'D:\\User data\\admin\\dev\\' + device + '_' + key + '_' + timestamp + '_global.log'
logging.basicConfig(filename=filename, filemode='w',
format='%(asctime)s - %(process)d - %(levelname)s - %(message)s', level=logging.INFO)
for key in data_collector:
cmd = data_collector[key]['cmd']
xpath = data_collector[key]['xpath']
params = {
'type': 'op',
'key': args.pwd,
'cmd': cmd
}
xml_response: str = get_api(device, params, args.log)
if key == 'session-meter':
data = output_parser(device, xml_response, xpath, datenow)
with open('D:\\User data\\dkx8blr\\dev\\' + device + '_' + key + '.json', 'a') as fp:
json.dump(data,fp)
if __name__ == "__main__":
main(sys.argv[1:])
from netmiko import ConnectHandler
import sys
def main(argv):
host = '10.33.245.211'
username = '....'
pwd = '....'
paloalto = {
'device_type': 'paloalto_panos',
'ip': host,
'username': username,
'password': pwd,
'session_log': 'C:\\Users\\admin\\Downloads\\output.txt',
}
ssh = ConnectHandler(**paloalto)
cmd1 = 'set cli pager off'
output1 = ssh.send_command(cmd1)
cmd2 = 'show system info'
output2 = ssh.send_command(cmd2)
print(output2)
if __name__ == "__main__":
main(sys.argv[1:])
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Author: Ist wurst...
# get key first:
# https://pa-panorama/api/?type=keygen&user=de-netmgmt&password=<ASKFORIT>
#
# https://IPADDRESS/api/?type=config&action=set&xpath=/config/shared/reports/entry[@name='TrafSumCustTime']&element=<type><trsum><sortby>bytes</sortby><group-by>src</group-by><aggregate-by><member>from</member><member>app</member></aggregate-by><values><member>bytes</member><member>bytes_sent</member><member>bytes_received</member></values></trsum></type><topn>10</topn><topm>5</topm><caption>TrafSumCustTime</caption><start-time>DATE+TIMESTAMP</start-time><end-time>DATE+TIMESTAMP</end-time>&key=KEY
import requests
import argparse
import json
import logging
import ssl
import sys, getopt
import time
import xml.etree.ElementTree as ET
import datetime
import pdb
def get_api(host, params, log=False):
# script_path = os.path.dirname(__file__)
# CA_file = "Palo_Alto_Networks_Inc-Root-CA_G1.pem"
# CA_path = os.path.join(script_path, CA_file)
request_timeout = 20
url: str = 'https://' + host + '/api/'
if log:
logging.info('Trying to access host: %s with cmd: %s', host, params['cmd'])
try:
# response = requests.post(url, timeout=request_timeout, verify=CA_path)
response = requests.post(url, params=params, timeout=request_timeout, verify=False)
except requests.exceptions.RequestException as e:
if log:
logging.error('We run in that problem: %s', e)
raise SystemExit(e)
if response.status_code != 200:
if log:
logging.error('Cannot access the website, status code: %s', response.status_code)
logging.error('reply from the website:\n%s\n', response.text)
raise SystemExit(str(response.status_code) + " - " + response.text)
else:
if log:
logging.info('response from website:\n\n%s\n', response.text)
return response.text
def get_report( host, hash, reportname, report, starttime, endtime, log=False ):
#reportstarturl = '/api/?type=report&async=yes&reporttype=custom&reportname='
#reportgeturl = '/api/?type=report&action=get&job-id='
argssetrep = {
'type':'config',
'action':'set',
'xpath':'/config/shared/reports/entry[@name=\''+reportname+'\']',
'element':report,
'key':hash
}
argsinitrep = {
'type':'report',
'async':'yes',
'key':hash,
'reporttype':'custom',
'reportname':reportname
}
argsgetrep = {
'type':'report',
'action':'get',
'job-id':'1',
'key':hash
}
#1. create the report
xml_response = get_api(host, argssetrep, log)
if log:
logging.info("api xml output Original: \n" + xml_response.decode("utf-8") )
#2. start the report
xml_response = get_api(host, argsinitrep, log)
if log:
logging.info("api xml output Original: \n" + xml_response.decode("utf-8") )
if xml_response:
time.sleep(7)
#pdb.set_trace()
root = ET.fromstring(xml_response)
for entry in root.findall('./result/msg/line'):
attribval = entry.text
if attribval.find('jobid') != -1:
jobid = attribval.split()[-1]
argsgetrep['job-id'] = jobid
#3. loop till the report is ready. The status will be change from ACT to FIN.
repstatus = True
while repstatus:
#pdb.set_trace()
xml_response2 = get_api(host, argsgetrep, log)
xmlrep = xml_response2.decode("utf-8")
rootreport = ET.fromstring(xmlrep)
xpath = './result/job/status'
if rootreport.find(xpath).text == "FIN":
repstatus = False
output = ''
for entry2 in rootreport.findall('./result/report/entry'):
#ET.dump(entry2)
dateelement = ET.Element("date")
dateelement.text = starttime
entry2.insert(0,dateelement)
str = ET.tostring(entry2).decode()
str = str.replace('\r', '').replace('\n', '')
output += str.strip() + "\n"
else:
time.sleep(10)
else:
response = "I cannot find the jobid for the report. Contact the Firewall Administrator!"
sys.exit()
return output
else:
raise Exception("The root cause of the problem is there is no jobid or something...")
def main(argv):
#1. check the arguments
today = datetime.date.today()
todaystr = today.strftime('%Y/%m/%d') + ' 00:05:00'
yesterday = today - datetime.timedelta(days = 1)
yesterdaystr = yesterday.strftime('%Y/%m/%d') + ' 00:05:00'
parser = argparse.ArgumentParser(description='Palo Alto Report Collector .',epilog="And that's how you collect custom reports to ELK...")
parser.add_argument('-v','--version',
action = 'version', version='%(prog)s 1.0')
parser.add_argument('-l','--log',
action='store_true',
default=False,
help='switch logging on',
dest='log')
parser.add_argument('-p','--panorama',
help='IP or hostname of the Palo Alto Panorama',
dest='panorama')
parser.add_argument('-k','--hashkey',
help='Password hash for the logon on Palo Alto Panorama',
dest='hashkey')
parser.add_argument('-n','--reportname',
help='The Name of the report in Panorama',
dest='reportname')
parser.add_argument('-r','--report',
help='palo alto xml report',
dest='report')
parser.add_argument('-s','--starttime',
nargs='?',
help='start time for the report time selection. default is yesterday.',
dest='starttime',
default=yesterdaystr)
parser.add_argument('-e','--endtime',
nargs='?',
help='end time for the report time selection. default is today.',
dest='endtime',
default=todaystr)
args = parser.parse_args()
# logging
if args.log:
logging.basicConfig(level=logging.INFO,
filename='prtg-info.log', # log to this file
format='%(asctime)s %(message)s') # include timestamp
logging.info("Start Logging...")
ipnet = '10.34.160.0/19'
report_query = '(addr.dst in \'{ipnetwork}\') and (action eq allow)'.format(ipnetwork=ipnet)
report = '''\
<type>
<panorama-trsum>
<sortby>bytes</sortby>
<aggregate-by>
<member>app</member>
<member>category-of-app</member>
<member>subcategory-of-app</member>
<member>technology-of-app</member>
<member>container-of-app</member>
</aggregate-by>
<values>
<member>bytes_received</member>
<member>bytes_sent</member>
<member>bytes</member>
<member>sessions</member>
</values>
</panorama-trsum>
</type>
<period>last-7-days</period>
<topn>5</topn>
<topm>10</topm>
<caption>net-report</caption>
<query>{rep_query}</query>\
'''.format(rep_query=report_query)
report.replace('\n','')
rpt = get_report(args.panorama, args.hashkey, args.reportname, report, args.starttime, args.endtime, args.log)
file = open(args.reportname,"w")
file.write(rpt)
file.close()
if __name__ == "__main__":
main(sys.argv[1:])
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import os
import requests
from requests.utils import requote_uri
import json
import argparse
import datetime
import time
import pdb
import xml.etree.ElementTree as ET
import re
from netmiko import ConnectHandler
import socket
def get_cmd(host, username, pwd, cmd, log=False):
paloalto = {
'device_type': 'paloalto_panos',
'ip': host,
'username': username,
'password': pwd,
'session_log': 'output.txt',
}
ssh = ConnectHandler(**paloalto)
output = ssh.send_command(cmd)
return output
def json_stream(output, elasticindex, device):
timestamp = datetime.datetime.now()
issuedate = timestamp.strftime('%Y-%m-%d %H:%M:%S')
tsa_dict = {}
list = '\n'
elasticdict = {
'index' : {
'_index' : elasticindex,
'_type' : '_doc'
}
}
for line in output.splitlines():
if len(line.strip()) > 4 and "---" not in line and "Blocks" not in line:
values = line.split()
tsa_dict["issuedate"] = issuedate
tsa_dict["device"] = device
tsa_dict["servername"] = values[0]
tsa_dict["hostname"] = values[1]
tsa_dict["serverport"] = values[2]
tsa_dict["vsysname"] = values[3]
tsa_dict["serverstatus"] = values[4]
tsa_dict["serverversion"] = values[5]
tsa_dict["users"] = values[6]
tsa_dict["blocks"] = values[6]
# convert to stream of jsons with new line separation
list += json.dumps(elasticdict) + "\n"
list += json.dumps(tsa_dict) + "\n"
tsa_dict = {}
return list
def uploader(url, jsdata, username, password, log=False):
headers = {'Content-Type': 'application/json'}
#script_path = os.path.dirname(__file__)
#CA_file = "Palo_Alto_Networks_Inc-Root-CA_G1.pem"
#CA_path = os.path.join(script_path, CA_file)
request_timeout = 60
#pdb.set_trace()
if log:
logging.info("URL: " + url)
try:
if type(jsdata) == dict:
#response = requests.post(url, json=jsdata, headers=headers, timeout=request_timeout)
response = requests.post(url, json=jsdata, headers=headers, auth=(username,password), timeout=request_timeout)
else:
#response = requests.post(url, data=jsdata, headers=headers, timeout=request_timeout)
response = requests.post(url, data=jsdata, headers=headers, auth=(username,password), timeout=request_timeout)
except requests.exceptions.RequestException as e:
raise SystemExit(e)
if response.status_code != 200:
raise SystemExit(str(response.status_code) + " - " + response.text)
return str(response.status_code) + " - " + response.text
def validate_ip_address(address):
match = re.match(r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$", address)
if bool(match) is False:
#print("IP address {} is not valid".format(address))
return False
for part in address.split("."):
if int(part) < 0 or int(part) > 255:
print("IP address {} is not valid".format(address))
return False
#print("IP address {} is valid".format(address))
return True
def host_to_dns(device):
if validate_ip_address(device):
try:
dnsname = socket.gethostbyaddr(device)[0]
return dnsname
except socket.error:
return "not resolvable"
else:
try:
socket.gethostbyname(device)
return device
except socket.error:
return "not resolvable"
def main(argv):
parser = argparse.ArgumentParser(description='Collect TSA statistics.',epilog="And that's how you use rest api...")
parser.add_argument('--log',
action = 'store_true',
default = False,
dest = 'log_switch',
help = 'switch logging on')
parser.add_argument('-V','--version',
action = 'version',
version = '%(prog)s 1.0')
parser.add_argument("-v", "--verbosity",
action = "count",
default = 0,
help = 'increase output verbosity',
required = False)
parser.add_argument('-d', '--device',
dest = 'host',
help = 'IP or hostname of the Palo Alto Firewall',
required = True)
parser.add_argument('-u', '--username',
dest = 'username',
help = 'username for the logon on Palo Alto Firewall.',
required = True)
parser.add_argument('-p', '--pwd',
dest = 'pwd',
help = 'Password for the logon on Palo Alto Firewall.',
required = True)
args = parser.parse_args()
command = 'show user ts-agent statistics'
elastic_url = "https://elasticsearch-test:443"
elastic_index = "tsa-statistics"
username = '...'
password = '...'
#elastic_url = "http://10.251.58.65:9200"
#elastic_index = "tsa-agent-statistics"
#username = '...'
#password = '...'
# logging
if args.log_switch:
logging.basicConfig(level=logging.INFO,
filename='paloalto_tapstate_check.log', # log to this file
format='%(asctime)s %(message)s') # include timestamp
logging.info("Start Logging...")
#2. check if file woth host list or a simple host
if os.path.isfile(args.host):
hostlist = open(args.host, "r")
for host in hostlist:
host = host.strip()
if host_to_dns(host) == "not resolvable":
exit
else:
host = host_to_dns(host)
xml_response = get_cmd(host, args.username, args.pwd, command, args.log_switch)
if xml_response:
#upload the docs to elastic
jsdata = json_stream(xml_response, elastic_index, host)
url = elastic_url + "/_bulk?pretty"
uploader(url, jsdata, username, password, log=False)
else:
if host_to_dns(args.host) == "not resolvable":
exit
else:
host = host_to_dns(args.host)
xml_response = get_cmd(host, args.username, args.pwd, command, args.log_switch)
if xml_response:
#upload the docs to neptune
jsdata = json_stream(xml_response, elastic_index, host)
filename = "jsdata" + str(args.host) + "txt"
change_file1 = open(filename, "w")
change_file1.write(jsdata)
change_file1.close()
#with open("jsdata.txt", "r") as js_stream:
# jsdata = js_stream.read()
#pdb.set_trace()
url = elastic_url + "/_bulk?pretty"
uploader(url, jsdata, username, password, log=False)
if __name__ == "__main__":
main(sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment