Skip to content

Instantly share code, notes, and snippets.

@chrisberkhout
Last active October 16, 2024 07:44
Show Gist options
  • Save chrisberkhout/5084336ece24a54007826bb92f501f9e to your computer and use it in GitHub Desktop.
Save chrisberkhout/5084336ece24a54007826bb92f501f9e to your computer and use it in GitHub Desktop.
Add tags to each processor in the given pipeline files
#!/usr/bin/env python3
import sys
from ruamel.yaml import YAML # pip install ruamel.yaml
import re
def normalize_name(parts, tag_list):
    """Build a unique, lowercase, underscore-separated tag from ``parts``.

    ``None`` entries are dropped; the remaining parts are converted to
    ``str``, joined with underscores and lowercased. Every character outside
    ``[a-zA-Z0-9_]`` is replaced by an underscore, runs of underscores are
    collapsed to one, and trailing underscores are removed.

    If the resulting tag is already present in ``tag_list``, a numeric suffix
    (``_2``, ``_3``, ...) is appended until the tag is unique.

    Args:
        parts: Iterable of name fragments; ``None`` entries are ignored.
        tag_list: List of tags already in use. The returned tag is appended
            to it, so the caller's list is mutated.

    Returns:
        The unique tag string.
    """
    # str() keeps non-string YAML scalars (e.g. a numeric description or
    # field) from making the join raise TypeError.
    joined = "_".join(str(part) for part in parts if part is not None).lower()
    alphanum = re.sub(r'[^a-zA-Z0-9_]', '_', joined)
    collapsed = re.sub(r'__+', '_', alphanum)
    base = re.sub(r'_+$', '', collapsed)

    candidate = base
    suffix = 1
    while candidate in tag_list:
        # The first duplicate gets "_2": the unsuffixed tag counts as number 1.
        suffix += 1
        candidate = f"{base}_{suffix}"
    tag_list.append(candidate)
    return candidate
def tag_processors(processors, tag_list):
    """Add a unique ``tag`` entry to every processor in ``processors``, in place.

    Each processor is a single-key mapping ``{processor_type: options}``. The
    tag is derived from the processor type plus, in order of preference:

    * the processor's ``description``, if it has one;
    * for ``pipeline`` processors, the quoted portion of ``name`` (or the
      whole name when it contains no quote character);
    * otherwise the ``field`` option, which may be a single value or a list.

    The processor nested under a ``foreach`` processor's ``processor`` key is
    tagged recursively.

    Args:
        processors: Iterable of single-key processor mappings. Each options
            mapping must support ``insert(pos, key, value)``.
        tag_list: List of tags already in use; extended with every new tag
            (via ``normalize_name``) so tags stay unique across calls.
    """
    for processor in processors:
        name = next(iter(processor))
        options = processor[name]
        parts = [name]
        if "description" in options:
            parts.append(options["description"])
        elif name == "pipeline":
            pipeline_name = options["name"]
            quote_char = '"' if '"' in pipeline_name else "'"
            pieces = pipeline_name.split(quote_char)
            # Use the quoted portion when there is one; a plain, unquoted
            # name is used as-is instead of raising IndexError.
            # append (not +=) so the string is not split into characters.
            parts.append(pieces[1] if len(pieces) > 1 else pipeline_name)
        else:
            item = options.get("field")
            parts += item if isinstance(item, list) else [item]
        tag = normalize_name(parts, tag_list)
        # Position 0 puts the tag first among the processor's options.
        options.insert(0, "tag", tag)
        if name == "foreach":
            tag_processors([options["processor"]], tag_list)
def tag(filename):
    """Tag every processor in the pipeline file ``filename``, rewriting it in place.

    The file is parsed with ruamel.yaml, tags are added to the top-level
    ``processors`` list and, when present, the top-level ``on_failure`` list,
    and the result is written back to the same path. Tags are unique across
    both lists.

    Args:
        filename: Path of the YAML pipeline file to read and overwrite.
    """
    round_trip = YAML()
    round_trip.preserve_quotes = True   # keep the original quoting style
    round_trip.explicit_start = True    # emit the leading '---' marker
    round_trip.indent(sequence=4, offset=2)
    round_trip.width = 4096             # large width so long lines are not wrapped

    with open(filename, 'r', encoding='utf-8') as source:
        pipeline = round_trip.load(source.read())

    # One shared list so tags are unique across processors and on_failure.
    used_tags = []
    sections = [pipeline["processors"]]
    if "on_failure" in pipeline:
        sections.append(pipeline["on_failure"])
    for section in sections:
        tag_processors(section, used_tags)

    with open(filename, 'w', encoding='utf-8') as target:
        round_trip.dump(pipeline, target)
# Script entry point: tag each file named on the command line, or print usage
# when no file arguments are given.
if len(sys.argv) >= 2:
    for file in sys.argv[1:]:
        print("tagging processors in %s" % file)
        tag(file)
else:
    print("\n".join((
        "usage:",
        "",
        "find -wholename '*/ingest_pipeline/*.yml' | xargs tag-processors.py",
        "find -wholename '*/ingest_pipeline/*.yml' | xargs -n1 yq --prettyPrint --inplace # optional",
        "elastic-package format",
        "elastic-package build # to check it still works",
    )))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment