Skip to content

Instantly share code, notes, and snippets.

@jitsejan
Created May 8, 2018 16:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save jitsejan/7bc80f95b97dfbf32bbd178973fcb8eb to your computer and use it in GitHub Desktop.
Save jitsejan/7bc80f95b97dfbf32bbd178973fcb8eb to your computer and use it in GitHub Desktop.
Splunk dependency checker
""" main.py """
from argparse import ArgumentParser
from lxml import etree as ET
import getpass
import glob
import os
import re
import requests
from requests.auth import HTTPBasicAuth
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
class DependencyChecker(object):
    """Collect the Splunk apps and indexes that an app directory depends on.

    Dependencies are gathered from the dashboard XML files and the
    ``savedsearches.conf`` files found below ``directory``. Search commands
    are mapped to the app that owns them through the map returned by the
    REST endpoint in ``commands_url``.
    """

    # REST endpoint that lists the search commands. Kept as a class
    # attribute so a subclass or instance can point at another host without
    # changing the constructor signature.
    commands_url = "https://mysplunkurl:8089/services/data/commands"

    # Owning apps that are never reported as a dependency.
    _ignored_apps = ('search',)

    def __init__(self, username, password, directory):
        """Store the settings and fetch the command-to-app map.

        Args:
            username: User name for HTTP basic auth against the REST API.
            password: Password for HTTP basic auth against the REST API.
            directory: Root directory of the app to inspect.

        Raises:
            Exception: If the REST response cannot be interpreted
                (see ``_get_commands``).
        """
        self.username = username
        self.password = password
        self.directory = directory
        self.dependencies = []
        self.indexes = []
        self.searchcommands = self._get_commands()

    def _get_commands(self):
        """Retrieve the search commands from the REST API.

        Returns:
            dict: Lower-cased command name mapped to the lower-cased name of
            the app that owns it.

        Raises:
            Exception: If the JSON response does not have the expected
                ``entry`` / ``name`` / ``acl.app`` layout, which is what a
                rejected login looks like to this code.
        """
        params = {'output_mode': 'json', 'count': -1}
        # NOTE(review): verify=False disables TLS certificate verification.
        # It is kept because the original relied on it, but it should be
        # revisited before use against a production host.
        response = requests.get(
            self.commands_url,
            params,
            auth=HTTPBasicAuth(self.username, self.password),
            verify=False,
        ).json()
        command_map = {}
        try:
            for elem in response['entry']:
                command_map[elem['name'].lower()] = elem['acl']['app'].lower()
        except (KeyError, TypeError, AttributeError) as err:
            # Keep the original exception type and message for callers, but
            # chain the cause so the real problem stays visible.
            raise Exception(
                "Error communicating with the REST API. "
                "Please verify credentials."
            ) from err
        return command_map

    @staticmethod
    def _get_root_attribute_dependencies(tree, attribute):
        """Return the app prefixes of a comma-separated root attribute.

        Each entry of the form ``app:file`` contributes ``app``; entries
        without a colon are ignored. A missing attribute yields ``[]``.
        """
        root = tree.getroot()
        value = root.get(attribute) if root is not None else None
        if not value:
            return []
        dependencies = []
        for entry in value.split(','):
            app, separator, _ = entry.partition(':')
            if separator:
                dependencies.append(app.strip())
        return dependencies

    def _get_script_dependencies(self, tree):
        """Return the apps referenced by the root ``script`` attribute."""
        return self._get_root_attribute_dependencies(tree, 'script')

    def _get_stylesheet_dependencies(self, tree):
        """Return the apps referenced by the root ``stylesheet`` attribute."""
        return self._get_root_attribute_dependencies(tree, 'stylesheet')

    def _get_visualization_dependencies(self, tree):
        """Return the prefix before the first dot of every ``<viz type>``.

        ``<viz>`` elements without a ``type`` attribute are skipped instead
        of raising ``AttributeError``.
        """
        dependencies = []
        for el in tree.xpath('//viz'):
            viz_type = el.get('type')
            if viz_type:
                dependencies.append(viz_type.split('.')[0])
        return dependencies

    def _get_app_dependencies(self, tree):
        """Return every app name that appears in an ``/app/<name>/`` path."""
        # Serialise to text rather than str(bytes): the bytes repr escapes
        # newlines and non-ASCII characters, which corrupts regex matches.
        filecontent = ET.tostring(tree, encoding='unicode')
        return re.findall(r'\/app\/(.*?)\/', filecontent)

    def _find_command_apps(self, text):
        """Return the owning apps of the commands whose name occurs in text.

        Matching is a case-insensitive substring test, so this is a
        heuristic: a short command name can match inside a longer word.
        """
        lowered = text.lower()
        return [
            app
            for command, app in self.searchcommands.items()
            if command in lowered and app not in self._ignored_apps
        ]

    def _get_query_dependencies(self, tree):
        """Return the de-duplicated apps used by the ``<query>`` elements."""
        query_dependencies = []
        for elem in tree.xpath('//query'):
            if not elem.text:
                # An empty <query/> has no text; skip it instead of losing
                # the dependencies already collected from other queries.
                continue
            for app in self._find_command_apps(elem.text):
                if app not in query_dependencies:
                    query_dependencies.append(app)
        return query_dependencies

    def _read_savedsearches(self):
        """Yield the content of each savedsearches.conf below the directory."""
        pattern = self.directory + '/**/savedsearches.conf'
        for path in glob.glob(pattern, recursive=True):
            with open(path) as handle:
                yield handle.read()

    def get_indexes(self):
        """Collect the indexes named in the saved searches.

        Every ``index=<value>`` followed by whitespace adds ``<value>`` to
        ``self.indexes`` once.
        """
        for content in self._read_savedsearches():
            for elem in re.findall(r'index=(.*?)\s', content):
                if elem not in self.indexes:
                    self.indexes.append(elem)

    def get_savedsearch_dependencies(self):
        """Add the apps whose commands appear in the saved searches.

        The text after each occurrence of ``search`` up to the end of the
        line is scanned; matching apps are added to ``self.dependencies``
        unless already present.
        """
        for content in self._read_savedsearches():
            for elem in re.findall('search(.*?)\n', content):
                for app in self._find_command_apps(elem):
                    if app not in self.dependencies:
                        self.dependencies.append(app)

    def get_dashboard_dependencies(self):
        """Add the dependencies found in every XML file below the directory."""
        pattern = '{}/**/*.xml'.format(self.directory)
        for xmlfile in glob.glob(pattern, recursive=True):
            tree = ET.parse(xmlfile)
            self.dependencies.extend(self._get_script_dependencies(tree))
            self.dependencies.extend(self._get_stylesheet_dependencies(tree))
            self.dependencies.extend(self._get_visualization_dependencies(tree))
            self.dependencies.extend(self._get_app_dependencies(tree))
            self.dependencies.extend(self._get_query_dependencies(tree))

    def print_instructions(self):
        """Print the docker-compose volumes and the required indexes."""
        print("-"*80)
        print("Please make sure the following exists in your docker-compose.yml:")
        print("-"*80)
        print("volumes:")
        # Sorted so each dependency is listed once, in a stable order.
        for dep in sorted(set(self.dependencies)):
            print("- ../%s:/opt/splunk/etc/apps/%s" % (dep, dep))
        print("-"*80)
        # Truthiness check: a single index must be reported as well.
        if self.indexes:
            print("Please make sure the following indexes are available:")
            print("-"*80)
            for ind in self.indexes:
                print("- "+ind)

    def validate_dependencies(self):
        """Report dependencies that are not checked out in the parent dir.

        A dependency counts as present when a path with its name exists next
        to the app directory. Nothing is printed when none are missing.
        """
        parent_dir = os.path.dirname(os.path.abspath(self.directory))
        missing = [
            dep
            for dep in sorted(set(self.dependencies))
            if not os.path.exists(os.path.join(parent_dir, dep))
        ]
        if not missing:
            return
        print("-"*80)
        print("Please clone Git repository for:")
        for dep in missing:
            print("- ", dep)
def main():
    """Read the command-line options and run every dependency check.

    A missing username is asked for on the prompt and a missing password is
    read with ``getpass``; the app directory defaults to the current one.
    """
    cli = ArgumentParser()
    cli.add_argument("-u", "--username", dest="username", default=None)
    cli.add_argument("-p", "--password", dest="password", default=None)
    cli.add_argument("-a", "--app-directory", dest="app_directory", default=".")
    options = cli.parse_args()

    # Prompt only for the credentials that were not given as options; the
    # username is asked for before the password.
    username = options.username
    if username is None:
        username = input("Username: ")
    password = options.password
    if password is None:
        password = getpass.getpass()

    checker = DependencyChecker(username, password, options.app_directory)
    # Gather everything first, then report.
    checker.get_dashboard_dependencies()
    checker.get_savedsearch_dependencies()
    checker.get_indexes()
    checker.print_instructions()
    checker.validate_dependencies()


# Run the checks only when the file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment