@valeriocos
Created June 11, 2020 14:51
Create the .grimoirelab-sigils index and migrate the .kibana index from ES 6.1 to ES 6.8. The gist contains two scripts: the first builds the .grimoirelab-sigils index out of the release dates stored in .kibana; the second dumps .kibana, strips the release_date attributes and prints the commands to complete the migration to ES 6.8.
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015-2020 Bitergia
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Valerio Cosentino <valcos@bitergia.com>
#
import json

import requests

# TARGET ES
ENV = 'localhost:9200'
# CREDENTIALS
USER = 'xxx'
PWD = 'xxx'
# IF TRUE, THE .GRIMOIRELAB-SIGILS INDEX WILL BE UPLOADED TO THE TARGET ES
UPLOAD = True

# DO NOT MODIFY AFTER HERE
ELASTICSEARCH_URL = "https://{}:{}@{}"
KIBANA_INDEX = ".kibana"
SIGILS_INDEX = ".grimoirelab-sigils"
HEADERS = {"Content-Type": "application/json"}


def get_page_items(url, scroll_id=None):
    """Get the documents from the input url at a given page/scroll"""
    if not scroll_id:
        # First request: open a 10-minute scroll context and ask for the
        # index-pattern and dashboard objects in batches of 100
        search_url = url + "/_search"
        params = {
            'size': 100,
            'scroll': '10m'
        }
        query = {
            "query": {
                "bool": {
                    "should": [
                        {"term": {"type": "index-pattern"}},
                        {"term": {"type": "dashboard"}}
                    ]
                }
            }
        }
        res = requests.post(search_url, params=params, data=json.dumps(query),
                            headers=HEADERS, verify=False)
    else:
        # Later requests: keep the scroll context alive and fetch the next batch
        search_url = url + "/_search/scroll"
        query = {
            'scroll': '10m',
            'scroll_id': scroll_id
        }
        res = requests.post(search_url, data=json.dumps(query), headers=HEADERS, verify=False)
    res.raise_for_status()
    return res.json()


def fetch(es_url, index):
    """Fetch the documents from a target index and return a generator"""
    index_url = es_url + '/' + index
    page = get_page_items(index_url)
    if not page:
        print("No results found")
        return
    scroll_id = page["_scroll_id"]
    # ES 7.x returns hits.total as a dict, earlier versions as a number
    total = page['hits']['total']
    scroll_size = total['value'] if isinstance(total, dict) else total
    if scroll_size == 0:
        print("No results found")
        return
    while scroll_size > 0:
        for item in page['hits']['hits']:
            yield item
        # the scroll endpoint hangs off the root URL, not the index URL
        page = get_page_items(es_url, scroll_id)
        if not page:
            break
        scroll_size = len(page['hits']['hits'])
    print("Fetch completed")


def prepare_sigils_items():
    es_url = ELASTICSEARCH_URL.format(USER, PWD, ENV)
    items_to_upload = []
    for obj in fetch(es_url, KIBANA_INDEX):
        source = obj['_source']
        # only dashboard and index-pattern objects can contain the release_date attr
        if source['type'] not in ['dashboard', 'index-pattern']:
            continue
        item_uuid = obj['_id']
        item_id = item_uuid.split(':')[1] if ':' in item_uuid else item_uuid
        item_type = source['type']
        release_date = source[item_type].get('release_date', None)
        item_title = source[item_type].get('title', '')
        if not release_date:
            print("Item %s (title: %s) doesn't contain release_date, it won't be inserted"
                  % (item_uuid, item_title))
            continue
        item_json = {
            "item_uuid": item_uuid,
            "item_id": item_id,
            "item_type": item_type,
            "release_date": release_date
        }
        items_to_upload.append(item_json)
    return items_to_upload


def upload_sigils_index(items):
    es = ELASTICSEARCH_URL.format(USER, PWD, ENV)
    for item in items:
        # the object UUID becomes the document ID in .grimoirelab-sigils
        item_uuid = item.pop('item_uuid')
        sigils_index_url = es + '/' + SIGILS_INDEX + '/doc/' + item_uuid
        res = requests.post(sigils_index_url, data=json.dumps(item), verify=False, headers=HEADERS)
        res.raise_for_status()
        print("Item %s added" % item_uuid)


def main():
    """The script allows to:
    - identify the dashboards and index patterns in .kibana that don't include
      a release date (they are only reported, not uploaded)
    - generate the .grimoirelab-sigils index, which contains the ID of each
      index pattern/dashboard together with its release date

    To execute the script, set ENV, USER and PWD, and enable UPLOAD (the
    latter will create the .grimoirelab-sigils index)
    """
    items = prepare_sigils_items()
    if UPLOAD:
        upload_sigils_index(items)


if __name__ == '__main__':
    main()
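

# For a quick sanity check after running the script with UPLOAD enabled, a
# minimal sketch along these lines lists a few of the uploaded documents
# (it assumes the same ENV, USER and PWD values as above; the size of 5 is
# arbitrary):
import json
import requests

USER, PWD, ENV = 'xxx', 'xxx', 'localhost:9200'
url = "https://{}:{}@{}/.grimoirelab-sigils/_search".format(USER, PWD, ENV)
res = requests.get(url, params={'size': 5}, verify=False)
res.raise_for_status()
for hit in res.json()['hits']['hits']:
    print(hit['_id'], json.dumps(hit['_source']))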
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015-2020 Bitergia
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Valerio Cosentino <valcos@bitergia.com>
#
import json
import os
import re
import subprocess

# TARGET ES
ENV = 'localhost:9200'
# CREDENTIALS
USER = 'xxx'
PWD = 'xxx'
# PATH WHERE TO STORE AND MANIPULATE THE .KIBANA DATA
FOLDER_PATH = '/home/kibana-migrations/'

# DO NOT MODIFY AFTER HERE
DUMP_MAPPING_TEMPLATE = '''
NODE_TLS_REJECT_UNAUTHORIZED=0 elasticdump \
--input=https://{}:{}@{} \
--input-index=.kibana \
--output={} \
--limit=1000 \
--type=mapping
'''
DUMP_DATA_TEMPLATE = '''
NODE_TLS_REJECT_UNAUTHORIZED=0 elasticdump \
--input=https://{}:{}@{} \
--input-index=.kibana \
--output={} \
--limit=1000 \
--type=data
'''
UPLOAD_MAPPING_TEMPLATE = '''
NODE_TLS_REJECT_UNAUTHORIZED=0 elasticdump \
--input={} \
--output=https://{}:{}@{} \
--output-index=.kibana_old \
--limit=1000 \
--type=mapping
'''
UPLOAD_DATA_TEMPLATE = '''
NODE_TLS_REJECT_UNAUTHORIZED=0 elasticdump \
--input={} \
--output=https://{}:{}@{} \
--output-index=.kibana_old \
--limit=1000 \
--type=data
'''
DELETE_INDEX = '''
curl -XDELETE https://{}:{}@{}/.kibana -k
'''
SET_ALIAS_TEMPLATE = '''
curl -XPOST https://{}:{}@{}/_aliases -d '{{"actions":[{{"add":{{"index":".kibana_old","alias":".kibana"}}}}]}}' -H 'Content-Type: application/json' -k
'''
SHOW_ALIASES = '''
curl -XGET https://{}:{}@{}/_aliases?pretty -k
'''


def call(cmd):
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # print the command output (the process emits bytes, so decode them)
    for line in p.stdout.readlines():
        print(line.decode('utf-8', errors='replace'), end='')
    return p.wait()


def dump(mapping_file_path, data_file_path):
    cmd_dump_mapping = DUMP_MAPPING_TEMPLATE.format(USER, PWD, ENV, mapping_file_path)
    call(cmd_dump_mapping)
    cmd_dump_data = DUMP_DATA_TEMPLATE.format(USER, PWD, ENV, data_file_path)
    call(cmd_dump_data)


def remove_release_date(file_path, reg_exp, fallback_reg_exp=None):
    lines = []
    new_file_path = file_path.replace('.json', '_processed.json')
    with open(file_path, 'r') as f:
        content = f.readlines()
    for line in content:
        line_proc = line
        if '"release_date' in line:
            line_proc = re.sub(reg_exp, '', line_proc)
            # fall back to the second expression when the first one didn't match
            if '"release_date' in line_proc and fallback_reg_exp:
                line_proc = re.sub(fallback_reg_exp, '{', line_proc)
            if '"release_date' in line_proc:
                raise Exception("release_date not removed from line: %s" % line_proc)
        if 'mapping' in file_path:
            # set the mapping to strict so unknown attributes are rejected
            line_proc = line_proc.replace('{"doc":{"properties"',
                                          '{"doc":{"dynamic":"strict","properties"')
        # check that the processed line is still valid JSON
        json.loads(line_proc)
        lines.append(line_proc)
    with open(new_file_path, 'w') as f:
        for line in lines:
            f.write(line)
    return new_file_path


def main():
    """The script does the following:
    1) create a local directory on your machine at FOLDER_PATH
    2) dump the mappings and data of .kibana from a target Kibana instance into the directory
    3) process the downloaded files to remove the release_date attributes and set the mappings to strict
    4) print a set of commands to:
       a) delete the .kibana index
       b) upload the new mappings and data to the index .kibana_old in the target Kibana instance
       c) set the alias .kibana to the index .kibana_old
       d) check that the alias is set correctly

    The workflow is the following:
    - run the script
    - check that the mappings and data have been correctly downloaded
    - delete the current .kibana with 4a
    - upload the .kibana_old generated with 4b
    - set the alias .kibana to .kibana_old with 4c
    - check that the alias is set correctly with 4d
    - switch off Kibana
    - upgrade to Kibana 6.8
    """
    if os.path.isdir(FOLDER_PATH):
        print("directory exists")
        return
    os.mkdir(FOLDER_PATH)
    mapping_file_path = FOLDER_PATH + ENV + '_mapping.json'
    data_file_path = FOLDER_PATH + ENV + '.json'
    dump(mapping_file_path, data_file_path)
    # the expressions match the release_date attribute both in the mapping
    # ({"type":"date"}) and in the data (an ISO date with a fractional part);
    # the fallback handles release_date appearing as the first attribute
    new_mapping_file_path = remove_release_date(mapping_file_path,
                                                r',"release_date":\{"type":"date"\}')
    new_data_file_path = remove_release_date(data_file_path,
                                             r',"release_date":".*\.[0-9]*"',
                                             fallback_reg_exp=r'\{"release_date":".*\.[0-9]*",')
    print(DELETE_INDEX.format(USER, PWD, ENV))
    cmd_upload_mapping = UPLOAD_MAPPING_TEMPLATE.format(new_mapping_file_path, USER, PWD, ENV)
    print(cmd_upload_mapping)
    cmd_upload_data = UPLOAD_DATA_TEMPLATE.format(new_data_file_path, USER, PWD, ENV)
    print(cmd_upload_data)
    print(SET_ALIAS_TEMPLATE.format(USER, PWD, ENV))
    print(SHOW_ALIASES.format(USER, PWD, ENV))


if __name__ == '__main__':
    main()
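

# As a standalone illustration of step 3 above, a minimal sketch of the
# release_date stripping applied to one hypothetical elasticdump data line
# (the real files contain one JSON document per line, dumped from .kibana):
import json
import re

sample = ('{"_index":".kibana","_type":"doc","_id":"dashboard:Git",'
          '"_source":{"type":"dashboard","dashboard":{"title":"Git",'
          '"release_date":"2020-06-11T14:51:00.000"}}}')
cleaned = re.sub(r',"release_date":".*\.[0-9]*"', '', sample)
json.loads(cleaned)  # the line is still valid JSON once the attribute is gone
print(cleaned)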