Skip to content

Instantly share code, notes, and snippets.

@gingerwizard
Created April 3, 2019 10:38
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gingerwizard/bf434ba6e0066e9743537e4d1d934fd2 to your computer and use it in GitHub Desktop.
Save gingerwizard/bf434ba6e0066e9743537e4d1d934fd2 to your computer and use it in GitHub Desktop.
import argparse
import json
import yaml
import os
import sys
# Generates Elasticsearch ingest pipelines from a Beats ECS migration file.
#
# Reads the YAML migration list (entries with 'from' and 'to' keys and an
# optional 'beat' key) and writes into the output folder:
#   * <beat>.json             - one pipeline per beat named in the file
#   * main_ecs_migrator.json  - pipeline for the entries without a 'beat' key,
#                               which also delegates to each per-beat pipeline
parser = argparse.ArgumentParser()
# migration file location and output destination
parser.add_argument('--ecs_migration_file', dest='ecs_migration_file', required=False, default='./ecs-migration.yml')
parser.add_argument('--output_folder', dest='output_folder', required=False, default='./pipelines')
# NOTE(review): --beat is required but its value is never read below; pipelines
# are produced for every beat listed in the migration file. Confirm intent.
parser.add_argument('--beat', dest='beat', required=True)
args = parser.parse_args()


def _copy_field_processor(migration):
    """Return a `set` processor copying migration['from'] into migration['to'].

    The processor is conditional on the source field being present. The
    condition uses the null-safe operator on every path segment and compares
    the result with null so that the script yields a boolean.
    """
    source = migration['from']
    return {
        'set': {
            'field': migration['to'],
            'value': '{{%s}}' % source,
            'if': 'ctx?.%s != null' % source.replace('.', '?.'),
        }
    }


# processors applied to all documents
processors = []
# processors applied to specific beat docs
beat_processors = {}
# fields removed from all docs
remove_fields = []
# fields to be removed from specific beat docs
beat_remove_fields = {}

if not os.path.isfile(args.ecs_migration_file):
    print('%s migration file not found' % args.ecs_migration_file)
    sys.exit(1)

with open(args.ecs_migration_file, 'r') as ecs_migration_file:
    # an empty YAML document loads as None; treat it as "no migrations"
    migrations = yaml.safe_load(ecs_migration_file.read()) or []

for migration in migrations:
    if 'beat' not in migration:
        # top level pipeline
        processors.append(_copy_field_processor(migration))
        remove_fields.append(migration['from'])
    else:
        beat_processors.setdefault(migration['beat'], []).append(_copy_field_processor(migration))
        beat_remove_fields.setdefault(migration['beat'], []).append(migration['from'])

# the old fields are dropped only after every copy has run
if remove_fields:
    processors.append({'remove': {'field': remove_fields, 'ignore_missing': True}})
for beat, fields in beat_remove_fields.items():
    beat_processors[beat].append({'remove': {'field': fields, 'ignore_missing': True}})

if not os.path.exists(args.output_folder):
    os.makedirs(args.output_folder)

# produce our specific beat pipelines and add a reference to the main beats pipeline
for beat, b_processors in beat_processors.items():
    with open(os.path.join(args.output_folder, '%s.json' % beat), 'w') as beat_pipeline:
        json.dump({'description': 'ECS Migration Pipeline for %s' % beat, 'processors': b_processors}, beat_pipeline, indent=2)
    # the condition is a Painless script: the index name is only reachable
    # through ctx, and the beat name has to be a quoted string literal
    processors.append({'pipeline': {'if': "ctx._index.startsWith('%s')" % beat, 'name': '%s' % beat}})

# now our main pipeline
with open(os.path.join(args.output_folder, 'main_ecs_migrator.json'), 'w') as main_pipeline:
    json.dump({'description': 'Base ECS Migration Pipeline', 'processors': processors}, main_pipeline, indent=2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment