Created
January 23, 2023 20:03
-
-
Save btknight/a208f31840fedf39b264e3cd498ae37d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Objects that facilitate parsing values from regex and assigning | |
those values to other objects. | |
Classes | |
Val: | |
Object that stores the group matches from an re.match() call. | |
Class methods that change the stored value are chainable. | |
Functions | |
match(pattern, text) | |
Directly returns the result of re.match. Text can be a str or | |
IOSCfgLine (from ciscoconfparse module). | |
val_from_match(pattern, text) | |
Returns a Val object with the groups matched in an re.match call. | |
""" | |
from __future__ import annotations | |
from ciscoconfparse import IOSCfgLine | |
from operator import methodcaller | |
from functools import wraps, partial | |
from collections import UserDict | |
import logging | |
from ipaddress import * | |
import re | |
from typing import Any, Callable, Tuple, Union, Match, AnyStr, Type | |
class TheVal(object): | |
"""A dummy object used for the check_for_neg_match decorator. | |
""" | |
pass | |
class NegativeMatch(object): | |
"""A dummy object used if there is no match. | |
""" | |
pass | |
class Val(object): | |
"""Object that stores the group matches from an re.match() call. | |
Returned by val_from_match(). | |
Class methods that change the stored value are chainable, improving | |
readability. | |
If the re.match did not match anything, a Val() object is created | |
with the value set to NegativeMatch. Methods that change the object | |
henceforth do nothing, and methods that assign values to other | |
objects are not called. | |
Attributes: | |
val: any object | |
The value carried by the object. | |
""" | |
def __init__(self, val: Any): | |
self._frozen = False | |
self._val = val | |
self._frozen = True | |
@property | |
def val(self) -> Any: | |
return self._val | |
@val.setter | |
def val(self, val: Any) -> None: | |
if self._frozen: | |
raise AttributeError("Object is frozen") | |
self._val = val | |
@property | |
def is_match(self) -> bool: | |
return self.val is not NegativeMatch | |
@staticmethod | |
def check_for_neg_match(f, neg_obj: Union[TheVal, None]) -> Callable[[Any], Callable[[Any], Any]]: | |
"""Returns a decorator wrapper that, before running an instance | |
method, checks the running object if the value is | |
NegativeMatch. If it is, do not run the method; instead return | |
neg_obj. | |
Parameters | |
f: function | |
function to run | |
neg_obj: object | |
object to return if self.val is a NegativeMatch object. | |
If neg_obj is TheVal, return self. | |
(Ugh, compile time vs. runtime.) | |
Returns: decorator callback | |
""" | |
@wraps(f) | |
def wrapper(self, *args, **kwargs): | |
if self.is_match: | |
return f(self, *args, **kwargs) | |
else: | |
if neg_obj is TheVal: | |
return self | |
else: | |
return neg_obj | |
return wrapper | |
value_manipulator = partial(check_for_neg_match, neg_obj=TheVal) | |
value_assigner = partial(check_for_neg_match, neg_obj=None) | |
def copy(self) -> Val: | |
return Val(self.val) | |
@value_manipulator | |
def groups(self, *groups) -> Val: | |
"""If the stored value is a tuple originally from the parser | |
function, return a new Val() with the desired match groups. | |
Parameters: | |
groups: tuple | int | |
Desired match groups. | |
Returns: Val() | |
""" | |
#logging.debug(f"groups({groups}) {self.val}") | |
new_val = None | |
if not isinstance(self.val, tuple): | |
raise ValueError("groups() called, but stored value is not a tuple") | |
if len(groups) > 1: | |
new_val = self.transform(lambda x: tuple([x[i] for i in groups])) | |
elif len(groups) == 1: | |
new_val = self.transform(lambda x: x[groups[0]]) | |
elif len(groups) == 0: | |
new_val = self.set(None) | |
#logging.debug(f"groups({groups}) {self.val} -> {new_val}") | |
return new_val | |
@value_manipulator | |
def tag_parse(self) -> Val: | |
"""Parses Cisco-style VLAN tags into a set. | |
""" | |
#logging.debug(f"tag_parse() {self.val}") | |
tag_s = set() | |
first_term = None | |
if isinstance(self.val, tuple): | |
# Cisco does not support multiple tag assignment in the | |
# first term of "encap dot1Q tag1 second-dot1Q tag2". | |
# Only tag2 may have a VLAN range. So we parse only the | |
# second tag set. | |
if self.val[1] is not None: | |
first_term = self.val[0] | |
terms = self.val[1] | |
else: | |
terms = self.val[0] | |
else: | |
terms = self.val | |
for term in terms.split(','): | |
if '-' in term: | |
start = int(term.split('-')[0]) | |
end = int(term.split('-')[1]) | |
tag_s |= {str(i) for i in range(start, end+1)} | |
else: | |
tag_s.add(term) | |
if first_term is not None: | |
tag_s = {(first_term, i) for i in tag_s} | |
new_val = Val(tag_s) | |
#logging.debug(f"tag_parse() {self.val} -> {new_val}") | |
return new_val | |
@value_manipulator | |
def set(self, val) -> Val: | |
"""Returns a new Val() object with a specified value. | |
Parameters | |
val: object | |
New value | |
Returns: Val() | |
""" | |
return Val(val) | |
@value_manipulator | |
def transform(self, transform: Callable[[Any], Any]) -> Val: | |
"""Performs a transformation on the value supplied, and returns | |
a new Val() object with the transformed value. | |
Parameters | |
transform: lambda | |
Expression used for the transform | |
Returns: Val() | |
""" | |
#logging.debug(f"transform({transform}) {self.val}") | |
new_val = Val(transform(self.val)) | |
#logging.debug(f"transform({transform}) {self.val} -> {new_val}") | |
return new_val | |
def _clean_ip(self) -> str: | |
"""Internal method to check value for the CIDR '/'. Used for | |
Cisco IOS-style "1.2.3.4 255.255.255.0" type of strings. | |
""" | |
if '/' not in self.val: | |
ip_str = self.val.replace(" ","/",1).replace(" ","") | |
else: | |
ip_str = self.val | |
return ip_str | |
def _is_ipobj(self, ip_object: Callable[[str], Any], ip: str) -> Val: | |
"""Returns a Val object converted to an ipaddress.* object, then converted back to a string. | |
If the ipaddress library returns an AddressValueError, a NegativeMatch is returned. | |
Used to verify that a value is truly an IP. | |
ip_object: Type | |
Type of ipaddress.* object""" | |
try: | |
return Val(str(ip_object(ip))) | |
except AddressValueError: | |
return Val(NegativeMatch) | |
@value_manipulator | |
def ip_address(self) -> Val: | |
"""Verifies that the value is an IPv[46]Address object. | |
""" | |
return self._is_ipobj(ip_address, self.val) | |
@value_manipulator | |
def IPv4Address(self) -> Val: | |
"""Verifies that the value is an IPv4Address object. | |
""" | |
return self._is_ipobj(IPv4Address, self.val) | |
@value_manipulator | |
def IPv6Address(self) -> Val: | |
"""Verifies that the value is an IPv6Address object. | |
""" | |
return self._is_ipobj(IPv6Address, self.val) | |
@value_manipulator | |
def ip_network(self) -> Val: | |
"""Verifies that the value is an IPv[46]Network object. | |
""" | |
return self._is_ipobj(ip_network, self._clean_ip()) | |
@value_manipulator | |
def IPv4Network(self) -> Val: | |
"""Verifies that the value is an IPv4Network object. | |
""" | |
return self._is_ipobj(IPv4Network, self._clean_ip()) | |
@value_manipulator | |
def IPv6Network(self) -> Val: | |
"""Verifies that the value is an IPv6Network object. | |
""" | |
return self._is_ipobj(IPv6Network, self.val) | |
@value_manipulator | |
def ip_interface(self) -> Val: | |
"""Verifies that the value is an IPv[46]Interface object. | |
""" | |
return self._is_ipobj(ip_interface, self._clean_ip()) | |
@value_manipulator | |
def IPv4Interface(self) -> Val: | |
"""Verifies that the value is an IPv4Network object. | |
""" | |
return self._is_ipobj(IPv4Interface, self._clean_ip()) | |
@value_manipulator | |
def IPv6Interface(self) -> Val: | |
"""Verifies that the value is an IPv6Network object. | |
""" | |
return self._is_ipobj(IPv6Interface, self.val) | |
@value_assigner | |
def assign(self, obj, attr) -> None: | |
"""Sets an attribute on an object using the stored value. | |
Parameters | |
obj: data_model object | |
The target object whose attribute should be assigned | |
attr: str | |
The name of the attribute to set | |
Returns: result of setattr() | |
""" | |
#logging.debug(f"assign({obj}, {attr}) {self.val}") | |
self._set_attr_on_obj(obj, attr, self.val) | |
#logging.debug(f"assign({obj}, {attr}) {self.val} -> {retval}") | |
return None | |
@value_assigner | |
def call(self, obj: Any, attr: str, method: str, reassign: bool = False, pre_args: Tuple[Any] = tuple(), | |
post_args: Tuple[Any] = tuple(), kwargs: dict = {}) -> Any: | |
"""Generates a callback function that calls a method on a class | |
attribute. | |
Parameters | |
target_o: data_model object | |
The target object whose attribute should be assigned. | |
attr: str | |
The name of the attribute whose method should be called. | |
method: str | |
The name of the method to call. | |
reassign: bool | |
If set to True, will assign the result of the call back to | |
the attribute. Used for operations that do not modify the | |
original object. | |
pre_args: tuple | |
Tuple of arguments to supply to the function before the value | |
is passed. | |
post_args: tuple | |
Same as above, except the args are supplied after the value. | |
kwargs: dict | |
Keyword arguments to supply to the function. | |
Returns: method call result | |
""" | |
#logging.debug(f"call({obj}, {attr}, {method}, " | |
#f"pre_args={pre_args} ({type(pre_args)}), post_args={post_args}) {self.val}") | |
fnargs = pre_args if isinstance(pre_args, tuple) else (pre_args,) | |
fnargs += (self.val,) | |
fnargs += post_args if isinstance(post_args, tuple) else (post_args,) | |
mc = methodcaller(method, *fnargs, **kwargs) | |
target = self._get_attr_from_obj(obj, attr) | |
op_result = mc(target) | |
if reassign: | |
self._set_attr_on_obj(obj, attr, op_result) | |
#logging.debug(f"call({obj}, {attr}, {method}, pre_args={pre_args} " | |
#f"({type(pre_args)}), post_args={post_args}) {self.val} -> {retval}") | |
return op_result | |
def _obj_is_dict(self, obj): | |
"""True if obj is a dict or UserDict.""" | |
return isinstance(obj, dict) or isinstance(obj, UserDict) | |
def _get_attr_from_obj(self, obj, attr): | |
"""Gets an attr from an obj. Uses getattr() for class-like objects and __getitem__ for dicts.""" | |
if self._obj_is_dict(obj) and attr in obj: | |
retval = obj[attr] | |
else: | |
retval = getattr(obj, attr) | |
return retval | |
def _set_attr_on_obj(self, obj, attr, val): | |
"""Sets an attr on an obj. Uses setattr() for class-like objects and __getitem__ / assignment for dicts.""" | |
if self._obj_is_dict(obj): | |
obj[attr] = val | |
else: | |
setattr(obj, attr, val) | |
@value_assigner | |
def run(self, fn: Callable[[Any], Any]) -> Any: | |
"""Calls a user-supplied function, supplying the value as the only | |
argument. | |
Parameters: | |
fn: function | |
Function to be called | |
Returns: return value of the called function | |
""" | |
#logging.debug(f"run({fn}) {self.val}") | |
retval = fn(self.val) | |
#logging.debug(f"run({fn}) {self.val} -> {retval}") | |
return retval | |
def if_(self, condition) -> ValIf: | |
"""Returns a ValIf() object with the supplied condition. | |
""" | |
#logging.debug(f"if_({condition}) {self.val}") | |
valif = ValIf(self.copy(), condition) | |
#logging.debug(f"if_({condition}) {self.val} -> {valif}") | |
return valif | |
def __repr__(self): | |
return f'<Val "{self.val}">' | |
def __str__(self): | |
return str(self.val) | |
class ValIf(object): | |
"""Supports decision-making on Val objects. | |
* If the supplied condition is met, a new Val() object with the | |
original value is returned. | |
* If the condition was not met, a new Val() object containing a | |
NegativeMatch() object is passed. | |
""" | |
def __init__(self, val_o: Val, condition: Callable[[Any], Any]): | |
"""Initializer. | |
Parameters | |
val: Val() object | |
condition: lambda | |
The condition to test the Val object's value. | |
""" | |
if not isinstance(val_o, Val): | |
raise ValueError("Got passed {type(val_o)} - ValIf does not accept anything but Val objects") | |
self.val_o = val_o | |
self.condition = condition | |
@staticmethod | |
def check_for_neg_match(f: Callable[[Any], Any]) -> Callable[[Any], Any]: | |
"""If the stored Val is a tuple originally from the parser | |
function, return a new Val() with the desired match groups. | |
Parameters: | |
groups: tuple | int | |
Desired match groups. | |
Returns: Val() | |
""" | |
@wraps(f) | |
def wrapper(self, *args, **kwargs): | |
if self.val_o.is_match: | |
return f(self, *args, **kwargs) | |
else: | |
return self.val_o | |
return wrapper | |
@property | |
def val(self) -> Val: | |
return self.val_o.val | |
@property | |
def is_match(self) -> bool: | |
return self.val_o.is_match | |
@check_for_neg_match | |
def decision(self, transform: Callable[[Any], Any] = lambda x: x) -> Val: | |
"""Supports the true() and false() methods. The bool result | |
of the stored condition is run through a transform lambda. | |
That result is applied to a new Val() object and returned. | |
Parameters | |
transform: lambda | |
A transform applied to the user-supplied condition. | |
Intended to be used by false(), where | |
transform=(lambda x: not x). | |
""" | |
new_val = self.val_o.val if transform(self.condition(self.val_o.val)) else NegativeMatch | |
return Val(new_val) | |
def true(self) -> Val: | |
"""Returns a Val() object with its matched_if flag set to True | |
if the user-supplied condition is True. | |
""" | |
return self.decision() | |
def false(self) -> Val: | |
"""Returns a Val() object with its matched_if flag set to True | |
if the user-supplied condition is False. | |
""" | |
return self.decision(lambda x: not x) | |
def __repr__(self) -> str: | |
return f"<ValIf {self.val}>" | |
def match(pattern: str, ccp_line: Union[IOSCfgLine, str]) -> Union[Match[AnyStr], None]: | |
"""Calls re.match on either an IOSCfgLine or a string. | |
Parameters | |
pattern: str | |
Regex pattern to match | |
ccp_line: str | IOSCfgLine | |
Text to match on | |
returns: result of re.match | |
""" | |
#logging.debug(f"match({ccp_line}, {pattern})") | |
text = None | |
if isinstance(ccp_line, IOSCfgLine): | |
text = ccp_line.text | |
elif isinstance(ccp_line, str): | |
text = ccp_line | |
else: | |
ValueError(f"Expecting IOSCfgLine or str, got {type(ccp_line)}") | |
retval = re.match(pattern, text) | |
#logging.debug(f"match({ccp_line}, {pattern}) -> {retval}") | |
return retval | |
def val_from_match(pattern: str, ccp_line: IOSCfgLine) -> Val: | |
"""Generates a Val() object based on the result of a regex match on | |
a line of text. | |
""" | |
#logging.debug(f"val_from_match({ccp_line}, {pattern})") | |
def clean(val): | |
"""Returns a strip()'ed result of group matching. | |
""" | |
if isinstance(val, str): | |
val = val.strip() | |
if val == '': | |
return None | |
return val | |
re_match = match(pattern, ccp_line) | |
retval = Val(NegativeMatch) | |
if re_match: | |
groups = (ccp_line,) + tuple([clean(i) for i in re_match.groups()]) | |
retval = Val(groups) | |
#logging.debug(f"val_from_match({ccp_line}, {pattern}) -> {retval.val}") | |
return retval |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment