Skip to content

Instantly share code, notes, and snippets.

@LukeMurphey
Last active February 1, 2023 22:56
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save LukeMurphey/5390638 to your computer and use it in GitHub Desktop.
Save LukeMurphey/5390638 to your computer and use it in GitHub Desktop.
This is a basic Splunk REST handler that can be used to make custom endpoints in Splunk. Tags: #splunk
import splunk.admin as admin
import splunk.entity as entity
import splunk
import logging
import logging.handlers
import os
import re
import copy
class StandardFieldValidator(object):
    """
    Base class for the field validators. It rejects blank values on the way in
    and renders values as plain strings on the way out.
    """

    def to_python(self, name, value):
        """
        Convert the field to a Python object. Raises an ArgValidationException if the value is blank.

        Arguments:
        name -- The name of the field, used for error messages
        value -- The value to convert (returned unchanged when it is not blank)
        """

        # The check is made on the stripped string form, so whitespace-only input counts as blank
        as_text = str(value).strip()

        if not as_text:
            raise admin.ArgValidationException("The value for the '%s' parameter cannot be empty" % (name))

        return value

    def to_string(self, name, value):
        """
        Convert the field to a string that can be persisted to a conf file.

        Arguments:
        name -- The name of the field, used for error messages
        value -- The value to convert; None becomes an empty string
        """

        return "" if value is None else str(value)
class BooleanFieldValidator(StandardFieldValidator):
    """
    Validates and converts fields that represent booleans.
    """

    def to_python(self, name, value):
        """
        Convert the field to a bool. Raises an ArgValidationException if the value is not a valid boolean.

        Accepted inputs are anything that compares equal to True or False, and
        the strings "true", "false", "1" and "0" (case-insensitive, surrounding
        whitespace ignored).

        Arguments:
        name -- The name of the field, used for error messages
        value -- The value to convert
        """

        # Numbers such as 1, 0, 1.0 and 0.0 compare equal to True/False and therefore pass this
        # membership test; coerce them so that the caller always receives an actual bool.
        if value in [True, False]:
            return bool(value)

        normalized = str(value).strip().lower()

        if normalized in ["true", "1"]:
            return True

        if normalized in ["false", "0"]:
            return False

        raise admin.ArgValidationException("The value of '%s' for the '%s' parameter is not a valid boolean" % ( str(value), name))

    def to_string(self, name, value):
        """
        Convert the field to the string form persisted to the conf file: "1" for true values and "0" for false ones.

        Values that compare equal to neither True nor False are handed to the base class conversion.

        Arguments:
        name -- The name of the field, used for error messages
        value -- The value to convert
        """

        if value in [True, False]:
            return "1" if value else "0"

        return super(BooleanFieldValidator, self).to_string(name, value)
class IntegerFieldValidator(StandardFieldValidator):
    """
    Validates and converts fields that represent integers.
    """

    def __init__(self, min_value=None, max_value=None):
        """
        Arguments:
        min_value -- The smallest value accepted (inclusive); None disables the check
        max_value -- The largest value accepted (inclusive); None disables the check
        """
        self.min_value = min_value
        self.max_value = max_value

    def to_python(self, name, value):
        """
        Convert the field to an int. Raises an ArgValidationException if the value is not an
        integer or is outside of the configured range. A value of None is passed through as None.

        Arguments:
        name -- The name of the field, used for error messages
        value -- The value to convert
        """

        if value is None:
            return None

        # Parse inside the try block so that non-numeric input is reported as a validation
        # error instead of escaping as a raw ValueError
        try:
            int_value = int( str(value).strip() )
        except ValueError:
            raise admin.ArgValidationException("The value of '%s' for the '%s' parameter is not a valid integer" % ( str(value), name))

        # Make sure that the value is at least the minimum
        if self.min_value is not None and int_value < self.min_value:
            raise admin.ArgValidationException("The value of '%s' for the '%s' parameter is not valid, it must be at least %s" % ( str(value), name, self.min_value))

        # Make sure that the value is no greater than the maximum
        if self.max_value is not None and int_value > self.max_value:
            raise admin.ArgValidationException("The value of '%s' for the '%s' parameter is not valid, it must be not be greater than %s" % ( str(value), name, self.max_value))

        return int_value

    def to_string(self, name, value):
        """
        Convert the field to a string that can be persisted to a conf file.

        Unlike the base class, None and blank values yield None rather than an empty string.

        Arguments:
        name -- The name of the field, used for error messages
        value -- The value to convert
        """

        if value is None or len(str(value).strip()) == 0:
            return None

        return super(IntegerFieldValidator, self).to_string(name, value)
class FieldSetValidator(object):
    """
    This base class is for validating sets of fields.
    """

    def validate(self, name, values, existing_settings=None):
        """
        Validate the values. Should throw a ArgValidationException if the data is invalid.

        The base implementation accepts everything; subclasses override it.

        Arguments:
        name -- The name of the object, used for error messages
        values -- The values to validate (in a dictionary)
        existing_settings -- The settings as they were before the change; optional because
                             RestHandler.checkConf passes it as a third argument
        """
        pass
class ListValidator(StandardFieldValidator):
    """
    Validates and converts field that represents a list (comma or colon separated).
    """

    # One or more separators in a row count as a single delimiter. The pattern must not be able
    # to match the empty string: since Python 3.7 re.split() also splits on empty matches, which
    # would break the value apart into single characters.
    LIST_SPLIT = re.compile(r"[:,]+")

    def to_python(self, name, value):
        """
        Convert the separated string to a list of strings.

        Arguments:
        name -- The name of the field, used for error messages
        value -- The comma or colon separated string; None is treated as an empty list
        """

        # Treat none as an empty list
        if value is None:
            return []

        return ListValidator.LIST_SPLIT.split(value)

    def to_string(self, name, value):
        """
        Convert the list to a string that can be persisted to a conf file.

        Arguments:
        name -- The name of the field, used for error messages
        value -- The list of strings to convert; None becomes an empty string
        """

        if value is None:
            return ""

        # Rebuild the list as comma separated list in order to normalize it
        return ",".join( value )
def log_function_invocation(fx):
    """
    This decorator will provide a log message for when a function starts and stops.

    The messages are written at the debug level to the module-level logger. The exit message
    is only written when the function returns normally; an exception propagates without it.

    Arguments:
    fx -- The function to log the starting and stopping of
    """

    # Imported here because functools is not among the module-level imports
    import functools

    # functools.wraps keeps the name and docstring of the decorated function on the wrapper
    @functools.wraps(fx)
    def wrapper(self, *args, **kwargs):
        logger.debug("Entering: %s", fx.__name__)
        r = fx(self, *args, **kwargs)
        logger.debug("Exited: %s", fx.__name__)
        return r

    return wrapper
def setup_logger(level, name, file_name, use_rotating_handler=True):
    """
    Setup a logger for the REST handler.

    The log file is placed under $SPLUNK_HOME/var/log/splunk; a KeyError is raised if the
    SPLUNK_HOME environment variable is not set. Returns the configured logger.

    Arguments:
    level -- The logging level to use
    name -- The name of the logger to use
    file_name -- The file name to log to
    use_rotating_handler -- Indicates whether a rotating file handler ought to be used
    """

    logger = logging.getLogger(name)
    logger.propagate = False # Prevent the log messages from being duplicated in the python.log file
    logger.setLevel(level)

    log_file_path = os.path.join( os.environ['SPLUNK_HOME'], 'var', 'log', 'splunk', file_name)

    # logging.getLogger() hands back the same object for the same name, so calling this function
    # again would otherwise stack a second handler on the same file and every message would be
    # written twice. File handlers expose the absolute path of their file as baseFilename.
    target_path = os.path.abspath(log_file_path)

    for existing_handler in logger.handlers:
        if getattr(existing_handler, 'baseFilename', None) == target_path:
            return logger

    if use_rotating_handler:
        file_handler = logging.handlers.RotatingFileHandler(log_file_path, maxBytes=25000000, backupCount=5)
    else:
        file_handler = logging.FileHandler(log_file_path)

    formatter = logging.Formatter('%(asctime)s %(levelname)s ' + name + ' - %(message)s')
    file_handler.setFormatter(formatter)

    logger.addHandler(file_handler)

    return logger
# Create the module-level logger that the REST handler and the logging decorator write to
# CUSTOMIZE_LOGGING_INFO_HERE
logger = setup_logger(
    level=logging.INFO,
    name="RestHandler",
    file_name="rest_handler.log",
)
class RestHandler(admin.MConfigHandler):
    """
    The REST handler provides <ADD_DESCRIPTION_HERE>...
    """

    # Below is the name of the conf file
    CONF_FILE = 'CUSTOMIZE_CONF_FILE_HERE'

    # Below are the list of parameters that are accepted
    PARAM_DEBUG = 'debug'

    # Below are the list of valid and required parameters
    VALID_PARAMS = [ PARAM_DEBUG ]
    REQUIRED_PARAMS = []

    # These are parameters that are not persisted to the conf files; these are used within the REST handler only
    UNSAVED_PARAMS = [ ]

    # List of fields and how they will be validated
    # Note: if a field does not have a validator, it will be passed through without validation
    FIELD_VALIDATORS = {
        PARAM_DEBUG : BooleanFieldValidator()
    }

    # This field designates the fields that the REST handler ought to allow fields with similar values using dot syntax (value.1, value.2, etc).
    # For these fields, instances containing what looks like the dot syntax will use the validator based on the item without the dot syntax.
    # Thus, the field "value.1.name" will be validated by whatever item validates "value.name".
    MULTI_FIELDS = [ ]
    MULTI_FIELD_RE = re.compile(r"(?P<prefix>.*)[.][0-9]+(?P<suffix>.*)")

    # These are validators that work across several fields and need to occur on the cleaned set of fields
    GENERAL_VALIDATORS = [ ]

    # General variables
    APP_NAME = "CUSTOMIZE_APP_NAME_HERE"

    def setup(self):
        """
        Setup the required and optional arguments.

        The arguments are only registered when the requested action is an edit or a create.
        """

        if self.requestedAction == admin.ACTION_EDIT or self.requestedAction == admin.ACTION_CREATE:

            # Set the required parameters
            for arg in self.REQUIRED_PARAMS:
                self.supportedArgs.addReqArg(arg)

            # Set up the valid parameters (those that are not already registered as required)
            for arg in self.VALID_PARAMS:
                if arg not in self.REQUIRED_PARAMS:
                    self.supportedArgs.addOptArg(arg)

    @classmethod
    def removeMultiFieldSpecifier(cls, name):
        """
        Remove the multi-field specifier if the field is supposed to support multiple instances using the dot syntax (value.1, value.2, etc).

        Returns the name with the numeric part removed when the text before it is listed in
        MULTI_FIELDS; otherwise the name is returned unchanged.

        Arguments:
        name -- The name of the field.
        """

        # Stop if we don't have any multi-fields
        if cls.MULTI_FIELDS is None:
            return name

        m = cls.MULTI_FIELD_RE.match(name)

        if m and m.group('prefix') in cls.MULTI_FIELDS:
            base_name = m.group('prefix') + m.group('suffix')
            logger.debug("removeMultiFieldSpecifier: %s to %s", name, base_name)
            return base_name

        return name

    @classmethod
    def convertParams(cls, name, params, to_string=False):
        """
        Convert so that they can be saved to the conf files and validate the parameters.

        Returns a new dictionary; parameters without a validator are copied over unchanged.

        Arguments:
        name -- The name of the stanza being processed (used for exception messages)
        params -- The dictionary containing the parameter values
        to_string -- If true, a dictionary containing strings is returned; otherwise, the objects are converted to the Python equivalents
        """

        new_params = {}

        for key, value in params.items():

            # Multi-fields (value.1, value.2, ...) share the validator of the field without the number
            validator = cls.FIELD_VALIDATORS.get( cls.removeMultiFieldSpecifier(key) )

            if validator is None:
                new_params[key] = value
            elif to_string:
                new_params[key] = validator.to_string(key, value)
            else:
                new_params[key] = validator.to_python(key, value)

        return new_params

    @log_function_invocation
    def handleList(self, confInfo):
        """
        Provide the list of configuration options.

        Arguments
        confInfo -- The object containing the information about what is being requested.
        """

        # Read the current settings from the conf file
        confDict = self.readConf(self.CONF_FILE)

        # Set the settings
        if confDict is not None:
            for stanza, settings in confDict.items():

                # DEFINE DEFAULT PARAMETERS HERE

                for key, val in settings.items():
                    confInfo[stanza].append(key, val)

                # ADD CODE HERE to get your parameters

    @log_function_invocation
    def handleReload(self, confInfo):
        """
        Reload the list of configuration options.

        Arguments
        confInfo -- The object containing the information about what is being requested.
        """

        # Refresh the configuration (handles disk based updates)
        entity.refreshEntities('properties/' + self.CONF_FILE, sessionKey=self.getSessionKey())

    def clearValue(self, d, name):
        """
        Set the value in the dictionary to none. Names that are not in the dictionary are left alone.

        Arguments:
        d -- The dictionary to modify
        name -- The name of the variable to clear (set to none)
        """

        if name in d:
            d[name] = None

    @log_function_invocation
    def handleEdit(self, confInfo):
        """
        Handles edits to the configuration options.

        Raises admin.NotFoundException when no stanza matches the requested name. Any other
        exception is logged and then re-raised.

        Arguments
        confInfo -- The object containing the information about what is being requested.
        """

        try:
            name = self.callerArgs.id
            args = self.callerArgs

            # Load the existing configuration
            confDict = self.readConf(self.CONF_FILE)

            # Get the settings for the given stanza
            is_found = False
            settings = None
            existing_settings = None

            if name is not None:
                for stanza, settings in confDict.items():
                    if stanza == name:
                        is_found = True
                        existing_settings = copy.copy(settings) # In case, we need to view the old settings
                        break # Got the settings object we were looking for

            # Stop if we could not find the name
            if not is_found:
                raise admin.NotFoundException("A stanza for the given name '%s' could not be found" % (name) )

            # Get the settings that are being set
            new_settings = {}

            for key in args.data:
                new_settings[key] = args[key][0]

            # Create the resulting configuration that would be persisted if the settings provided are applied
            settings.update( new_settings )

            # Check the configuration settings
            # NOTE(review): only the submitted fields are checked here, not the merged settings
            # built above -- confirm that this is intended when REQUIRED_PARAMS is not empty.
            cleaned_params = self.checkConf(new_settings, name, confInfo, existing_settings=existing_settings)

            # Get the validated parameters
            validated_params = self.convertParams( name, cleaned_params, True )

            # Clear out the given parameters if blank so that it can be removed if the user wishes (note that values of none are ignored by Splunk)
            clearable_params = [ ]

            for p in clearable_params:
                if p in validated_params and validated_params[p] is None:
                    validated_params[p] = ""

            # Write out the updated conf
            self.writeConf(self.CONF_FILE, name, validated_params )

        except admin.NotFoundException:
            # Not an unexpected failure; pass it on without logging (a bare raise keeps the traceback)
            raise

        except Exception:
            logger.exception("Exception generated while performing edit")
            raise

    @classmethod
    def checkConf(cls, settings, stanza=None, confInfo=None, onlyCheckProvidedFields=False, existing_settings=None):
        """
        Checks the settings and raises an exception if the configuration is invalid.

        Returns the cleaned (converted) parameters with the unsaved parameters removed.

        Arguments:
        settings -- The settings dictionary the represents the configuration to be checked
        stanza -- The name of the stanza being checked
        confInfo -- The confinfo object that was received into the REST handler
        onlyCheckProvidedFields -- Indicates if we ought to assume that this is only part of the fields and thus should not alert if some necessary fields are missing
        existing_settings -- The existing settings before the current changes are applied
        """

        # Add all of the configuration items to the confInfo object so that the REST endpoint lists them (even if they are wrong)
        # We want them all to be listed so that the users can see what the current value is (and hopefully will notice that it is wrong)
        for key, val in settings.items():

            # Add the value to the configuration info
            if stanza is not None and confInfo is not None:

                # Handle the EAI:ACLs differently than the normal values
                if key == 'eai:acl':
                    confInfo[stanza].setMetadata(key, val)
                elif key in cls.VALID_PARAMS and key not in cls.UNSAVED_PARAMS:
                    confInfo[stanza].append(key, val)

        # Below is a list of the required fields. The entries in this list will be removed as they
        # are observed. An empty list at the end of the config check indicates that all necessary
        # fields were provided.
        required_fields = cls.REQUIRED_PARAMS[:]

        # Check each of the settings
        for key in settings:

            # Remove the field from the list of required fields
            try:
                required_fields.remove(key)
            except ValueError:
                pass # Field not available, probably because it is not required

        # Stop if not all of the required parameters are provided
        if onlyCheckProvidedFields == False and len(required_fields) > 0: #stanza != "default" and
            raise admin.ArgValidationException("The following fields must be defined in the configuration but were not: " + ",".join(required_fields) )

        # Clean up and validate the parameters
        cleaned_params = cls.convertParams(stanza, settings, False)

        # Run the general validators
        for validator in cls.GENERAL_VALIDATORS:
            validator.validate( stanza, cleaned_params, existing_settings )

        # Remove the parameters that are not intended to be saved
        for to_remove in cls.UNSAVED_PARAMS:
            if to_remove in cleaned_params:
                del cleaned_params[to_remove]

        # Return the cleaned parameters
        return cleaned_params

    @staticmethod
    def stringToIntegerOrDefault( str_value, default_value=None ):
        """
        Converts the given string to an integer or returns the default value if it is not a valid integer.

        Arguments:
        str_value -- A string value of the integer to be converted.
        default_value -- The value to be used if the string is not an integer.
        """

        # If the value is none, then don't try to convert it
        if str_value is None:
            return default_value

        # Try to convert the string to an integer
        try:
            return int(str(str_value).strip())
        except ValueError:
            # Return the default if the value could not be converted
            return default_value
# Initialize the handler by handing the RestHandler class to splunk.admin, using admin.CONTEXT_NONE as the context.
# NOTE(review): this is a module-level statement, so it runs whenever this file is executed or imported.
admin.init(RestHandler, admin.CONTEXT_NONE)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment