Skip to content

Instantly share code, notes, and snippets.

@ezeeetm
Created December 22, 2017 00:06
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ezeeetm/e1b91a6c87f03f18f6c44fa5b4c4fc4c to your computer and use it in GitHub Desktop.
Save ezeeetm/e1b91a6c87f03f18f6c44fa5b4c4fc4c to your computer and use it in GitHub Desktop.
Automating Vulnerability Management on AWS with TripWire IP360
#!/usr/bin/env python
import xmlrpclib
import ssl
import logging
import os
import sys
import time
logging.basicConfig(level=logging.DEBUG)
ip360_endpoint = 'https://your.tripwire.api_endpoint.ip_address/api2xmlrpc/'
ip360_username = 'username'
ip360_pw = 'password'
environments = {
'NONPROD': {'device_profiler':'DP.2', 'scan_profile':'ScanProfile.7', 'network':'Network.298'},
'PROD': {'device_profiler':'DP.2', 'scan_profile':'ScanProfile.7', 'network':'Network.298'}
}
def get_conn(ip360_endpoint, ip360_username, ip360_pw):
# CWT ip_360 endpoint uses a self-signed SSL cert, which will throw an error in xmlrpclib.
# using 'context=ssl._create_unverified_context()' below to address this
# https://stackoverflow.com/questions/30461969/disable-default-certificate-verification-in-python-2-7-9
server = xmlrpclib.ServerProxy(ip360_endpoint, context=ssl._create_unverified_context())
session = server.login(2, 0, ip360_username, ip360_pw)
resp = server.call(session,'SESSION','getUserObject',{})
logging.info("#######################\nip_360 UserObject: %s\n" % (resp))
return server, session
def get_config(environments):
logging.info("#######################\nusing environments: %s" % (environments))
device_profiler = environments[os.environ['ENVIRONMENT']]['device_profiler']
scan_profile = environments[os.environ['ENVIRONMENT']]['scan_profile']
network = environments[os.environ['ENVIRONMENT']]['network']
logging.info("device_profiler for this scan: %s" % (device_profiler))
logging.info("scan_profile for this scan: %s" % (scan_profile))
logging.info("network for this scan: %s\n" % (network))
return device_profiler, scan_profile, network
def get_device_profilers(server, session):
params={'query':'name LIKE \'%\''}
device_profilers = server.call(session,'class.DP','search',params)
logging.info("#######################\ndevice_profilers: %s\n" % (device_profilers))
return device_profilers
def get_scan_profiles(server, session):
params={'query':'name LIKE \'%\''}
scan_profiles = server.call(session,'class.ScanProfile','search',params)
logging.info("#######################\nscan_profiles: %s\n" % (scan_profiles))
return scan_profiles
def get_networks(server, session):
params={'query':'name LIKE \'%\''}
networks = server.call(session,'class.Network','search',params)
logging.info("#######################\nnetworks: %s\n" % (networks))
return networks
def validate_config(value, lst):
if any(value in x for x in lst):
logging.info("%s validated" % (value))
return
else:
logging.critical("%s is not a valid value for %s" % (value,lst))
sys.exit(1)
def scan_network(server, session, device_profiler,scan_profile,network):
params = {'network':network,'scanProfile':scan_profile}
audit = server.call(session,device_profiler,'startScan',params)
audit_attribs= server.call(session,audit,'getAttributes',{})
logging.info("#######################\naudit_attribs: %s\n" % (audit_attribs))
while audit_attribs['status'] == 1: #1:InProgress 2:Failed 3:Cancelled 4:Finished 5:Paused 6:Auto-Paused 7:Suspended
logging.info("TripWire IP360 scan in progress... ")
time.sleep(15)
audit_attribs= server.call(session,audit,'getAttributes',{})
return audit
def get_report(server, session, audit):
params = {'format':'CSV'}
report = server.call(session,audit,'getReport',params)
logging.info("#######################\nreport: %s\n" % (report))
return report
# authenticate with ip360 API endpoint
server, session = get_conn(ip360_endpoint, ip360_username, ip360_pw)
# get config for this scan
device_profiler, scan_profile, network = get_config(environments)
# get device profilers, scan profiles,and networks to validate config
device_profilers = get_device_profilers(server, session)
scan_profiles = get_scan_profiles(server, session)
networks = get_networks(server, session)
# validate configs
logging.info("#######################")
validate_config(device_profiler, device_profilers)
validate_config(scan_profile, scan_profiles)
validate_config(network, networks)
# execute scan
audit = scan_network(server, session, device_profiler,scan_profile,network)
# generate report
report = get_report(server, session, audit)
# notify on report
# gracefully close connection
logging.info("#######################\nexecution complete,logging out of IP360 endpoint")
server.logout(session)
'''
#some example calls to TripWire IP360 API
#nonprod 172.16.4.0/22
#prod 172.16.8.0/22
DPgetAttribs = server.call(session,'class.DP','getAttributes',{})
print(DPgetAttribs)
DPinstanceAttribs = server.call(session,'DP.2','getAttributes',{})
print(DPinstanceAttribs)
DPinstanceAttribValue = server.call(session,'DP.2','getAttributes',{})
print(DPinstanceAttribValue['softwareVersion'])
'''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment