Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save cutler-scott-newrelic/a7fcabfe42dab81144c10c445fceff65 to your computer and use it in GitHub Desktop.
Save cutler-scott-newrelic/a7fcabfe42dab81144c10c445fceff65 to your computer and use it in GitHub Desktop.
This is a quick-and-dirty Python script that takes the legacy-style mappings and settings from Filebeat and inserts them into existing component templates (filebeat-settings and filebeat-mappings). This is WAY MORE work than it should be.
import argparse
import json
import logging
import pprint
import sys
import urllib
import urllib.parse

import requests
# --- Command-line interface -------------------------------------------------
parser = argparse.ArgumentParser()
parser.add_argument('url', help='The URL for the required Elastic search DB (default port 9200)')
parser.add_argument('--username', help='The username for authenticating to Elastic')
parser.add_argument('--password', help='The password for authenticating to Elastic')
parser.add_argument('--verbose', '-v', action='count', default=0)
args = parser.parse_args()

# --- Base URL ---------------------------------------------------------------
# urlsplit() only recognises a network location when the string contains "//".
# A bare "localhost" or "localhost:9200" would otherwise end up in the path or
# scheme and leave hostname as None, so such input is prefixed before parsing.
if '://' in args.url or args.url.startswith('//'):
    _raw_url = args.url
else:
    _raw_url = '//' + args.url
URL_OBJECT = urllib.parse.urlsplit(_raw_url, 'http')
if not URL_OBJECT.hostname:
    parser.error(f"could not determine a host from URL {args.url!r}")
try:
    _explicit_port = URL_OBJECT.port
except ValueError as err:
    # .port raises ValueError for a non-numeric or out-of-range port.
    parser.error(f"invalid port in URL {args.url!r}: {err}")
if _explicit_port is None:
    # Append the default port to the existing netloc (instead of rebuilding it
    # from hostname) so any user-info and IPv6 brackets are preserved.
    URL_OBJECT = URL_OBJECT._replace(
        netloc="{}:{}".format(URL_OBJECT.netloc.rstrip(':'), 9200)
    )

# --- Credentials and request headers ----------------------------------------
ELASTIC_USERNAME = args.username
ELASTIC_PASSWORD = args.password
# Only use HTTP basic auth when a username was supplied. A (None, None) tuple
# is truthy, so requests would stringify it and send "None:None" credentials.
if ELASTIC_USERNAME is not None:
    AUTH = (ELASTIC_USERNAME, ELASTIC_PASSWORD or '')
else:
    AUTH = None
ELASTIC_HEADERS = {
    'Content-Type': 'application/json'
}

# --- Verbosity: no flag -> WARNING, -v -> INFO, -vv or more -> DEBUG --------
LOG_LEVEL = logging.WARNING
if args.verbose == 1:
    LOG_LEVEL = logging.INFO
elif args.verbose >= 2:
    LOG_LEVEL = logging.DEBUG
def initialize_logger(log_level):
    """Return this module's logger, set to *log_level* and writing to stdout.

    The logger is looked up by ``__name__``, so every call returns the same
    object. The stdout handler is attached only when the logger has no
    handlers yet; otherwise each extra call would add another handler and
    every record would be printed once per call.

    Args:
        log_level: A ``logging`` level such as ``logging.INFO``.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(log_level)
    if not logger.handlers:
        log_handler = logging.StreamHandler(sys.stdout)
        log_handler.setFormatter(
            logging.Formatter('%(name)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(log_handler)
    return logger
# Module-wide logger used by every function below; its level comes from the
# -v/--verbose count translated into LOG_LEVEL above.
LOGGER = initialize_logger(LOG_LEVEL)
def get_filebeat_legacy_templates():
    """Fetch the legacy index templates whose names match ``filebeat-*``.

    Returns:
        Whatever ``elastic_http`` returns for ``GET /_template/filebeat-*``.
    """
    LOGGER.info('Attempting to fetch all legacy filebeat templates from Elastic...')
    legacy_templates = elastic_http('get', '/_template/filebeat-*')
    return legacy_templates
def _template_version(name):
    """Return the dotted numeric version in a template name as a tuple of ints.

    The last ``-``-separated segment consisting solely of dot-separated
    decimal digits is used, so ``filebeat-7.10.0`` and
    ``filebeat-7.10.0-snapshot`` both yield ``(7, 10, 0)``. A name without
    such a segment yields ``()``, which compares lower than any real version.
    """
    for segment in reversed(name.split('-')):
        pieces = segment.split('.')
        if all(piece.isdecimal() for piece in pieces):
            return tuple(int(piece) for piece in pieces)
    return ()


def get_latest_legacy_template(templates=None):
    """Return the ``(name, body)`` of the highest-versioned legacy template.

    Args:
        templates: Optional mapping of template name to template body. When
            omitted, the mapping is fetched with
            ``get_filebeat_legacy_templates()``.

    Returns:
        A ``(name, body)`` tuple for the template with the greatest version.
        Versions are compared numerically, so 7.10.0 ranks above 7.9.3.

    Raises:
        Exception: If the mapping is empty.
    """
    if templates is None:
        templates = get_filebeat_legacy_templates()
    if not templates:
        raise Exception("No templates found?")
    # Pick among the real keys rather than rebuilding "filebeat-<version>",
    # which would miss names that carry extra segments.
    latest = max(templates, key=_template_version)
    return latest, templates[latest]
def modify_component_template(template_tuple):
    """Copy settings and mappings from a legacy template into component templates.

    This assumes you have already configured a ``filebeat-settings`` component
    template for the index and data you are working with. Four index settings
    are copied into it, and the ``filebeat-mappings`` component template is
    then replaced with the legacy template's mappings.

    Args:
        template_tuple: ``(name, body)`` pair such as the one returned by
            ``get_latest_legacy_template``; only ``body`` is used.

    Raises:
        IndexError: If the GET response does not list ``filebeat-settings``.
        KeyError: If the legacy template lacks one of the copied settings.
    """
    _name, legacy = template_tuple

    LOGGER.info("Modifying filebeat template settings...")
    path = '_component_template/filebeat-settings'
    template_name = path.split('/')[-1]
    listing = elastic_http('get', path)['component_templates']
    matches = [
        entry['component_template']
        for entry in listing
        if entry.get('name') == template_name
    ]
    if not matches:
        raise IndexError(
            f"component template {template_name!r} not found in response"
        )
    existing_settings = matches[0]

    legacy_index = legacy['settings']['index']
    # Create intermediate objects on demand: a component template that does
    # not yet define e.g. index.mapping.total_fields would otherwise make the
    # nested assignments below fail with KeyError.
    index_settings = (
        existing_settings
        .setdefault('template', {})
        .setdefault('settings', {})
        .setdefault('index', {})
    )
    index_settings['refresh_interval'] = legacy_index['refresh_interval']
    total_fields = index_settings.setdefault('mapping', {}).setdefault('total_fields', {})
    total_fields['limit'] = legacy_index['mapping']['total_fields']['limit']
    index_settings['max_docvalue_fields_search'] = legacy_index['max_docvalue_fields_search']
    # Written in nested form like mapping.total_fields above. Presumably the
    # GET response also returns this setting nested, in which case a dotted
    # 'query.default_field' key would sit next to a nested 'query' object on a
    # second run -- TODO confirm against the cluster's response shape.
    index_settings.setdefault('query', {})['default_field'] = legacy_index['query']['default_field']
    elastic_http('put', path, existing_settings)

    LOGGER.info("Replacing filebeat mapping settings...")
    path = '_component_template/filebeat-mappings'
    json_obj = {
        'template': {
            'settings': {"index.mapping.total_fields.limit": "10000"},
            'mappings': legacy['mappings'],
        }
    }
    LOGGER.debug(pprint.pformat(json_obj))
    elastic_http('put', path, json_obj)
def elastic_http(method, path, json_obj=None, timeout=60):
    """Send one HTTP request to Elasticsearch and return the decoded JSON body.

    Args:
        method: Name of a ``requests`` module-level function, e.g. ``'get'``
            or ``'put'``.
        path: URL path substituted into the module-level ``URL_OBJECT``.
        json_obj: Optional JSON-serialisable request body; ``None`` sends no
            body.
        timeout: Seconds to wait for the server before ``requests`` gives up,
            so an unresponsive host cannot hang the script indefinitely.

    Returns:
        The response body parsed as JSON.

    Raises:
        Exception: If ``response.ok`` is false; the message names the method,
            path and HTTP status.
    """
    LOGGER.info(f"Running {method} on {path} ...")
    url = urllib.parse.urlunsplit(URL_OBJECT._replace(path=path))
    # NOTE(security): verify=False disables TLS certificate verification.
    # Kept as in the original; review before using against a production host.
    response = getattr(requests, method)(
        url,
        auth=AUTH,
        headers=ELASTIC_HEADERS,
        verify=False,
        json=json_obj,
        timeout=timeout,
    )
    if response.ok:
        LOGGER.info(f"{method} returned {response.status_code}")
        return response.json()

    LOGGER.warning('Something went wrong, ingest pipeline not updated!')
    LOGGER.debug('Status Code: %s', response.status_code)
    try:
        detail = json.dumps(response.json(), indent=2)
    except ValueError:
        # The error body is not JSON (e.g. a proxy error page); log it raw
        # instead of letting the decode error hide the HTTP failure.
        detail = response.text
    LOGGER.debug('Message:' + detail)
    raise Exception(
        f"{method} {path} failed with HTTP status {response.status_code}"
    )
def init():
    """Pick the latest legacy filebeat template and apply it to the component templates."""
    modify_component_template(get_latest_legacy_template())
# Run the migration as soon as the module body executes (no __main__ guard).
init()
# Logged at WARNING so the completion notice is shown at the default log level.
LOGGER.warning('Update pipeline script finished.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment