Skip to content

Instantly share code, notes, and snippets.

@artemrys
Last active September 6, 2022 13:20
Show Gist options
  • Save artemrys/e5da6d92205e4fffa65cfd4cf2e929ee to your computer and use it in GitHub Desktop.
Save artemrys/e5da6d92205e4fffa65cfd4cf2e929ee to your computer and use it in GitHub Desktop.
Splunk Add-on for Cisco Meraki custom rest handler with input validation
#
# SPDX-FileCopyrightText: 2021 Splunk, Inc. <sales@splunk.com>
# SPDX-License-Identifier: LicenseRef-Splunk-8-2021
#
#
import import_declare_test # noqa: F401 # isort: skip
import logging
from splunk_ta_cisco_meraki_organization_validation import organization_validation
from splunktaucclib.rest_handler import admin_external, util
from splunktaucclib.rest_handler.admin_external import AdminExternalHandler
from splunktaucclib.rest_handler.endpoint import (
RestModel,
SingleModel,
field,
validator,
)
# Call the library helper at import time, before the endpoint below is built.
util.remove_http_proxy_env_vars()

# "region" must be one of the two listed values.
_region_validator = validator.Enum(
    values={"global", "china"},
)

# "organization_id": String(min_len=1, max_len=50) combined with a
# digits-only pattern.
_organization_id_validator = validator.AllOf(
    validator.String(
        min_len=1,
        max_len=50,
    ),
    validator.Pattern(
        regex=r"""^\d+$""",
    ),
)

# "organization_api_key": String(min_len=1, max_len=50) combined with a
# lowercase-letters-and-digits pattern.
_organization_api_key_validator = validator.AllOf(
    validator.String(
        min_len=1,
        max_len=50,
    ),
    validator.Pattern(
        regex=r"""^[a-z0-9]+$""",
    ),
)

# All three fields are required; only the API key is flagged as encrypted.
fields = [
    field.RestField(
        "region",
        required=True,
        encrypted=False,
        default=None,
        validator=_region_validator,
    ),
    field.RestField(
        "organization_id",
        required=True,
        encrypted=False,
        default=None,
        validator=_organization_id_validator,
    ),
    field.RestField(
        "organization_api_key",
        required=True,
        encrypted=True,
        default=None,
        validator=_organization_api_key_validator,
    ),
]

model = RestModel(fields, name=None)

# Endpoint object handed to admin_external.handle() by the script entry point.
endpoint = SingleModel(
    "splunk_ta_cisco_meraki_organization",
    model,
    config_name="organization",
)
class CiscoMerakiOrganizationExternalHandler(AdminExternalHandler):
    """REST handler for organization entries with an extra validation step.

    Create and edit requests are first passed through
    ``organization_validation``; list and remove requests are delegated
    unchanged to ``AdminExternalHandler``.
    """

    def __init__(self, *args, **kwargs):
        AdminExternalHandler.__init__(self, *args, **kwargs)

    def _validate_organization_payload(self):
        """Run ``organization_validation`` on the current request payload.

        Any exception raised by ``organization_validation`` propagates to the
        caller, so the base-class handler is not reached in that case.
        """
        payload = self.payload
        organization_validation(
            payload.get("region"),
            payload.get("organization_id"),
            payload.get("organization_api_key"),
            self.getSessionKey(),
        )

    def handleList(self, confInfo):
        """Delegate listing to the base handler."""
        AdminExternalHandler.handleList(self, confInfo)

    def handleEdit(self, confInfo):
        """Validate the submitted organization, then delegate the edit."""
        self._validate_organization_payload()
        AdminExternalHandler.handleEdit(self, confInfo)

    def handleCreate(self, confInfo):
        """Validate the submitted organization, then delegate the creation."""
        self._validate_organization_payload()
        AdminExternalHandler.handleCreate(self, confInfo)

    def handleRemove(self, confInfo):
        """Delegate removal to the base handler."""
        AdminExternalHandler.handleRemove(self, confInfo)
if __name__ == "__main__":
    # Attach a no-op handler to the root logger before handing control over.
    root_logger = logging.getLogger()
    root_logger.addHandler(logging.NullHandler())

    # Serve the endpoint through the custom handler class.
    admin_external.handle(
        endpoint,
        handler=CiscoMerakiOrganizationExternalHandler,
    )
#
# SPDX-FileCopyrightText: 2021 Splunk, Inc. <sales@splunk.com>
# SPDX-License-Identifier: LicenseRef-Splunk-8-2021
#
#
"""
This module validates the organization being saved by the user.
"""
import import_declare_test # noqa: F401 # isort: skip
import traceback
import cisco_meraki_utils as utils
from splunktaucclib.rest_handler.error import RestError
def organization_validation(region, organization_id, organization_api_key, session_key):
    """
    Verify organization credentials by listing organizations via the API.

    The API key is used to fetch the organizations visible to it, and
    ``organization_id`` must appear among the returned ids (compared as
    strings).

    :param region: region value forwarded to ``utils.build_dashboard_api``.
    :param organization_id: id that must be present in the API response.
    :param organization_api_key: API key used to build the dashboard client.
    :param session_key: session key used for logger setup and for reading
        the proxy settings.
    :raises RestError: with status 400 when ``organization_id`` or
        ``organization_api_key`` is empty, when the API lookup fails for any
        reason, or when ``organization_id`` is not among the organizations
        returned for the key.
    """
    logger = utils.set_logger(
        session_key, "splunk_ta_cisco_meraki_organization_validation"
    )
    logger.info(
        "Verifying API key for the organization id {} ({} region)".format(
            organization_id, region
        )
    )
    if not organization_id or not organization_api_key:
        raise RestError(
            400,
            "Provide all necessary arguments: "
            "organization_id and organization_api_key.",
        )
    try:
        proxy_settings = utils.get_proxy_settings(logger, session_key)
        dashboard = utils.build_dashboard_api(
            region, organization_api_key, proxy_settings
        )
        organizations = dashboard.organizations.getOrganizations()
        # Compare as strings so the id type does not matter. This stays inside
        # the try so a malformed response is still reported as a lookup failure.
        valid_organization_id = any(
            str(organization["id"]) == str(organization_id)
            for organization in organizations
        )
    except Exception:
        logger.error(
            "Failed to connect to Meraki for organization id: {} ({} region). {}".format(
                organization_id, region, traceback.format_exc()
            )
        )
        msg = (
            "Could not connect to Meraki for organization id: {} ({} region). "
            "Check configuration and network settings".format(organization_id, region)
        )
        raise RestError(400, msg)
    # This check must live outside the try block: raised inside it, the
    # RestError would be caught by the broad handler above and replaced with
    # the misleading "Could not connect" message.
    if not valid_organization_id:
        msg = "Failed to validate organization id: {} ({} region)".format(
            organization_id, region
        )
        logger.error(msg)
        raise RestError(400, msg)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment