Last active
November 2, 2017 21:19
-
-
Save LukeMurphey/7479309 to your computer and use it in GitHub Desktop.
A base class that can be used for making Python-based modular inputs for Splunk #Splunk
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This is a base class for making Python modular inputs for Splunk. | |
To make a modular input based on this class, you should follow the steps defined below. | |
Note that this example assumes you are making an input named "my_input_name". | |
________________________________________________________ | |
1) Define the input in inputs.conf.spec | |
You will need to define you input in a spec file within the following directory within your app: | |
README/inputs.conf.spec | |
You should define your input in the inputs.conf.spec by declaring the fields your input accepts. | |
This file should look something like this: | |
[my_input_name://default] | |
* Configure an input for do something | |
title = <value> | |
* The title of the input | |
url = <value> | |
* The URL to be checked | |
________________________________________________________ | |
2) Include this module in your app | |
Put this module within your app. You can put this within you bin directory or within a | |
sub-directory of the bin directory. I generally recommend putting python modules under the bin | |
directory with a directory that is specific to your app. Something like: | |
bin/my_app_name/modular_input.py | |
________________________________________________________ | |
3) Define defaults for your inputs in inputs.conf | |
You can define default values for your inputs within the inputs.conf file. This file would be: | |
default/inputs.conf | |
The contents of the file would be something like this: | |
[my_input_name] | |
url = http://mydefaulturl.com | |
________________________________________________________ | |
4) Create your modular input class | |
Create your modular input class. This class must be named the same as your input name and it must | |
be placed within the bin directory. In this example, the input should be in the following path | |
since the input is named "my_input_name": | |
bin/my_input_name.py | |
Below is an example of a modular input class. This class does the following: | |
1) Defines the scheme_args which provides some info about the modular input | |
2) Defines the parameters that the input accepts | |
3) Runs the modular input | |
import sys | |
from modular_input import Field, ModularInput, URLField | |
class MyInput(ModularInput): | |
def __init__(self, timeout=30): | |
scheme_args = {'title': "My input name", | |
'description': "This input is an example", | |
'use_external_validation': "true", | |
'streaming_mode': "xml", | |
'use_single_instance': "true"} | |
args = [ | |
Field("title", "Title", "A short description of the input", empty_allowed=False), | |
URLField("url", "URL", "The URL to connect to", empty_allowed=False) | |
] | |
ModularInput.__init__(self, scheme_args, args, logger_name='my_input_modular_input') | |
def run(self, stanza, cleaned_params, input_config): | |
interval = cleaned_params["interval"] | |
title = cleaned_params["title"] | |
host = cleaned_params.get("host", None) | |
index = cleaned_params.get("index", "default") | |
sourcetype = cleaned_params.get("sourcetype", "my_app_name") | |
url = cleaned_params["url"] | |
if self.needs_another_run(input_config.checkpoint_dir, stanza, interval): | |
self.logger.debug("Your input should do something here, stanza=%s", stanza) | |
if __name__ == '__main__': | |
my_input = None | |
try: | |
my_input = MyInput() | |
my_input.execute() | |
sys.exit(0) | |
except Exception as e: | |
# This logs general exceptions that would have been unhandled otherwise (such as coding | |
# errors) | |
if my_input is not None and my_input.logger is not None: | |
my_input.logger.exception("Unhandled exception was caught, this may be due to a defect in the script") | |
else: | |
raise e | |
""" | |
import logging | |
from logging import handlers | |
import xml.dom | |
from xml.dom.minidom import Document | |
import sys | |
import re | |
import time | |
import os | |
import hashlib | |
import json | |
from urlparse import urlparse | |
from threading import RLock | |
# Try to load Splunk's libraries. An inability to do so likely means we are running on a universal | |
# forwarder (since it doesn't include Python). We will proceed but will be unable to access | |
# Splunk's endpoints via simple request which means we will not able to load secure credentials. | |
try: | |
from splunk.appserver.mrsparkle.lib.util import make_splunkhome_path | |
from splunk.util import normalizeBoolean as normBool | |
import splunk.rest | |
uf_mode = False | |
except: | |
def normBool(value): | |
if str(value).strip().lower() in ['1', 'true']: | |
return True | |
else: | |
return False | |
uf_mode = True | |
class FieldValidationException(Exception): | |
pass | |
class Field(object): | |
""" | |
This is the base class that should be used to for field validators. Sub-class this and | |
override to_python if you need custom validation. | |
""" | |
DATA_TYPE_STRING = 'string' | |
DATA_TYPE_NUMBER = 'number' | |
DATA_TYPE_BOOLEAN = 'boolean' | |
def get_data_type(self): | |
""" | |
Get the type of the field. | |
""" | |
return Field.DATA_TYPE_STRING | |
def __init__(self, name, title, description, none_allowed=False, empty_allowed=True, | |
required_on_create=None, required_on_edit=None): | |
""" | |
Create the field. | |
Arguments: | |
name -- Set the name of the field (e.g. "database_server") | |
title -- Set the human readable title (e.g. "Database server") | |
description -- Set the human readable description of the field (e.g. "The IP or domain name | |
of the database server") | |
none_allowed -- Is a value of none allowed? | |
empty_allowed -- Is an empty string allowed? | |
required_on_create -- Is this field required when creating? | |
required_on_edit -- Is this field required when editing? | |
""" | |
# Try to set required_on_create and required_on_edit to sane defaults if not defined | |
if required_on_create is None and none_allowed: | |
required_on_create = False | |
elif required_on_create is None and not none_allowed: | |
required_on_create = True | |
if required_on_edit is None and required_on_create is not None: | |
required_on_edit = required_on_create | |
if name is None: | |
raise ValueError("The name parameter cannot be none") | |
if len(name.strip()) == 0: | |
raise ValueError("The name parameter cannot be empty") | |
if title is None: | |
raise ValueError("The title parameter cannot be none") | |
if len(title.strip()) == 0: | |
raise ValueError("The title parameter cannot be empty") | |
if description is None: | |
raise ValueError("The description parameter cannot be none") | |
if len(description.strip()) == 0: | |
raise ValueError("The description parameter cannot be empty") | |
self.name = name | |
self.title = title | |
self.description = description | |
self.none_allowed = none_allowed | |
self.empty_allowed = empty_allowed | |
self.required_on_create = required_on_create | |
self.required_on_edit = required_on_edit | |
def to_python(self, value, session_key=None): | |
""" | |
Convert the field to a Python object. Should throw a FieldValidationException if the data | |
is invalid. | |
Arguments: | |
value -- The value to convert | |
session_key- The session key to access Splunk (if needed) | |
""" | |
if not self.none_allowed and value is None: | |
raise FieldValidationException("The value for the '%s' parameter cannot be empty" % (self.name)) | |
if not self.empty_allowed and len(str(value).strip()) == 0: | |
raise FieldValidationException("The value for the '%s' parameter cannot be empty" % (self.name)) | |
return value | |
def to_string(self, value): | |
""" | |
Convert the field to a string value that can be returned. Should throw a | |
FieldValidationException if the data is invalid. | |
Arguments: | |
value -- The value to convert | |
""" | |
return str(value) | |
class BooleanField(Field): | |
""" | |
A validator that converts string versions of boolean to a real boolean. | |
""" | |
def to_python(self, value, session_key=None): | |
Field.to_python(self, value, session_key) | |
if value in [True, False]: | |
return value | |
elif str(value).strip().lower() in ["true", "1"]: | |
return True | |
elif str(value).strip().lower() in ["false", "0"]: | |
return False | |
raise FieldValidationException("The value of '%s' for the '%s' parameter is not a valid boolean" % (str(value), self.name)) | |
def to_string(self, value): | |
if value == True: | |
return "1" | |
elif value == False: | |
return "0" | |
return str(value) | |
def get_data_type(self): | |
return Field.DATA_TYPE_BOOLEAN | |
class ListField(Field): | |
""" | |
A validator that converts a comma seperated string to an array. | |
""" | |
def to_python(self, value, session_key=None): | |
Field.to_python(self, value, session_key) | |
if value is not None: | |
return value.split(",") | |
else: | |
return [] | |
def to_string(self, value): | |
if value is not None: | |
return ",".join(value) | |
return "" | |
class StaticListField(Field): | |
""" | |
This allows you to specify a list of field values that are allowed. | |
All other values will be rejected. | |
""" | |
_valid_values = None | |
def __init__(self, name, title, description, none_allowed=False, empty_allowed=True, required_on_create=None, required_on_edit=None, valid_values=None): | |
super(StaticListField, self).__init__(name, title, description, none_allowed, empty_allowed, required_on_create, required_on_edit) | |
self.valid_values = valid_values | |
@property | |
def valid_values(self): | |
return self._valid_values | |
@valid_values.setter | |
def valid_values(self, values): | |
self._valid_values = values | |
def to_python(self, value, session_key=None): | |
Field.to_python(self, value, session_key) | |
if value is None: | |
return None | |
elif value not in self.valid_values: | |
raise FieldValidationException('The value of the "' + self.name + '" field is invalid, it must be one of:' + ','.join(self.valid_values)) | |
else: | |
return value | |
class RegexField(Field): | |
""" | |
A validator that validates input matches a regular expression. | |
""" | |
def to_python(self, value, session_key=None): | |
Field.to_python(self, value, session_key) | |
if value is not None: | |
try: | |
return re.compile(value) | |
except Exception as exception: | |
raise FieldValidationException(str(exception)) | |
else: | |
return None | |
def to_string(self, value): | |
if value is not None: | |
return value.pattern | |
return "" | |
class IntegerField(Field): | |
""" | |
A validator that converts string input to an integer. | |
""" | |
def to_python(self, value, session_key=None): | |
Field.to_python(self, value, session_key) | |
if value is not None: | |
try: | |
return int(value) | |
except ValueError as exception: | |
raise FieldValidationException(str(exception)) | |
else: | |
return None | |
def to_string(self, value): | |
if value is not None: | |
return str(value) | |
return "" | |
def get_data_type(self): | |
return Field.DATA_TYPE_NUMBER | |
class FloatField(Field): | |
""" | |
A validator that converts string input to a float. | |
""" | |
def to_python(self, value, session_key=None): | |
Field.to_python(self, value, session_key) | |
if value is not None: | |
try: | |
return float(value) | |
except ValueError as exception: | |
raise FieldValidationException(str(exception)) | |
else: | |
return None | |
def to_string(self, value): | |
if value is not None: | |
return str(value) | |
return "" | |
def get_data_type(self): | |
return Field.DATA_TYPE_NUMBER | |
class RangeField(Field): | |
""" | |
A validator that converts string input to a pair of integers indicating a range. | |
""" | |
def __init__(self, name, title, description, low, high, none_allowed=False, empty_allowed=True): | |
super(RangeField, self).__init__(name, title, description, none_allowed=False, | |
empty_allowed=True) | |
self.low = low | |
self.high = high | |
def to_python(self, value, session_key=None): | |
Field.to_python(self, value, session_key) | |
if value is not None: | |
try: | |
tmp = int(value) | |
return tmp >= self.low and tmp <= self.high | |
except ValueError as exception: | |
raise FieldValidationException(str(exception)) | |
else: | |
return None | |
def to_string(self, value): | |
if value is not None: | |
return str(value) | |
return "" | |
def get_data_type(self): | |
return Field.DATA_TYPE_NUMBER | |
class URLField(Field): | |
""" | |
Represents a URL. The URL is converted to a Python object that was created via urlparse. | |
""" | |
require_https_on_cloud = False | |
def __init__(self, name, title, description, none_allowed=False, empty_allowed=True, | |
required_on_create=None, required_on_edit=None, require_https_on_cloud=False): | |
super(URLField, self).__init__(name, title, description, none_allowed, | |
empty_allowed, required_on_create, required_on_edit) | |
self.require_https_on_cloud = require_https_on_cloud | |
@classmethod | |
def parse_url(cls, value, name): | |
""" | |
Parse a URL and generation an exception if it is invalid.BaseException | |
Otherwise, return a parsed URL (via urlparse). | |
""" | |
parsed_value = urlparse(value) | |
if parsed_value.hostname is None or len(parsed_value.hostname) <= 0: | |
raise FieldValidationException("The value of '%s' for the '%s' parameter does not contain a host name" % (str(value), name)) | |
if parsed_value.scheme not in ["http", "https"]: | |
raise FieldValidationException("The value of '%s' for the '%s' parameter does not contain a valid protocol (only http and https are supported)" % (str(value), name)) | |
return parsed_value | |
def to_python(self, value, session_key=None): | |
Field.to_python(self, value, session_key) | |
parsed_value = URLField.parse_url(value.strip(), self.name) | |
if self.require_https_on_cloud and parsed_value.scheme == "http" and session_key is not None and ModularInput.is_on_cloud(session_key): | |
raise FieldValidationException("The value of '%s' for the '%s' parameter must use encryption (be HTTPS not HTTP)" % (str(value), self.name)) | |
return parsed_value | |
def to_string(self, value): | |
return value.geturl() | |
class DurationField(Field): | |
""" | |
The duration field represents a duration as represented by a string such as 1d for a 24 hour | |
period. | |
The string is converted to an integer indicating the number of seconds. | |
""" | |
DURATION_RE = re.compile("(?P<duration>[0-9]+)\s*(?P<units>[a-z]*)", re.IGNORECASE) | |
MINUTE = 60 | |
HOUR = 60 * MINUTE | |
DAY = 24 * HOUR | |
WEEK = 7 * DAY | |
UNITS = { | |
'w' : WEEK, | |
'week' : WEEK, | |
'd' : DAY, | |
'day' : DAY, | |
'h' : HOUR, | |
'hour' : HOUR, | |
'm' : MINUTE, | |
'min' : MINUTE, | |
'minute' : MINUTE, | |
's' : 1 | |
} | |
def to_python(self, value, session_key=None): | |
Field.to_python(self, value, session_key) | |
# Parse the duration | |
duration_match = DurationField.DURATION_RE.match(value) | |
# Make sure the duration could be parsed | |
if duration_match is None: | |
raise FieldValidationException("The value of '%s' for the '%s' parameter is not a valid duration" % (str(value), self.name)) | |
# Get the units and duration | |
match_dict = duration_match.groupdict() | |
units = match_dict['units'] | |
# Parse the value provided | |
try: | |
duration = int(match_dict['duration']) | |
except ValueError: | |
raise FieldValidationException("The duration '%s' for the '%s' parameter is not a valid number" % (match_dict['duration'], self.name)) | |
# Make sure the units are valid | |
if len(units) > 0 and units not in DurationField.UNITS: | |
raise FieldValidationException("The unit '%s' for the '%s' parameter is not a valid unit of duration" % (units, self.name)) | |
# Convert the units to seconds | |
if len(units) > 0: | |
return duration * DurationField.UNITS[units] | |
else: | |
return duration | |
def to_string(self, value): | |
return str(value) | |
class DeprecatedField(Field): | |
""" | |
Represents a field that is no longer used. This should be used when you want the input to pass | |
validation with arguments that are no longer used. | |
""" | |
def __init__(self, name, title, description, none_allowed=True, empty_allowed=True, | |
required_on_create=False, required_on_edit=False): | |
""" | |
Create the field. | |
Arguments: | |
name -- Set the name of the field (e.g. "database_server") | |
title -- Set the human readable title (e.g. "Database server") | |
description -- Set the human readable description of the field (e.g. "The IP or domain name of the database server") | |
none_allowed -- Is a value of none allowed? | |
empty_allowed -- Is an empty string allowed? | |
required_on_create -- Is this field required when creating? | |
required_on_edit -- Is this field required when editing? | |
""" | |
super(DeprecatedField, self).__init__(name, title, description, | |
none_allowed=none_allowed, | |
empty_allowed=empty_allowed, | |
required_on_create=required_on_create, | |
required_on_edit=required_on_edit) | |
def to_python(self, value, session_key=None): | |
return None | |
def to_string(self, value): | |
return "" | |
class FilePathField(Field): | |
''' | |
Represents a path to file. | |
''' | |
def __init__(self, name, title, description, none_allowed=False, empty_allowed=True, | |
required_on_create=None, required_on_edit=None, validate_file_existence=True): | |
""" | |
Create the field. | |
Arguments: | |
name -- Set the name of the field (e.g. "database_server") | |
title -- Set the human readable title (e.g. "Database server") | |
description -- Set the human readable description of the field (e.g. "The IP or domain name | |
of the database server") | |
none_allowed -- Is a value of none allowed? | |
empty_allowed -- Is an empty string allowed? | |
required_on_create -- Is this field required when creating? | |
required_on_edit -- Is this field required when editing? | |
validate_file_existence -- If true, this field will generate an error if the file doesn't exist | |
""" | |
super(FilePathField, self).__init__(name, title, description, none_allowed, empty_allowed, required_on_create, required_on_edit) | |
self.validate_file_existence = validate_file_existence | |
def to_python(self, value, session_key=None): | |
Field.to_python(self, value, session_key) | |
# Don't bother validating if the parameter wasn't provided | |
if value is None or len(value.strip()) == 0: | |
return value | |
# Resolve the file path as necessary | |
resolved_path = None | |
if value is not None: | |
if os.path.isabs(value) or uf_mode: | |
resolved_path = value | |
else: | |
path = os.path.join(make_splunkhome_path([value])) | |
resolved_path = path | |
# Validate the file existence if requested | |
if self.validate_file_existence and not os.path.isfile(resolved_path): | |
raise FieldValidationException("The parameter '%s' is not a valid path; '%s' does not exist" % (self.name, resolved_path)) | |
return resolved_path | |
def to_string(self, value): | |
return value | |
class ModularInputConfig(): | |
""" | |
This class represents the configuration related to a running modular input. | |
""" | |
def __init__(self, server_host, server_uri, session_key, checkpoint_dir, configuration): | |
self.server_host = server_host | |
self.server_uri = server_uri | |
self.session_key = session_key | |
self.checkpoint_dir = checkpoint_dir | |
self.configuration = configuration | |
def __str__(self): | |
attrs = ['server_host', 'server_uri', 'session_key', 'checkpoint_dir', 'configuration'] | |
return str({attr: str(getattr(self, attr)) for attr in attrs}) | |
@staticmethod | |
def get_text(node, default=None): | |
""" | |
Get the value of the text in the first node under the given node. | |
Arguments: | |
node -- The node that should have a text node under it. | |
default -- The default text that ought to be returned if no text node could be found | |
(defaults to none). | |
""" | |
if node and node.firstChild and node.firstChild.nodeType == node.firstChild.TEXT_NODE: | |
return node.firstChild.data | |
else: | |
return default | |
@staticmethod | |
def get_config_from_xml(config_str_xml): | |
""" | |
Get the config from the given XML and return a ModularInputConfig instance. | |
Arguments: | |
config_str_xml -- A string of XML that represents the configuration provided by Splunk. | |
""" | |
# Here are the parameters we are going to fill out | |
server_host = None | |
server_uri = None | |
session_key = None | |
checkpoint_dir = None | |
configuration = {} | |
# Parse the document | |
doc = xml.dom.minidom.parseString(config_str_xml) | |
root = doc.documentElement | |
# Get the server_host | |
server_host_node = root.getElementsByTagName("server_host")[0] | |
server_host = ModularInputConfig.get_text(server_host_node) | |
# Get the server_uri | |
server_uri_node = root.getElementsByTagName("server_uri")[0] | |
server_uri = ModularInputConfig.get_text(server_uri_node) | |
# Get the session_key | |
session_key_node = root.getElementsByTagName("session_key")[0] | |
session_key = ModularInputConfig.get_text(session_key_node) | |
# Get the checkpoint directory | |
checkpoint_node = root.getElementsByTagName("checkpoint_dir")[0] | |
checkpoint_dir = ModularInputConfig.get_text(checkpoint_node) | |
# Parse the config | |
conf_node = root.getElementsByTagName("configuration")[0] | |
if conf_node: | |
for stanza in conf_node.getElementsByTagName("stanza"): | |
config = {} | |
if stanza: | |
stanza_name = stanza.getAttribute("name") | |
if stanza_name: | |
config["name"] = stanza_name | |
params = stanza.getElementsByTagName("param") | |
for param in params: | |
param_name = param.getAttribute("name") | |
config[param_name] = ModularInputConfig.get_text(param) | |
configuration[stanza_name] = config | |
return ModularInputConfig(server_host, server_uri, session_key, checkpoint_dir, | |
configuration) | |
def forgive_splunkd_outages(function): | |
""" | |
Try the given function and swallow Splunkd connection exceptions until the limit is reached or | |
the function works. | |
Arguments: | |
function -- The function to call | |
""" | |
def wrapper(*args, **kwargs): | |
""" | |
This wrapper will provide the swallowing of the exception for the provided function call. | |
""" | |
attempts = 6 | |
attempt_delay = 5 | |
attempts_tried = 0 | |
while attempts_tried < attempts: | |
try: | |
return function(*args, **kwargs) | |
except splunk.SplunkdConnectionException: | |
# Sleep for a bit in order to let Splunk recover in case this is a temporary issue | |
time.sleep(attempt_delay) | |
attempts_tried += 1 | |
# If we hit the limit of the attempts, then throw the exception | |
if attempts_tried >= attempts: | |
raise | |
return wrapper | |
class ModularInput(): | |
""" | |
This class functions as a base-class for modular inputs. | |
""" | |
# These arguments cover the standard fields that are always supplied | |
standard_args = [ | |
Field("name", "Stanza name", "The name of the stanza for this modular input", empty_allowed=True), | |
Field("stanza", "Stanza name", "The name of the stanza for this modular input", empty_allowed=True), | |
Field("source", "Source", "The source for events created by this modular input", empty_allowed=True), | |
Field("sourcetype", "Stanza name", "The name of the stanza for this modular input", empty_allowed=True, none_allowed=True), | |
Field("index", "Index", "The index that data should be sent to", empty_allowed=True, none_allowed=True), | |
Field("host", "Host", "The host that is running the input", empty_allowed=True), | |
BooleanField("disabled", "Disabled", "Whether the modular input is disabled or not", empty_allowed=True) | |
] | |
title = 'No title was provided' | |
use_external_validation = True | |
description = "" | |
streaming_mode = 'true' | |
server_info = None | |
def _is_valid_param(self, name, val): | |
'''Raise an error if the parameter is None or empty.''' | |
if val is None: | |
raise ValueError("The {0} parameter cannot be none".format(name)) | |
if len(str(val).strip()) == 0: | |
raise ValueError("The {0} parameter cannot be empty".format(name)) | |
return val | |
def _create_formatter_textnode(self, xmldoc, nodename, value): | |
'''Shortcut for creating a formatter textnode. | |
Arguments: | |
xmldoc - A Document object. | |
nodename - A string name for the node. | |
''' | |
node = xmldoc.createElement(nodename) | |
text = xmldoc.createTextNode(str(value)) | |
node.appendChild(text) | |
return node | |
def _create_document(self): | |
'''Create the document for sending XML streaming events.''' | |
doc = Document() | |
# Create the <stream> base element | |
stream = doc.createElement('stream') | |
doc.appendChild(stream) | |
return doc | |
def _create_event(self, doc, params, stanza, unbroken=False, close=True): | |
'''Create an event for XML streaming output. | |
Arguments: | |
doc - a Document object. | |
params - a dictionary of attributes for the event. | |
stanza_name - the stanza | |
''' | |
# Create the <event> base element | |
event = doc.createElement('event') | |
# Indicate if this event is to be unbroken (meaning a </done> tag will | |
# need to be added by a future event. | |
if unbroken: | |
event.setAttribute('unbroken', '1') | |
# Indicate if this script is single-instance mode or not. | |
if self.streaming_mode == 'true': | |
event.setAttribute('stanza', stanza) | |
# Define the possible elements | |
valid_elements = ['host', 'index', 'source', 'sourcetype', 'time', 'data'] | |
# Append the valid child elements. Invalid elements will be dropped. | |
for element in filter(lambda x: x in valid_elements, params.keys()): | |
event.appendChild(self._create_formatter_textnode(doc, element, params[element])) | |
if close: | |
event.appendChild(doc.createElement('done')) | |
return event | |
def _print_event(self, doc, event): | |
'''Adds an event to XML streaming output.''' | |
# Get the stream from the document. | |
stream = doc.firstChild | |
# Append the event. | |
stream.appendChild(event) | |
# Return the content as a string WITHOUT the XML header; remove the | |
# child object so the next event can be returned and reuse the same | |
# Document object. | |
output = doc.documentElement.toxml() | |
stream.removeChild(event) | |
return output | |
def _add_events(self, doc, events): | |
'''Adds a set of events to XML streaming output.''' | |
# Get the stream from the document. | |
stream = doc.firstChild | |
# Add the <event> node. | |
for event in events: | |
stream.appendChild(event) | |
# Return the content as a string WITHOUT the XML header. | |
return doc.documentElement.toxml() | |
def escape_spaces(self, s, encapsulate_in_double_quotes=False): | |
""" | |
If the string contains spaces or is empty, then add double quotes around the string. This | |
is useful when outputting fields and values to Splunk since a space will cause Splunk to | |
not recognize the entire value. | |
Arguments: | |
s -- A string to escape. | |
encapsulate_in_double_quotes -- If true, the value will have double-spaces added around it. | |
""" | |
# Make sure the input is a string | |
if s is not None: | |
s = str(s) | |
# Escape the spaces within the string (will need KV_MODE = auto_escaped for this to work) | |
if s is not None: | |
s = s.replace('"', '\\"') | |
s = s.replace("'", "\\'") | |
if s is not None and (" " in s or encapsulate_in_double_quotes or s == ""): | |
return '"' + s + '"' | |
else: | |
return s | |
def create_event_string(self, data_dict, stanza, sourcetype, source, index, host=None, | |
unbroken=False, close=False, encapsulate_value_in_double_quotes=False): | |
""" | |
Create a string representing the event. | |
Argument: | |
data_dict -- A dictionary containing the fields | |
stanza -- The stanza used for the input | |
sourcetype -- The sourcetype | |
source -- The source field value | |
index -- The index to send the event to | |
unbroken -- | |
close -- | |
encapsulate_value_in_double_quotes -- If true, the value will have double-quotes added around it. | |
""" | |
# Make the content of the event | |
data_str = '' | |
for k, v in data_dict.items(): | |
# If the value is a list, then write out each matching value with the same name (as mv) | |
if isinstance(v, list) and not isinstance(v, basestring): | |
values = v | |
else: | |
values = [v] | |
k_escaped = self.escape_spaces(k) | |
# Write out each value | |
for v in values: | |
v_escaped = self.escape_spaces(v, encapsulate_in_double_quotes=encapsulate_value_in_double_quotes) | |
if len(data_str) > 0: | |
data_str += ' ' | |
data_str += '%s=%s' % (k_escaped, v_escaped) | |
# Make the event | |
event_dict = {'stanza': stanza, | |
'data' : data_str} | |
if index is not None: | |
event_dict['index'] = index | |
if sourcetype is not None: | |
event_dict['sourcetype'] = sourcetype | |
if source is not None: | |
event_dict['source'] = source | |
if host is not None: | |
event_dict['host'] = host | |
event = self._create_event(self.document, | |
params=event_dict, | |
stanza=stanza, | |
unbroken=unbroken, | |
close=close) | |
# If using unbroken events, the last event must have been | |
# added with a "</done>" tag. | |
return self._print_event(self.document, event) | |
def output_event(self, data_dict, stanza, index=None, sourcetype=None, source=None, host=None, | |
unbroken=False, close=False, out=sys.stdout, | |
encapsulate_value_in_double_quotes=False): | |
""" | |
Output the given even so that Splunk can see it. | |
Arguments: | |
data_dict -- A dictionary containing the fields | |
stanza -- The stanza used for the input | |
sourcetype -- The sourcetype | |
source -- The source to use | |
index -- The index to send the event to | |
unbroken -- | |
close -- | |
out -- The stream to send the event to (defaults to standard output) | |
host -- The host | |
encapsulate_value_in_double_quotes -- If true, the value will have double-quotes added | |
around it. This is useful in cases where the app | |
contains props & transforms that require the value | |
to have double-spaces. | |
""" | |
output = self.create_event_string(data_dict, stanza, sourcetype, source, index, host, | |
unbroken, close, | |
encapsulate_value_in_double_quotes=encapsulate_value_in_double_quotes) | |
with self.lock: | |
out.write(output) | |
out.flush() | |
def __init__(self, scheme_args, args=None, sleep_interval=5, logger_name='python_modular_input', | |
logger_level=None): | |
""" | |
Set up the modular input. | |
Arguments: | |
scheme_args -- The scheme args indicating the run-time mode of the input | |
args -- A list of Field instances for validating the arguments | |
sleep_interval -- How often to sleep between runs | |
logger_name -- The logger name to append to the logger | |
""" | |
# Setup defaults | |
default_scheme_args = { | |
"use_external_validation" : "true", | |
"streaming_mode" : "xml", | |
"use_single_instance" : True | |
} | |
scheme_args = dict(default_scheme_args.items() + scheme_args.items()) | |
# Set the scheme arguments. | |
for arg in scheme_args: | |
setattr(self, arg, self._is_valid_param(arg, scheme_args.get(arg))) | |
# Convert over the use_single_instance argument to a boolean | |
self.use_single_instance = normBool(self.use_single_instance) | |
if args is None: | |
self.args = [] | |
else: | |
self.args = args[:] | |
if sleep_interval > 0: | |
self.sleep_interval = sleep_interval | |
else: | |
self.sleep_interval = 5 | |
# Create the document used for sending events to Splunk through | |
self.document = self._create_document() | |
# Make a lock for controlling access to underlying functions | |
self.lock = RLock() | |
# Initialize the logger level | |
if logger_level is None: | |
self.logger_level = logging.INFO | |
else: | |
self.logger_level = logger_level | |
# Check and save the logger name | |
self._logger = None | |
if logger_name is None or len(logger_name) == 0: | |
raise Exception("Logger name cannot be empty") | |
self.logger_name = logger_name | |
# Keep an instance of the server-info around to prevent unnecessary REST calls | |
self.server_info = None | |
def addArg(self, arg): | |
""" | |
Add a given argument to the list of arguments. | |
Arguments: | |
arg -- An instance of Field that represents an argument. | |
""" | |
if self.args is None: | |
self.args = [] | |
self.args.append(arg) | |
def usage(self, out=sys.stdout): | |
""" | |
Print a usage statement. | |
Arguments: | |
out -- The stream to write the message to (defaults to standard output) | |
""" | |
out.write("usage: %s [--scheme|--validate-arguments]") | |
def do_scheme(self, out=sys.stdout): | |
""" | |
Get the scheme and write it out to standard output. | |
Arguments: | |
out -- The stream to write the message to (defaults to standard output) | |
""" | |
self.logger.debug("Modular input: scheme requested") | |
out.write(self.get_scheme()) | |
return True | |
@property | |
def logger(self): | |
""" | |
Returns a logger. A logger will be created if necessary. | |
""" | |
# Make a logger unless it already exists | |
if self._logger is not None: | |
return self._logger | |
logger = logging.getLogger(self.logger_name) | |
# Prevent the log messages from being duplicated in the python.log file | |
logger.propagate = False | |
logger.setLevel(self.logger_level) | |
if uf_mode: | |
file_handler = handlers.RotatingFileHandler(os.path.join(os.environ['SPLUNK_HOME'], 'var', 'log', self.logger_name + '.log'), maxBytes=25000000, backupCount=5) | |
else: | |
file_handler = handlers.RotatingFileHandler(make_splunkhome_path(['var', 'log', 'splunk', self.logger_name + '.log']), maxBytes=25000000, backupCount=5) | |
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') | |
file_handler.setFormatter(formatter) | |
logger.addHandler(file_handler) | |
self._logger = logger | |
return self._logger | |
@logger.setter | |
def logger(self, logger): | |
self._logger = logger | |
def escape_colons(self, string_to_escape): | |
""" | |
Escape the colons. This is necessary for secure password stanzas. | |
""" | |
return string_to_escape.replace(":", "\\:") | |
def get_secure_password_stanza(self, username, realm=""): | |
""" | |
Make the stanza name for a entry in the storage/passwords endpoint from the username and | |
realm. | |
""" | |
return self.escape_colons(realm) + ":" + self.escape_colons(username) + ":" | |
@forgive_splunkd_outages | |
def get_secure_password(self, realm, username=None, session_key=None): | |
""" | |
Get the secure password that matches the given realm and username. If no username is | |
provided, the first entry with the given realm will be returned. | |
""" | |
if uf_mode: | |
self.logger.warn("Unable to retrieve the secure credential since the input appears " + | |
"to be running in a Univeral Forwarder") | |
# Cannot get the secure password in universal forwarder mode since we don't | |
# have access to Splunk libraries | |
return None | |
# Look up the entry by realm only if no username is provided. | |
if username is None or len(username) == 0: | |
return self.get_secure_password_by_realm(realm, session_key) | |
# Get secure password | |
stanza = self.get_secure_password_stanza(username, realm) | |
try: | |
server_response, server_content = splunk.rest.simpleRequest('/services/storage/passwords/' + stanza + '?output_mode=json', sessionKey=session_key) | |
except splunk.ResourceNotFound: | |
return None | |
if server_response['status'] == '404': | |
return None | |
elif server_response['status'] != '200': | |
raise Exception("Could not get the secure passwords") | |
passwords_content = json.loads(server_content) | |
password = passwords_content['entry'] | |
return password[0] | |
@forgive_splunkd_outages | |
def get_secure_password_by_realm(self, realm, session_key): | |
""" | |
Get the secure password that matches the given realm. | |
""" | |
# Get secure passwords | |
server_response, server_content = splunk.rest.simpleRequest('/services/storage/passwords?output_mode=json', sessionKey=session_key) | |
if server_response['status'] != '200': | |
raise Exception("Could not get the secure passwords") | |
passwords_content = json.loads(server_content) | |
passwords = passwords_content['entry'] | |
# Filter down output to the ones matching the realm | |
matching_passwords = filter(lambda x: x['content']['realm'] == realm, passwords) | |
if len(matching_passwords) > 0: | |
return matching_passwords[0] | |
else: | |
return None | |
@classmethod | |
@forgive_splunkd_outages | |
def get_server_info(cls, session_key, force_refresh=False): | |
""" | |
Get the server information object. | |
""" | |
# Use the cached server information if possible | |
if not force_refresh and cls.server_info is not None: | |
return cls.server_info | |
# Get the server info | |
_, server_content = splunk.rest.simpleRequest('/services/server/info/server-info?output_mode=json', sessionKey=session_key) | |
info_content = json.loads(server_content) | |
cls.server_info = info_content['entry'][0] | |
return cls.server_info | |
@classmethod | |
@forgive_splunkd_outages | |
def is_on_cloud(cls, session_key): | |
""" | |
Determine if the host is running on cloud. | |
""" | |
server_info = cls.get_server_info(session_key) | |
return server_info['content'].get('instance_type', None) == 'cloud' | |
def bool_to_str(self, bool_value): | |
""" | |
Convert a value that operates like a boolean and return a string. | |
""" | |
if bool_value: | |
return "true" | |
else: | |
return "false" | |
def get_scheme(self): | |
""" | |
Get the scheme of the inputs parameters and return as a string. | |
""" | |
# Create the XML document | |
doc = Document() | |
# Create the <scheme> base element | |
element_scheme = doc.createElement("scheme") | |
doc.appendChild(element_scheme) | |
# Create the title element | |
element_title = doc.createElement("title") | |
element_scheme.appendChild(element_title) | |
element_title_text = doc.createTextNode(self.title) | |
element_title.appendChild(element_title_text) | |
# Create the description element | |
element_desc = doc.createElement("description") | |
element_scheme.appendChild(element_desc) | |
element_desc_text = doc.createTextNode(self.description) | |
element_desc.appendChild(element_desc_text) | |
# Create the use_external_validation element | |
element_external_validation = doc.createElement("use_external_validation") | |
element_scheme.appendChild(element_external_validation) | |
element_external_validation_text = doc.createTextNode(self.use_external_validation) | |
element_external_validation.appendChild(element_external_validation_text) | |
# Create the streaming_mode element | |
element_streaming_mode = doc.createElement("streaming_mode") | |
element_scheme.appendChild(element_streaming_mode) | |
element_streaming_mode_text = doc.createTextNode(self.streaming_mode) | |
element_streaming_mode.appendChild(element_streaming_mode_text) | |
# Create the use_single_instance element | |
element_use_single_instance = doc.createElement("use_single_instance") | |
element_scheme.appendChild(element_use_single_instance) | |
element_use_single_instance_text = doc.createTextNode(self.bool_to_str(self.use_single_instance)) | |
element_use_single_instance.appendChild(element_use_single_instance_text) | |
# Create the elements to stored args element | |
element_endpoint = doc.createElement("endpoint") | |
element_scheme.appendChild(element_endpoint) | |
element_args = doc.createElement("args") | |
element_endpoint.appendChild(element_args) | |
# Create the argument elements | |
self.add_xml_args(doc, element_args) | |
# Return the content as a string | |
return doc.toxml() | |
def add_xml_args(self, doc, element_args): | |
""" | |
Add the arguments to the XML scheme. | |
Arguments: | |
doc -- The XML document | |
element_args -- The element that should be the parent of the arg elements that will be | |
added. | |
""" | |
for arg in self.args: | |
# Skip the interval argument if in multi-instance mode since Splunk will complain | |
# otherwise | |
if not self.use_single_instance and arg.name == "interval": | |
continue | |
element_arg = doc.createElement("arg") | |
element_arg.setAttribute("name", arg.name) | |
element_args.appendChild(element_arg) | |
# Create the title element | |
element_title = doc.createElement("title") | |
element_arg.appendChild(element_title) | |
element_title_text = doc.createTextNode(arg.title) | |
element_title.appendChild(element_title_text) | |
# Create the description element | |
element_desc = doc.createElement("description") | |
element_arg.appendChild(element_desc) | |
element_desc_text = doc.createTextNode(arg.description) | |
element_desc.appendChild(element_desc_text) | |
# Create the data_type element | |
element_data_type = doc.createElement("data_type") | |
element_arg.appendChild(element_data_type) | |
element_data_type_text = doc.createTextNode(arg.get_data_type()) | |
element_data_type.appendChild(element_data_type_text) | |
# Create the required_on_create element | |
element_required_on_create = doc.createElement("required_on_create") | |
element_arg.appendChild(element_required_on_create) | |
element_required_on_create_text = doc.createTextNode("true" if arg.required_on_create else "false") | |
element_required_on_create.appendChild(element_required_on_create_text) | |
# Create the required_on_save element | |
element_required_on_edit = doc.createElement("required_on_edit") | |
element_arg.appendChild(element_required_on_edit) | |
element_required_on_edit_text = doc.createTextNode("true" if arg.required_on_edit else "false") | |
element_required_on_edit.appendChild(element_required_on_edit_text) | |
def do_validation(self, in_stream=sys.stdin): | |
""" | |
Get the validation data from standard input and attempt to validate it. Returns true if | |
the arguments validated, false otherwise. | |
Arguments: | |
in_stream -- The stream to get the input from (defaults to standard input) | |
""" | |
data, session_key = self.get_validation_data() | |
try: | |
self.validate_parameters(None, data, session_key) | |
return True | |
except FieldValidationException as e: | |
self.print_error(str(e)) | |
return False | |
def validate(self, arguments, session_key=None): | |
""" | |
Validate the argument dictionary where each key is a stanza. | |
Arguments: | |
arguments -- The arguments as an dictionary where the key is the stanza and the value is a | |
dictionary of the values. | |
session_key -- The session key for accessing Splunkd | |
""" | |
# Check each stanza | |
for stanza, parameters in arguments.items(): | |
self.validate_parameters(stanza, parameters, session_key) | |
return True | |
def validate_parameters(self, stanza, parameters, session_key=None): | |
""" | |
Validate the parameter set for a stanza and returns a dictionary of cleaned parameters. | |
Arguments: | |
stanza -- The stanza name | |
parameters -- The list of parameters | |
session_key -- The session key for accessing Splunkd | |
""" | |
cleaned_params = {} | |
# Append the arguments list such that the standard fields that Splunk provides are included | |
all_args = {} | |
for argument_validator in self.standard_args: | |
all_args[argument_validator.name] = argument_validator | |
for argument_validator in self.args: | |
all_args[argument_validator.name] = argument_validator | |
# Convert and check the parameters | |
for name, value in parameters.items(): | |
# If the argument was found, then validate and convert it | |
if name in all_args: | |
cleaned_params[name] = all_args[name].to_python(value, session_key=session_key) | |
# Allow the interval argument since it is internal but allowed even if not explicitly | |
# declared | |
elif name == "interval" and self.use_single_instance == False: | |
pass | |
# Throw an exception if the argument could not be found | |
else: | |
raise FieldValidationException("The parameter '%s' is not a valid argument" % (name)) | |
return cleaned_params | |
def print_error(self, error, out=sys.stdout): | |
""" | |
Prints the given error message to standard output. | |
Arguments: | |
error -- The message to be printed | |
out -- The stream to write the message to (defaults to standard output) | |
""" | |
out.write("<error><message>%s</message></error>" % error) | |
def read_config(self, in_stream=sys.stdin): | |
""" | |
Read the config from standard input and return the configuration. | |
in_stream -- The stream to get the input from (defaults to standard input) | |
""" | |
config_str_xml = in_stream.read() | |
return ModularInputConfig.get_config_from_xml(config_str_xml) | |
def run(self, stanza, cleaned_params, input_config): | |
""" | |
Run the input using the arguments provided. | |
Arguments: | |
stanza -- The name of the stanza | |
cleaned_params -- The arguments following validation and conversion to Python objects. | |
input_config -- A dictionary that provides configuration data like session keys | |
""" | |
raise Exception("Run function was not implemented") | |
@classmethod | |
def is_expired(cls, last_run, interval, cur_time=None): | |
""" | |
Indicates if the last run time is expired based on the value of the last_run parameter. | |
Arguments: | |
last_run -- The time that the analysis was last done | |
interval -- The interval that the analysis ought to be done (as an integer) | |
cur_time -- The current time (will be automatically determined if not provided) | |
""" | |
if cur_time is None: | |
cur_time = time.time() | |
if last_run is None: | |
return True | |
elif (last_run + interval) < cur_time: | |
return True | |
else: | |
return False | |
@classmethod | |
def last_ran(cls, checkpoint_dir, stanza): | |
""" | |
Determines the date that the analysis was last performed for the given input (denoted by | |
the stanza name). | |
Arguments: | |
checkpoint_dir -- The directory where checkpoints ought to be saved | |
stanza -- The stanza of the input being used | |
""" | |
checkpoint_dict = cls.get_checkpoint_data(checkpoint_dir, stanza) | |
if checkpoint_dict is None or 'last_run' not in checkpoint_dict: | |
return None | |
else: | |
return checkpoint_dict['last_run'] | |
@classmethod | |
def needs_another_run(cls, checkpoint_dir, stanza, interval, cur_time=None): | |
""" | |
Determines if the given input (denoted by the stanza name) ought to be executed. | |
Arguments: | |
checkpoint_dir -- The directory where checkpoints ought to be saved | |
stanza -- The stanza of the input being used | |
interval -- The frequency that the analysis ought to be performed | |
cur_time -- The current time (will be automatically determined if not provided) | |
""" | |
try: | |
last_ran = cls.last_ran(checkpoint_dir, stanza) | |
return cls.is_expired(last_ran, interval, cur_time) | |
except IOError as e: | |
# The file likely doesn't exist | |
return True | |
except ValueError as e: | |
# The file could not be loaded | |
return True | |
# Default return value | |
return True | |
@classmethod | |
def get_file_path(cls, checkpoint_dir, stanza): | |
""" | |
Get the path to the checkpoint file. | |
Arguments: | |
checkpoint_dir -- The directory where checkpoints ought to be saved | |
stanza -- The stanza of the input being used | |
""" | |
return os.path.join(checkpoint_dir, hashlib.sha224(stanza).hexdigest() + ".json") | |
@classmethod | |
def get_checkpoint_data(cls, checkpoint_dir, stanza="(undefined)", throw_errors=False): | |
""" | |
Gets the checkpoint for this input (if it exists) | |
Arguments: | |
checkpoint_dir -- The directory where checkpoints ought to be saved | |
stanza -- The stanza of the input being used | |
throw_errors -- If false, then None will be returned if the data could not be loaded | |
""" | |
file_pointer = None | |
try: | |
file_pointer = open(cls.get_file_path(checkpoint_dir, stanza)) | |
checkpoint_dict = json.load(file_pointer) | |
return checkpoint_dict | |
except IOError: | |
if throw_errors: | |
raise | |
else: | |
return None | |
except ValueError: | |
if throw_errors: | |
raise | |
else: | |
return None | |
finally: | |
if file_pointer is not None: | |
file_pointer.close() | |
def get_non_deviated_last_run(self, last_ran, interval, stanza): | |
""" | |
This method will return a last_run time that doesn't carry over the processing time. | |
If you used the current time and the script took 5 seconds to run, then next run will | |
actually be 5 seconds after it should have been. | |
Basically, it computes when the interval _should_ have executed so that the input runs on | |
the correct frequency. | |
Arguments: | |
interval -- The execution interval | |
last_ran -- When the input last ran (Unix epoch). | |
stanza -- The stanza that this is for | |
""" | |
# If this is the first run, then set it to the current time | |
if last_ran is None: | |
return time.time() | |
# We don't want the input to interval to slide by including the processing time in the | |
# interval. In other words, if the interval is 60 and it takes 5 seconds to process, | |
# then we don't just want to set the last_run to now because then the interval would | |
# actually be 65 seconds. So, let's assume that the processing time was 0 and we are | |
# right on time. If we assume this, then we would have ran at last_run + interval exactly. | |
# There is a potential problem with this though. We'll deal with that in a bit. | |
last_ran_derived = last_ran + interval | |
# There is a one problem with correcting the last run to the previous time plus the | |
# interval. If the input failed to run for a long time, then we might keep creating a | |
# last_run that is in the past and thus, keep executing the input until we finally come to | |
# the current time. I would rather just skip the ones in the past and start back over. | |
# That is what we will do. | |
if last_ran_derived < (time.time() - interval): | |
# The last run isn't within one interval of the current time. That means we either ran | |
# too long and missed a subsequent run or we just weren't running for a long-time. | |
# To catch up, we'll set it to the current time | |
last_ran_derived = time.time() | |
self.logger.info("Previous run was too far in the past (gap=%rs) and thus some executions of the input may have been missed (stanza=%s)", int(round(last_ran_derived-last_ran)), stanza) | |
#self.logger.info("Calculated non-deviated last_ran=%r from previous_last_ran=%r", last_ran_derived, last_ran) | |
return last_ran_derived | |
def save_checkpoint_data(self, checkpoint_dir, stanza, data): | |
""" | |
Save the checkpoint state. | |
Arguments: | |
checkpoint_dir -- The directory where checkpoints ought to be saved | |
stanza -- The stanza of the input being used | |
data -- A dictionary with the data to save | |
""" | |
with self.lock: | |
fp = None | |
try: | |
fp = open(self.get_file_path(checkpoint_dir, stanza), 'w') | |
json.dump(data, fp) | |
except Exception: | |
self.logger.exception('Failed to save checkpoint directory, check the permissions of the directory="%s"' % checkpoint_dir) | |
finally: | |
if fp is not None: | |
fp.close() | |
def do_shutdown(self): | |
""" | |
This function is called when the modular input should shut down. | |
""" | |
pass | |
def do_run(self, in_stream=sys.stdin, log_exception_and_continue=False): | |
""" | |
Read the config from standard input and return the configuration. | |
in_stream -- The stream to get the input from (defaults to standard input) | |
log_exception_and_continue -- If true, exceptions will not be thrown for invalid | |
configurations and instead the stanza will be skipped. | |
""" | |
# Run the modular import | |
input_config = self.read_config(in_stream) | |
if input_config is None: | |
self.logger.error("Did not receive an input configuration stream from Splunk, input will not run") | |
return | |
while True: | |
# If Splunk is no longer the parent process, then it has shut down and this input | |
# needs to terminate | |
if hasattr(os, 'getppid') and os.getppid() == 1: | |
logging.warn("Modular input is no longer running under Splunk; script will now exit") | |
self.do_shutdown() | |
sys.exit(2) | |
# Initialize the document that will be used to output the results | |
self.document = self._create_document() | |
for stanza, conf in input_config.configuration.items(): | |
try: | |
cleaned_params = self.validate_parameters(stanza, conf) | |
self.run(stanza, cleaned_params, input_config) | |
except FieldValidationException as exception: | |
if log_exception_and_continue: | |
self.logger.error("The input stanza '%s' is invalid: %s" % (stanza, str(exception))) | |
else: | |
raise exception | |
# Stop if the input is not running in single instance mode and allow Splunk to manage | |
# scheduling this input | |
if not self.use_single_instance: | |
self.logger.info("Successfully executed all of the inputs") | |
break | |
# Sleep for a bit | |
try: | |
time.sleep(self.sleep_interval) | |
except IOError: | |
pass | |
# Exceptions such as KeyboardInterrupt and IOError can be thrown in order to | |
# interrupt sleep calls | |
def get_validation_data(self, in_stream=sys.stdin): | |
""" | |
Get the validation data from standard input | |
Arguments: | |
in_stream -- The stream to get the input from (defaults to standard input) | |
""" | |
val_data = {} | |
# Read everything from stdin | |
val_str = in_stream.read() | |
# Parse the validation XML | |
doc = xml.dom.minidom.parseString(val_str) | |
root = doc.documentElement | |
# Parse the session key | |
session_key_node = root.getElementsByTagName("session_key")[0] | |
if session_key_node.firstChild and session_key_node.firstChild.nodeType == session_key_node.firstChild.TEXT_NODE: | |
session_key = session_key_node.firstChild.data | |
else: | |
session_key = None | |
# Parse the parameters | |
item_node = root.getElementsByTagName("item")[0] | |
if item_node: | |
name = item_node.getAttribute("name") | |
val_data["stanza"] = name | |
params_node = item_node.getElementsByTagName("param") | |
for param in params_node: | |
name = param.getAttribute("name") | |
if name and param.firstChild and param.firstChild.nodeType == param.firstChild.TEXT_NODE: | |
val_data[name] = param.firstChild.data | |
return val_data, session_key | |
def validate_parameters_from_cli(self, argument_array=None): | |
""" | |
Load the arguments from the given array (or from the command-line) and validate them. | |
Arguments: | |
argument_array -- An array of arguments (will get them from the command-line arguments if | |
none) | |
""" | |
# Get the arguments from the sys.argv if not provided | |
if argument_array is None: | |
argument_array = sys.argv[1:] | |
# This is the list of parameters we will generate | |
parameters = {} | |
for i in range(0, len(self.args)): | |
arg = self.args[i] | |
if i < len(argument_array): | |
parameters[arg.name] = argument_array[i] | |
else: | |
parameters[arg.name] = None | |
# Now that we have simulated the parameters, go ahead and test them | |
self.validate_parameters("unnamed", parameters) | |
def execute(self, in_stream=sys.stdin, out_stream=sys.stdout): | |
""" | |
Get the arguments that were provided from the command-line and execute the script. | |
Arguments: | |
in_stream -- The stream to get the input from (defaults to standard input) | |
out_stream -- The stream to write the output to (defaults to standard output) | |
""" | |
try: | |
self.logger.debug("Modular input started (execute called)") | |
if len(sys.argv) > 1: | |
if sys.argv[1] == "--scheme": | |
self.do_scheme(out_stream) | |
elif sys.argv[1] == "--validate-arguments": | |
self.logger.debug("Validate arguments called: input verifying arguments") | |
# Exit with an code if validation failed | |
if self.do_validation() == False: | |
sys.exit(1) | |
else: | |
self.usage(out_stream) | |
else: | |
# Run the modular input | |
self.do_run(in_stream, log_exception_and_continue=True) | |
self.logger.debug("Execution completed successfully") | |
except Exception as exception: | |
self.logger.exception("Execution failed") | |
# Make sure to grab any exceptions so that we can print a valid error message | |
self.print_error(str(exception), out_stream) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment