Created
July 27, 2021 19:20
-
-
Save cutler-scott-newrelic/a7fcabfe42dab81144c10c445fceff65 to your computer and use it in GitHub Desktop.
This is a quick and dirty Python script that takes the legacy-style mappings and settings from filebeat and inserts them into an existing component template. This is WAY MORE work than it should be.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pprint | |
import sys | |
import requests | |
import json | |
import argparse | |
import urllib | |
import logging | |
# `import urllib` alone does not guarantee that the `parse` submodule is
# loaded; import it explicitly rather than relying on another package
# having imported it first.
import urllib.parse

# Command-line interface: a positional Elastic URL, optional credentials and
# a repeatable -v flag that raises the log verbosity.
parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group()  # NOTE(review): currently unused
parser.add_argument('url', help='The URL for the required Elastic search DB (default port 9200)')
parser.add_argument('--username', help='The username for authenticating to Elastic')
parser.add_argument('--password', help='The password for authenticating to Elastic')
parser.add_argument('--verbose', '-v', action='count', default=0)
args = parser.parse_args()

# Base URL of the Elastic instance; `http` is assumed when no scheme is given.
URL_OBJECT = urllib.parse.urlsplit(args.url, 'http')
if not URL_OBJECT.netloc:
    # urlsplit only recognises a host when it is introduced by "//".  A bare
    # "localhost" or "localhost:9200" would otherwise end up in the path (or
    # scheme) and the port default below would produce the host "None".
    URL_OBJECT = urllib.parse.urlsplit('//' + args.url, 'http')
if not URL_OBJECT.port:
    # No explicit port: fall back to 9200.
    URL_OBJECT = URL_OBJECT._replace(netloc="{}:{}".format(URL_OBJECT.hostname, 9200))

ELASTIC_USERNAME = args.username
ELASTIC_PASSWORD = args.password
# Only build a credentials tuple when something was supplied; a (None, None)
# tuple would be sent as the literal basic-auth pair "None:None".
if ELASTIC_USERNAME is None and ELASTIC_PASSWORD is None:
    AUTH = None
else:
    AUTH = (ELASTIC_USERNAME, ELASTIC_PASSWORD)

ELASTIC_HEADERS = {
    'Content-Type': 'application/json'
}

# Verbosity: default WARNING, -v gives INFO, -vv (or more) gives DEBUG.
LOG_LEVEL = logging.WARNING
if args.verbose == 1:
    LOG_LEVEL = logging.INFO
elif args.verbose >= 2:
    LOG_LEVEL = logging.DEBUG
def initialize_logger(log_level):
    """Return this module's logger, writing to stdout at *log_level*.

    The logger is looked up by module name, so every call returns the same
    object.  The stdout handler is attached only when the logger has no
    handler yet; otherwise each additional call would stack another handler
    and every record would be printed once per call.

    Args:
        log_level: A ``logging`` level (e.g. ``logging.INFO``) set on the logger.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(log_level)
    if not logger.handlers:
        log_handler = logging.StreamHandler(sys.stdout)
        log_handler.setFormatter(
            logging.Formatter('%(name)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(log_handler)
    return logger
# Module-wide logger used by the functions in this script.
LOGGER = initialize_logger(log_level=LOG_LEVEL)
def get_filebeat_legacy_templates():
    """Fetch the legacy index templates whose names match ``filebeat-*``.

    Returns:
        Whatever ``elastic_http`` returns for ``GET /_template/filebeat-*``.
    """
    LOGGER.info('Attempting to fetch all legacy filebeat templates from Elastic...')
    legacy_templates = elastic_http(method='get', path='/_template/filebeat-*')
    return legacy_templates
def get_latest_legacy_template():
    """Return the newest legacy ``filebeat-*`` template as ``(name, body)``.

    "Newest" is decided by the dotted version number after the last ``-`` in
    the template name (``filebeat-7.13.2`` -> ``[7, 13, 2]``), compared
    numerically component by component.

    Returns:
        A ``(template_name, template_body)`` tuple.

    Raises:
        Exception: When no legacy template was returned.
        ValueError: When several templates exist and a name does not end in a
            purely numeric dotted version.
    """
    templates = get_filebeat_legacy_templates()
    if not templates:
        raise Exception("No templates found?")
    if len(templates) == 1:
        # Nothing to compare; the only template wins regardless of its name.
        return templates.popitem()

    def version_key(name):
        return [int(part) for part in name.split('-')[-1].split('.')]

    # Select among the real template names instead of rebuilding
    # "filebeat-<version>", which fails with KeyError whenever a name carries
    # additional hyphen-separated segments.
    latest = max(templates, key=version_key)
    return latest, templates[latest]
def modify_component_template(template_tuple):
    """Copy settings and mappings from a legacy template into component templates.

    This assumes you have already configured a component template for the
    index and data you are working with.  Two requests are written:

    * ``_component_template/filebeat-settings`` is fetched, four index
      settings are overwritten with the legacy template's values, and the
      result is PUT back.
    * ``_component_template/filebeat-mappings`` is replaced with the legacy
      template's mappings.

    Args:
        template_tuple: ``(name, body)`` pair for the legacy template; only
            ``body`` is used.

    Raises:
        IndexError: When the response contains no ``filebeat-settings`` entry.
        KeyError: When the legacy template lacks one of the copied settings.
    """
    _, legacy = template_tuple

    LOGGER.info("Modifying filebeat template settings...")
    path = '_component_template/filebeat-settings'
    template_name = path.split('/')[-1]
    candidates = elastic_http('get', path)['component_templates']
    match = next(
        (entry for entry in candidates if entry.get('name') == template_name),
        None,
    )
    if match is None:
        # Same exception type as indexing an empty result list, but with a
        # message that says what is missing.
        raise IndexError(
            f"Component template {template_name!r} not found in Elastic response"
        )
    existing_settings = match['component_template']

    legacy_index = legacy['settings']['index']
    # Create intermediate levels on demand so a component template that does
    # not yet carry these nested settings can still be updated.
    target_index = (
        existing_settings.setdefault('template', {})
        .setdefault('settings', {})
        .setdefault('index', {})
    )
    target_index['refresh_interval'] = legacy_index['refresh_interval']
    target_index.setdefault('mapping', {}).setdefault('total_fields', {})['limit'] = (
        legacy_index['mapping']['total_fields']['limit']
    )
    target_index['max_docvalue_fields_search'] = legacy_index['max_docvalue_fields_search']
    # Written under a flat dotted key, read from the nested legacy layout.
    target_index['query.default_field'] = legacy_index['query']['default_field']
    elastic_http('put', path, existing_settings)

    LOGGER.info("Replacing filebeat mapping settings...")
    path = '_component_template/filebeat-mappings'
    json_obj = {
        'template': {
            # NOTE(review): fixed limit here, unlike the value copied from the
            # legacy template above - confirm this is intentional.
            'settings': {"index.mapping.total_fields.limit": "10000"},
            'mappings': legacy['mappings'],
        }
    }
    LOGGER.debug(pprint.pformat(json_obj))
    elastic_http('put', path, json_obj)
def elastic_http(method, path, json_obj=None):
    """Send one HTTP request to Elastic and return the decoded JSON response.

    Args:
        method: Name of the ``requests`` helper to call, e.g. ``'get'`` or
            ``'put'``.
        path: URL path substituted into ``URL_OBJECT``.
        json_obj: Optional JSON-serialisable request body; no body is sent
            when it is falsy.

    Returns:
        The response body parsed as JSON.

    Raises:
        Exception: When the response status is not OK (``r.ok`` is false).
    """
    LOGGER.info(f"Running {method} on {path} ...")
    url = urllib.parse.urlunsplit(URL_OBJECT._replace(path=path))
    request_kwargs = {
        'auth': AUTH,
        'headers': ELASTIC_HEADERS,
        # NOTE(review): TLS certificate verification is disabled; acceptable
        # only for trusted networks or self-signed development clusters.
        'verify': False,
    }
    if json_obj:
        request_kwargs['json'] = json_obj
    r = getattr(requests, method)(url, **request_kwargs)

    if r.ok:
        # r.ok covers every status below 400, so report the real code.
        LOGGER.info(f"{method} returned {r.status_code}")
        return r.json()

    LOGGER.warning('Something went wrong, ingest pipeline not updated!')
    # Pass the value as a %-argument: extra positional arguments without a
    # placeholder in the message make the logging module report an error.
    LOGGER.debug('Status Code: %s', r.status_code)
    try:
        detail = json.dumps(r.json(), indent=2)
    except ValueError:
        # Error bodies are not always JSON; fall back to the raw text so the
        # decode failure does not hide the HTTP failure.
        detail = r.text
    LOGGER.debug('Message:%s', detail)
    raise Exception(f"{method} {path} failed with HTTP status {r.status_code}")
def init():
    """Pick the latest legacy filebeat template and apply it to the component templates."""
    modify_component_template(get_latest_legacy_template())
# Script entry: run the update, then report completion.  The message is logged
# at WARNING so it is shown even without -v.
init()
LOGGER.warning('Update pipeline script finished.')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment