Skip to content

Instantly share code, notes, and snippets.

@btknight
Created January 23, 2023 20:03
Show Gist options
  • Save btknight/a208f31840fedf39b264e3cd498ae37d to your computer and use it in GitHub Desktop.
Save btknight/a208f31840fedf39b264e3cd498ae37d to your computer and use it in GitHub Desktop.
"""Objects that facilitate parsing values from regex and assigning
those values to other objects.
Classes
Val:
Object that stores the group matches from an re.match() call.
Class methods that change the stored value are chainable.
Functions
match(pattern, text)
Directly returns the result of re.match. Text can be a str or
IOSCfgLine (from ciscoconfparse module).
val_from_match(pattern, text)
Returns a Val object with the groups matched in an re.match call.
"""
from __future__ import annotations
from ciscoconfparse import IOSCfgLine
from operator import methodcaller
from functools import wraps, partial
from collections import UserDict
import logging
from ipaddress import *
import re
from typing import Any, Callable, Tuple, Union, Match, AnyStr, Type
class TheVal(object):
"""A dummy object used for the check_for_neg_match decorator.
"""
pass
class NegativeMatch(object):
"""A dummy object used if there is no match.
"""
pass
class Val(object):
"""Object that stores the group matches from an re.match() call.
Returned by val_from_match().
Class methods that change the stored value are chainable, improving
readability.
If the re.match did not match anything, a Val() object is created
with the value set to NegativeMatch. Methods that change the object
henceforth do nothing, and methods that assign values to other
objects are not called.
Attributes:
val: any object
The value carried by the object.
"""
def __init__(self, val: Any):
self._frozen = False
self._val = val
self._frozen = True
@property
def val(self) -> Any:
return self._val
@val.setter
def val(self, val: Any) -> None:
if self._frozen:
raise AttributeError("Object is frozen")
self._val = val
@property
def is_match(self) -> bool:
return self.val is not NegativeMatch
@staticmethod
def check_for_neg_match(f, neg_obj: Union[TheVal, None]) -> Callable[[Any], Callable[[Any], Any]]:
"""Returns a decorator wrapper that, before running an instance
method, checks the running object if the value is
NegativeMatch. If it is, do not run the method; instead return
neg_obj.
Parameters
f: function
function to run
neg_obj: object
object to return if self.val is a NegativeMatch object.
If neg_obj is TheVal, return self.
(Ugh, compile time vs. runtime.)
Returns: decorator callback
"""
@wraps(f)
def wrapper(self, *args, **kwargs):
if self.is_match:
return f(self, *args, **kwargs)
else:
if neg_obj is TheVal:
return self
else:
return neg_obj
return wrapper
value_manipulator = partial(check_for_neg_match, neg_obj=TheVal)
value_assigner = partial(check_for_neg_match, neg_obj=None)
def copy(self) -> Val:
return Val(self.val)
@value_manipulator
def groups(self, *groups) -> Val:
"""If the stored value is a tuple originally from the parser
function, return a new Val() with the desired match groups.
Parameters:
groups: tuple | int
Desired match groups.
Returns: Val()
"""
#logging.debug(f"groups({groups}) {self.val}")
new_val = None
if not isinstance(self.val, tuple):
raise ValueError("groups() called, but stored value is not a tuple")
if len(groups) > 1:
new_val = self.transform(lambda x: tuple([x[i] for i in groups]))
elif len(groups) == 1:
new_val = self.transform(lambda x: x[groups[0]])
elif len(groups) == 0:
new_val = self.set(None)
#logging.debug(f"groups({groups}) {self.val} -> {new_val}")
return new_val
@value_manipulator
def tag_parse(self) -> Val:
"""Parses Cisco-style VLAN tags into a set.
"""
#logging.debug(f"tag_parse() {self.val}")
tag_s = set()
first_term = None
if isinstance(self.val, tuple):
# Cisco does not support multiple tag assignment in the
# first term of "encap dot1Q tag1 second-dot1Q tag2".
# Only tag2 may have a VLAN range. So we parse only the
# second tag set.
if self.val[1] is not None:
first_term = self.val[0]
terms = self.val[1]
else:
terms = self.val[0]
else:
terms = self.val
for term in terms.split(','):
if '-' in term:
start = int(term.split('-')[0])
end = int(term.split('-')[1])
tag_s |= {str(i) for i in range(start, end+1)}
else:
tag_s.add(term)
if first_term is not None:
tag_s = {(first_term, i) for i in tag_s}
new_val = Val(tag_s)
#logging.debug(f"tag_parse() {self.val} -> {new_val}")
return new_val
@value_manipulator
def set(self, val) -> Val:
"""Returns a new Val() object with a specified value.
Parameters
val: object
New value
Returns: Val()
"""
return Val(val)
@value_manipulator
def transform(self, transform: Callable[[Any], Any]) -> Val:
"""Performs a transformation on the value supplied, and returns
a new Val() object with the transformed value.
Parameters
transform: lambda
Expression used for the transform
Returns: Val()
"""
#logging.debug(f"transform({transform}) {self.val}")
new_val = Val(transform(self.val))
#logging.debug(f"transform({transform}) {self.val} -> {new_val}")
return new_val
def _clean_ip(self) -> str:
"""Internal method to check value for the CIDR '/'. Used for
Cisco IOS-style "1.2.3.4 255.255.255.0" type of strings.
"""
if '/' not in self.val:
ip_str = self.val.replace(" ","/",1).replace(" ","")
else:
ip_str = self.val
return ip_str
def _is_ipobj(self, ip_object: Callable[[str], Any], ip: str) -> Val:
"""Returns a Val object converted to an ipaddress.* object, then converted back to a string.
If the ipaddress library returns an AddressValueError, a NegativeMatch is returned.
Used to verify that a value is truly an IP.
ip_object: Type
Type of ipaddress.* object"""
try:
return Val(str(ip_object(ip)))
except AddressValueError:
return Val(NegativeMatch)
@value_manipulator
def ip_address(self) -> Val:
"""Verifies that the value is an IPv[46]Address object.
"""
return self._is_ipobj(ip_address, self.val)
@value_manipulator
def IPv4Address(self) -> Val:
"""Verifies that the value is an IPv4Address object.
"""
return self._is_ipobj(IPv4Address, self.val)
@value_manipulator
def IPv6Address(self) -> Val:
"""Verifies that the value is an IPv6Address object.
"""
return self._is_ipobj(IPv6Address, self.val)
@value_manipulator
def ip_network(self) -> Val:
"""Verifies that the value is an IPv[46]Network object.
"""
return self._is_ipobj(ip_network, self._clean_ip())
@value_manipulator
def IPv4Network(self) -> Val:
"""Verifies that the value is an IPv4Network object.
"""
return self._is_ipobj(IPv4Network, self._clean_ip())
@value_manipulator
def IPv6Network(self) -> Val:
"""Verifies that the value is an IPv6Network object.
"""
return self._is_ipobj(IPv6Network, self.val)
@value_manipulator
def ip_interface(self) -> Val:
"""Verifies that the value is an IPv[46]Interface object.
"""
return self._is_ipobj(ip_interface, self._clean_ip())
@value_manipulator
def IPv4Interface(self) -> Val:
"""Verifies that the value is an IPv4Network object.
"""
return self._is_ipobj(IPv4Interface, self._clean_ip())
@value_manipulator
def IPv6Interface(self) -> Val:
"""Verifies that the value is an IPv6Network object.
"""
return self._is_ipobj(IPv6Interface, self.val)
@value_assigner
def assign(self, obj, attr) -> None:
"""Sets an attribute on an object using the stored value.
Parameters
obj: data_model object
The target object whose attribute should be assigned
attr: str
The name of the attribute to set
Returns: result of setattr()
"""
#logging.debug(f"assign({obj}, {attr}) {self.val}")
self._set_attr_on_obj(obj, attr, self.val)
#logging.debug(f"assign({obj}, {attr}) {self.val} -> {retval}")
return None
@value_assigner
def call(self, obj: Any, attr: str, method: str, reassign: bool = False, pre_args: Tuple[Any] = tuple(),
post_args: Tuple[Any] = tuple(), kwargs: dict = {}) -> Any:
"""Generates a callback function that calls a method on a class
attribute.
Parameters
target_o: data_model object
The target object whose attribute should be assigned.
attr: str
The name of the attribute whose method should be called.
method: str
The name of the method to call.
reassign: bool
If set to True, will assign the result of the call back to
the attribute. Used for operations that do not modify the
original object.
pre_args: tuple
Tuple of arguments to supply to the function before the value
is passed.
post_args: tuple
Same as above, except the args are supplied after the value.
kwargs: dict
Keyword arguments to supply to the function.
Returns: method call result
"""
#logging.debug(f"call({obj}, {attr}, {method}, "
#f"pre_args={pre_args} ({type(pre_args)}), post_args={post_args}) {self.val}")
fnargs = pre_args if isinstance(pre_args, tuple) else (pre_args,)
fnargs += (self.val,)
fnargs += post_args if isinstance(post_args, tuple) else (post_args,)
mc = methodcaller(method, *fnargs, **kwargs)
target = self._get_attr_from_obj(obj, attr)
op_result = mc(target)
if reassign:
self._set_attr_on_obj(obj, attr, op_result)
#logging.debug(f"call({obj}, {attr}, {method}, pre_args={pre_args} "
#f"({type(pre_args)}), post_args={post_args}) {self.val} -> {retval}")
return op_result
def _obj_is_dict(self, obj):
"""True if obj is a dict or UserDict."""
return isinstance(obj, dict) or isinstance(obj, UserDict)
def _get_attr_from_obj(self, obj, attr):
"""Gets an attr from an obj. Uses getattr() for class-like objects and __getitem__ for dicts."""
if self._obj_is_dict(obj) and attr in obj:
retval = obj[attr]
else:
retval = getattr(obj, attr)
return retval
def _set_attr_on_obj(self, obj, attr, val):
"""Sets an attr on an obj. Uses setattr() for class-like objects and __getitem__ / assignment for dicts."""
if self._obj_is_dict(obj):
obj[attr] = val
else:
setattr(obj, attr, val)
@value_assigner
def run(self, fn: Callable[[Any], Any]) -> Any:
"""Calls a user-supplied function, supplying the value as the only
argument.
Parameters:
fn: function
Function to be called
Returns: return value of the called function
"""
#logging.debug(f"run({fn}) {self.val}")
retval = fn(self.val)
#logging.debug(f"run({fn}) {self.val} -> {retval}")
return retval
def if_(self, condition) -> ValIf:
"""Returns a ValIf() object with the supplied condition.
"""
#logging.debug(f"if_({condition}) {self.val}")
valif = ValIf(self.copy(), condition)
#logging.debug(f"if_({condition}) {self.val} -> {valif}")
return valif
def __repr__(self):
return f'<Val "{self.val}">'
def __str__(self):
return str(self.val)
class ValIf(object):
"""Supports decision-making on Val objects.
* If the supplied condition is met, a new Val() object with the
original value is returned.
* If the condition was not met, a new Val() object containing a
NegativeMatch() object is passed.
"""
def __init__(self, val_o: Val, condition: Callable[[Any], Any]):
"""Initializer.
Parameters
val: Val() object
condition: lambda
The condition to test the Val object's value.
"""
if not isinstance(val_o, Val):
raise ValueError("Got passed {type(val_o)} - ValIf does not accept anything but Val objects")
self.val_o = val_o
self.condition = condition
@staticmethod
def check_for_neg_match(f: Callable[[Any], Any]) -> Callable[[Any], Any]:
"""If the stored Val is a tuple originally from the parser
function, return a new Val() with the desired match groups.
Parameters:
groups: tuple | int
Desired match groups.
Returns: Val()
"""
@wraps(f)
def wrapper(self, *args, **kwargs):
if self.val_o.is_match:
return f(self, *args, **kwargs)
else:
return self.val_o
return wrapper
@property
def val(self) -> Val:
return self.val_o.val
@property
def is_match(self) -> bool:
return self.val_o.is_match
@check_for_neg_match
def decision(self, transform: Callable[[Any], Any] = lambda x: x) -> Val:
"""Supports the true() and false() methods. The bool result
of the stored condition is run through a transform lambda.
That result is applied to a new Val() object and returned.
Parameters
transform: lambda
A transform applied to the user-supplied condition.
Intended to be used by false(), where
transform=(lambda x: not x).
"""
new_val = self.val_o.val if transform(self.condition(self.val_o.val)) else NegativeMatch
return Val(new_val)
def true(self) -> Val:
"""Returns a Val() object with its matched_if flag set to True
if the user-supplied condition is True.
"""
return self.decision()
def false(self) -> Val:
"""Returns a Val() object with its matched_if flag set to True
if the user-supplied condition is False.
"""
return self.decision(lambda x: not x)
def __repr__(self) -> str:
return f"<ValIf {self.val}>"
def match(pattern: str, ccp_line: Union[IOSCfgLine, str]) -> Union[Match[AnyStr], None]:
"""Calls re.match on either an IOSCfgLine or a string.
Parameters
pattern: str
Regex pattern to match
ccp_line: str | IOSCfgLine
Text to match on
returns: result of re.match
"""
#logging.debug(f"match({ccp_line}, {pattern})")
text = None
if isinstance(ccp_line, IOSCfgLine):
text = ccp_line.text
elif isinstance(ccp_line, str):
text = ccp_line
else:
ValueError(f"Expecting IOSCfgLine or str, got {type(ccp_line)}")
retval = re.match(pattern, text)
#logging.debug(f"match({ccp_line}, {pattern}) -> {retval}")
return retval
def val_from_match(pattern: str, ccp_line: IOSCfgLine) -> Val:
"""Generates a Val() object based on the result of a regex match on
a line of text.
"""
#logging.debug(f"val_from_match({ccp_line}, {pattern})")
def clean(val):
"""Returns a strip()'ed result of group matching.
"""
if isinstance(val, str):
val = val.strip()
if val == '':
return None
return val
re_match = match(pattern, ccp_line)
retval = Val(NegativeMatch)
if re_match:
groups = (ccp_line,) + tuple([clean(i) for i in re_match.groups()])
retval = Val(groups)
#logging.debug(f"val_from_match({ccp_line}, {pattern}) -> {retval.val}")
return retval
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment