Created
May 8, 2018 16:13
-
-
Save jitsejan/7bc80f95b97dfbf32bbd178973fcb8eb to your computer and use it in GitHub Desktop.
Splunk dependency checker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" main.py """ | |
from argparse import ArgumentParser | |
from lxml import etree as ET | |
import getpass | |
import glob | |
import os | |
import re | |
import requests | |
from requests.auth import HTTPBasicAuth | |
from requests.packages.urllib3.exceptions import InsecureRequestWarning | |
requests.packages.urllib3.disable_warnings(InsecureRequestWarning) | |
class DependencyChecker(object):
    """Find the Splunk apps and indexes that a Splunk app directory relies on.

    Dependencies are collected from the dashboard XML files and the
    ``savedsearches.conf`` files found below ``directory``. Search commands
    are mapped to the app that owns them through the Splunk REST API.
    """

    # Apps whose search commands are never reported as a dependency.
    _BUILTIN_APPS = ('search',)
    # Seconds to wait for the REST API before giving up instead of hanging.
    _REQUEST_TIMEOUT = 30

    def __init__(self, username, password, directory,
                 base_url="https://mysplunkurl:8089"):
        """Store the settings and download the search command map.

        Args:
            username: User name for HTTP basic authentication.
            password: Password for HTTP basic authentication.
            directory: Root directory of the app to inspect.
            base_url: Scheme, host and port of the Splunk management API.

        Raises:
            Exception: If the REST API response cannot be interpreted.
        """
        self.username = username
        self.password = password
        self.directory = directory
        self.base_url = base_url
        self.dependencies = []
        self.indexes = []
        # Must come last: the request needs the credentials and base URL.
        self.searchcommands = self._get_commands()

    def _get_commands(self):
        """Retrieve the search commands from the Splunk REST API.

        Returns:
            A dict mapping each lower-cased command name to the lower-cased
            name of the app that owns it.

        Raises:
            Exception: If the response lacks the expected structure, which
                usually means the credentials were rejected.
        """
        url = "{}/services/data/commands".format(self.base_url.rstrip('/'))
        params = {'output_mode': 'json', 'count': -1}
        # NOTE(review): verify=False disables TLS certificate validation, so
        # the credentials can be intercepted. Kept as in the original; enable
        # verification if the server has a trusted certificate.
        response = requests.get(
            url,
            params=params,
            auth=HTTPBasicAuth(self.username, self.password),
            verify=False,
            timeout=self._REQUEST_TIMEOUT
        ).json()
        try:
            return {
                entry['name'].lower(): entry['acl']['app'].lower()
                for entry in response['entry']
            }
        except (KeyError, TypeError, AttributeError) as err:
            raise Exception("Error communicating with the REST API. Please verify credentials.") from err

    @staticmethod
    def _get_root_attribute_dependencies(tree, attribute):
        """Return the app prefixes of a comma separated root attribute.

        The attribute value is a list such as ``"app:file.js, local.js"``.
        Only entries carrying an ``app:`` prefix name a dependency.
        """
        value = tree.getroot().get(attribute)
        if not value:
            return []
        apps = []
        for reference in value.split(','):
            app, separator, _ = reference.partition(':')
            if separator:
                apps.append(app.strip())
        return apps

    def _apps_for_commands(self, text):
        """Return the apps owning the search commands mentioned in ``text``.

        Matching is a case-insensitive substring test, so it is a heuristic:
        a command name that occurs inside another word also counts.
        """
        lowered = text.lower()
        apps = []
        for command, app in self.searchcommands.items():
            if command not in lowered:
                continue
            if app in self._BUILTIN_APPS or app in apps:
                continue
            apps.append(app)
        return apps

    def _read_savedsearches(self):
        """Yield the content of every savedsearches.conf below the directory."""
        pattern = '{}/**/savedsearches.conf'.format(self.directory)
        for path in glob.glob(pattern, recursive=True):
            # errors='replace' keeps one stray byte from aborting the scan.
            with open(path, encoding='utf-8', errors='replace') as handle:
                yield handle.read()

    def _get_script_dependencies(self, tree):
        """Return the apps referenced by the root ``script`` attribute."""
        return self._get_root_attribute_dependencies(tree, 'script')

    def _get_stylesheet_dependencies(self, tree):
        """Return the apps referenced by the root ``stylesheet`` attribute."""
        return self._get_root_attribute_dependencies(tree, 'stylesheet')

    def _get_visualization_dependencies(self, tree):
        """Return the app part of every ``<viz type="app.name">`` element."""
        apps = []
        for viz in tree.xpath('//viz'):
            viz_type = viz.get('type')
            # A <viz> without a type attribute names no app; skip it.
            if viz_type:
                apps.append(viz_type.split('.')[0])
        return apps

    def _get_app_dependencies(self, tree):
        """Return every app name used in an ``/app/<name>/`` path."""
        # Serialise to text so the pattern runs on the XML itself, not on
        # the repr of a bytes object.
        filecontent = ET.tostring(tree, encoding='unicode')
        return re.findall(r'/app/(.*?)/', filecontent)

    def _get_query_dependencies(self, tree):
        """Return the apps owning the commands used in ``<query>`` elements."""
        query_dependencies = []
        for elem in tree.xpath('//query'):
            # An empty <query/> has no text; skip it and keep earlier results.
            if not elem.text:
                continue
            for app in self._apps_for_commands(elem.text):
                if app not in query_dependencies:
                    query_dependencies.append(app)
        return query_dependencies

    def get_indexes(self):
        """Collect the indexes referenced in the savedsearches.conf files."""
        for content in self._read_savedsearches():
            # \S+ also matches an index that ends the file without whitespace.
            for index in re.findall(r'index=(\S+)', content):
                if index not in self.indexes:
                    self.indexes.append(index)

    def get_savedsearch_dependencies(self):
        """Collect the apps owning commands used in savedsearches.conf files."""
        for content in self._read_savedsearches():
            # '.' stops at a newline, so each match is the rest of one line,
            # including a last line without a trailing newline.
            for search in re.findall(r'search(.*)', content):
                for app in self._apps_for_commands(search):
                    if app not in self.dependencies:
                        self.dependencies.append(app)

    def get_dashboard_dependencies(self):
        """Collect the dependencies of every XML file below the directory."""
        collectors = (
            self._get_script_dependencies,
            self._get_stylesheet_dependencies,
            self._get_visualization_dependencies,
            self._get_app_dependencies,
            self._get_query_dependencies,
        )
        for xmlfile in glob.glob('{}/**/*.xml'.format(self.directory), recursive=True):
            tree = ET.parse(xmlfile)
            for collect in collectors:
                self.dependencies.extend(collect(tree))

    def print_instructions(self):
        """Print the docker-compose volumes and the indexes that are needed."""
        separator = "-" * 80
        print(separator)
        print("Please make sure the following exists in your docker-compose.yml:")
        print(separator)
        print("volumes:")
        # Sorted so the output is stable between runs.
        for dep in sorted(set(self.dependencies)):
            print("- ../%s:/opt/splunk/etc/apps/%s" % (dep, dep))
        print(separator)
        # Truthiness test: a single index must be reported as well.
        if self.indexes:
            print("Please make sure the following indexes are available:")
            print(separator)
            for ind in self.indexes:
                print("- " + ind)

    def validate_dependencies(self):
        """Print the dependencies that are not checked out in the parent dir."""
        parent = os.path.dirname(os.path.abspath(self.directory))
        missing = []
        for dep in self.dependencies:
            # Report each dependency once, in first-seen order.
            if dep in missing:
                continue
            if not os.path.exists(os.path.join(parent, dep)):
                missing.append(dep)
        if missing:
            print("-" * 80)
            print("Please clone Git repository for:")
            for dep in missing:
                print("- ", dep)
def main():
    """Read the command line, prompt for missing credentials, run the checks."""
    cli = ArgumentParser()
    # (short flag, long flag, destination, default) for every option.
    option_specs = (
        ("-u", "--username", "username", None),
        ("-p", "--password", "password", None),
        ("-a", "--app-directory", "app_directory", "."),
    )
    for short_flag, long_flag, destination, fallback in option_specs:
        cli.add_argument(short_flag, long_flag, dest=destination, default=fallback)
    options = cli.parse_args()

    # Ask interactively for whatever was not given; user name first.
    user = options.username if options.username is not None else input("Username: ")
    secret = options.password if options.password is not None else getpass.getpass()

    checker = DependencyChecker(user, secret, options.app_directory)
    # Collect everything first, then report.
    checker.get_dashboard_dependencies()
    checker.get_savedsearch_dependencies()
    checker.get_indexes()
    checker.print_instructions()
    checker.validate_dependencies()


# Run the checker only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment