Skip to content

Instantly share code, notes, and snippets.

@mnadel
Last active February 12, 2020 18:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mnadel/cc1524e36db6b03cf7d5b744d27a579f to your computer and use it in GitHub Desktop.
Save mnadel/cc1524e36db6b03cf7d5b744d27a579f to your computer and use it in GitHub Desktop.
Parse a Prometheus rules (v1.x) file
#!/usr/bin/env python
import re
import sys
import csv
class State:
def __init__(self, val):
self._val = val
self._remaining = []
def add_remaining(self, text):
if text not in ["{", "}"]:
self._remaining.append(text)
@property
def render(self):
return True
@property
def name(self):
return self.__class__.__name__
def __str__(self):
return self._val
class Ignore(State):
def __init__(self):
State.__init__(self, "~ignore~")
@property
def render(self):
return False
class Name(State):
def __init__(self, val):
State.__init__(self, val)
class Cond(State):
def __init__(self, val):
State.__init__(self, val)
def __str__(self):
vals = self._remaining
vals.insert(0, self._val)
return " ".join(vals)
class For(Ignore):
def __init__(self, val):
State.__init__(self, val)
class Labels(State):
def __init__(self, val):
State.__init__(self, val)
@property
def render(self):
return "severity" in self.__str__()
def __str__(self):
return " ".join(self._remaining)
class Annotations(Ignore):
def __init__(self, val):
State.__init__(self, val)
class Alert:
def __init__(self):
self.__dict__["_prev_states"] = []
self.__dict__["_state"] = Ignore()
def __setattr__(self, keyword, val):
if keyword == "name":
self._new_state(Name(val))
elif keyword == "dur":
self._new_state(For(val))
elif keyword == "labels":
self._new_state(Labels(val))
elif keyword == "annotations":
self._new_state(Annotations(val))
elif keyword == "cond":
self._new_state(Cond(val))
else:
raise ValueError("unknown keyword: {}".format(keyword))
def _new_state(self, new_state):
self._prev_states.append(self._state)
self.__dict__["_state"] = new_state
def remaining(self, text):
self._state.add_remaining(text)
def complete(self):
self._new_state(Ignore())
states = filter(lambda s: s.render, self._prev_states)
csv.writer(sys.stdout).writerow([str(x) for x in states])
def __str__(self):
return "Alert(state={}, prev={})".format(self._state, self._prev_states)
alert = None
for l in sys.stdin.readlines():
line = l.strip()
if line == "" or line.startswith("#"):
continue
splitted = re.split("\s+", line)
keyword = splitted[0].strip()
rest = " ".join(splitted[1:])
if keyword == "ALERT":
if alert:
alert.complete()
alert = Alert()
alert.name = rest
elif keyword == "IF":
alert.cond = rest
elif keyword == "FOR":
alert.dur = rest
elif keyword == "LABELS":
alert.labels = rest
elif keyword == "ANNOTATIONS":
alert.annotations = rest
else:
alert.remaining(line)
alert.complete()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment