Skip to content

Instantly share code, notes, and snippets.

@hsheth2
Created April 26, 2023 23:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hsheth2/d5451f39624523b289a9246e4edb75c7 to your computer and use it in GitHub Desktop.
import logging
import os
import pathlib
from typing import Dict, Iterable, List, Optional, Set, Tuple, TypeVar, Union
from ruamel.yaml import YAML
from datahub.ingestion.graph.client import (
DatahubClientConfig,
DataHubGraph,
get_default_graph,
)
from datahub.ingestion.source.metadata.business_glossary import (
BusinessGlossaryConfig,
BusinessGlossaryFileSource,
DefaultConfig,
GlossaryNodeConfig,
GlossaryNodeInterface,
GlossaryTermConfig,
KnowledgeCard,
Owners,
materialize_all_node_urns,
populate_path_vs_id,
)
from datahub.metadata.schema_classes import (
AspectBag,
DomainsClass,
GlossaryNodeInfoClass,
GlossaryRelatedTermsClass,
GlossaryTermInfoClass,
InstitutionalMemoryClass,
OwnershipClass,
)
from datahub.utilities.urns.urn import guess_entity_type
logger = logging.getLogger(__name__)

# Type alias: the urn of a glossary element's parent node.
ParentUrn = str

# When true, use the default DataHub graph client instead of the
# env-var-configured server/token (see get_graph).
TEST_MODE = os.getenv("TEST_MODE", "false").lower() == "true"

# Sentinel marking list entries that should be deleted from the YAML doc
# (consumed by update_yml_to_match).
_DELETED_NODE_TOMBSTONE = object()

# Constrained to the two biz-glossary element models (nodes and terms).
_GlossaryElement = TypeVar("_GlossaryElement", GlossaryNodeConfig, GlossaryTermConfig)
def get_graph() -> DataHubGraph:
    """Build a DataHubGraph client.

    In test mode, fall back to the default client configuration; otherwise the
    server is read from the required DATAHUB_GMS_HOST env var and the token
    from the optional DATAHUB_GMS_TOKEN env var.
    """
    if not TEST_MODE:
        server = os.environ["DATAHUB_GMS_HOST"]
        token: Optional[str] = os.getenv("DATAHUB_GMS_TOKEN")
        return DataHubGraph(DatahubClientConfig(server=server, token=token))
    return get_default_graph()
def _get_id_from_urn(urn: str) -> str:
return urn.split(":")[-1]
def _datahub_ownership_to_owners(
    ownership: Optional[OwnershipClass],
) -> Optional[Owners]:
    """Convert a DataHub ownership aspect into the biz glossary Owners model.

    Users and groups are split by entity type; empty groups collapse to None.
    """
    if ownership is None:
        return None

    users: List[str] = []
    groups: List[str] = []
    for owner in ownership.owners:
        urn = owner.owner
        entity_type = guess_entity_type(urn)
        if entity_type == "corpuser":
            users.append(_get_id_from_urn(urn))
        elif entity_type == "corpGroup":
            groups.append(_get_id_from_urn(urn))

    return Owners(users=users or None, groups=groups or None)
def _datahub_domain_to_str(domain: Optional[DomainsClass]) -> Optional[str]:
if domain is None or not domain.domains:
return None
if len(domain.domains) > 1:
logger.warning(f"Found multiple domains for {domain} - using first one")
return None
return domain.domains[0]
def _datahub_institutional_memory_to_knowledge_links(
    institutionalMemory: Optional[InstitutionalMemoryClass],
) -> Optional[List[KnowledgeCard]]:
    """Map an institutional memory aspect's elements onto KnowledgeCard links.

    The element's description becomes the card label; returns None when the
    aspect is absent.
    """
    if institutionalMemory is None:
        return None

    links: List[KnowledgeCard] = []
    for element in institutionalMemory.elements:
        links.append(KnowledgeCard(url=element.url, label=element.description))
    return links
def _glossary_node_from_datahub(
    urn: str, aspects: AspectBag
) -> Tuple[GlossaryNodeConfig, Optional[ParentUrn]]:
    """Build a GlossaryNodeConfig from a glossary node's aspects.

    Returns the config plus the parent node's urn (None for top-level nodes).
    The terms/nodes children start empty; the caller wires up the hierarchy.
    """
    info: GlossaryNodeInfoClass = aspects["glossaryNodeInfo"]

    node = GlossaryNodeConfig(
        id=urn,
        name=info.name or _get_id_from_urn(urn),
        description=info.definition,
        owners=_datahub_ownership_to_owners(aspects.get("ownership")),
        knowledge_links=_datahub_institutional_memory_to_knowledge_links(
            aspects.get("institutionalMemory")
        ),
        # Children are attached later, once all entities are parsed.
        terms=[],
        nodes=[],
    )
    # Stash the urn on the model so later passes can match elements by urn.
    node._urn = urn

    return node, info.parentNode
def _glossary_term_from_datahub(
    urn: str, aspects: AspectBag
) -> Tuple[GlossaryTermConfig, Optional[ParentUrn]]:
    """Build a GlossaryTermConfig from a glossary term's aspects.

    Returns the config plus the parent node's urn (None for top-level terms).
    The related-term fields initially hold raw urns; a later pass rewrites
    them into glossary paths where possible.
    """
    info: GlossaryTermInfoClass = aspects["glossaryTermInfo"]
    related: GlossaryRelatedTermsClass = aspects.get(
        "glossaryRelatedTerms", GlossaryRelatedTermsClass()
    )

    term = GlossaryTermConfig(
        id=urn,
        name=info.name or _get_id_from_urn(urn),
        description=info.definition,
        term_source=info.termSource,
        source_ref=info.sourceRef,
        source_url=info.sourceUrl,
        owners=_datahub_ownership_to_owners(aspects.get("ownership")),
        custom_properties=info.customProperties,
        knowledge_links=_datahub_institutional_memory_to_knowledge_links(
            aspects.get("institutionalMemory")
        ),
        domain=_datahub_domain_to_str(aspects.get("domains")),
        # Where possible, these are converted into biz glossary paths later.
        inherits=related.isRelatedTerms,
        contains=related.hasRelatedTerms,
        values=related.values,
        related_terms=related.relatedTerms,
    )
    # Stash the urn on the model so later passes can match elements by urn.
    term._urn = urn

    return term, info.parentNode
def fetch_datahub_glossary():
    """Fetch all glossary nodes/terms from DataHub and assemble the hierarchy.

    Returns a tuple of (top-level nodes, top-level terms). Child nodes and
    terms are appended to their parent node's `nodes`/`terms` lists.
    """
    graph = get_graph()

    # Fetch all glossary urns, then hydrate each into a typed aspect bag.
    urns = list(graph.get_urns_by_filter(entity_types=["glossaryTerm", "glossaryNode"]))
    entities = {urn: graph.get_entity_semityped(urn) for urn in urns}

    node_urns = [urn for urn in urns if guess_entity_type(urn) == "glossaryNode"]
    term_urns = [urn for urn in urns if guess_entity_type(urn) == "glossaryTerm"]

    # Map each AspectBag onto the pydantic models from the biz glossary source.
    raw_nodes = {
        urn: _glossary_node_from_datahub(urn, entities[urn]) for urn in node_urns
    }
    raw_terms = {
        urn: _glossary_term_from_datahub(urn, entities[urn]) for urn in term_urns
    }

    # Wire up the node hierarchy using the parent urns.
    top_level_nodes: List[GlossaryNodeConfig] = []
    for node, parent_urn in raw_nodes.values():
        if parent_urn is None:
            top_level_nodes.append(node)
        else:
            parent, _ = raw_nodes[parent_urn]
            parent.nodes = parent.nodes or []
            parent.nodes.append(node)

    # Attach terms to their parent nodes (or to the top level).
    top_level_terms: List[GlossaryTermConfig] = []
    for term, parent_urn in raw_terms.values():
        if parent_urn is None:
            top_level_terms.append(term)
        else:
            parent, _ = raw_nodes[parent_urn]
            parent.terms = parent.terms or []
            parent.terms.append(term)

    return top_level_nodes, top_level_terms
def prune_latest_glossary(
    latest_glossary: BusinessGlossaryConfig, existing_glossary: BusinessGlossaryConfig
) -> None:
    """Drop top-level nodes/terms from latest_glossary that the existing file does not manage.

    Mutates latest_glossary in place.

    TODO: Update this logic to allow for pruning of nodes within the hierarchy as well.
    """
    keep_node_urns = {node._urn for node in (existing_glossary.nodes or [])}
    keep_term_urns = {term._urn for term in (existing_glossary.terms or [])}

    latest_glossary.nodes = [
        node for node in (latest_glossary.nodes or []) if node._urn in keep_node_urns
    ]
    latest_glossary.terms = [
        term for term in (latest_glossary.terms or []) if term._urn in keep_term_urns
    ]
def replace_urn_refs_with_paths(
    glossary: BusinessGlossaryConfig, path_to_id_map: Dict[str, str]
) -> None:
    """Rewrite urn references in term relationship fields to glossary paths.

    Walks the node hierarchy recursively and translates each urn in a term's
    inherits/contains/values/related_terms lists via the inverted
    path_to_id_map; urns without a known path are left untouched.
    """
    urn_to_path = {urn: path for path, urn in path_to_id_map.items()}

    def _translate(urns: Optional[List[str]]) -> Optional[List[str]]:
        # Known urns become paths; unknown urns pass through unchanged.
        if urns is None:
            return None
        return [urn_to_path.get(urn, urn) for urn in urns]

    def _walk(node: GlossaryNodeInterface) -> None:
        for term in node.terms or []:
            term.inherits = _translate(term.inherits)
            term.contains = _translate(term.contains)
            term.values = _translate(term.values)
            term.related_terms = _translate(term.related_terms)
        for child in node.nodes or []:
            _walk(child)

    _walk(glossary)
def _align_glossary_elements(
latest: List[_GlossaryElement], existing: List[_GlossaryElement]
) -> Iterable[Tuple[Optional[_GlossaryElement], Optional[_GlossaryElement]]]:
latest_by_id = {element._urn: element for element in latest}
latest_ids = set(latest_by_id.keys())
# Emit all existing elements first, matching where with latest where possible.
for existing_element in existing:
if existing_element._urn in latest_ids:
latest_ids.remove(existing_element._urn)
yield latest_by_id[existing_element._urn], existing_element
else:
yield None, existing_element
# Emit all new elements.
for latest_id in latest_ids:
yield latest_by_id[latest_id], None
def glossary_to_dict_minimize_diffs(
    latest_glossary: BusinessGlossaryConfig, existing_glossary: BusinessGlossaryConfig
) -> dict:
    """Serialize latest_glossary to a dict while staying as close as possible to existing_glossary.

    Walks both glossaries in parallel (elements matched by urn) and, for each
    matched element, emits only fields that were explicitly set in the
    existing glossary plus non-default fields from the latest one. Elements
    present in the existing glossary but missing from the latest one become
    _DELETED_NODE_TOMBSTONE markers, which update_yml_to_match later deletes.
    """

    def _simple_elem_to_dict(
        latest_elem: Union[BusinessGlossaryConfig, _GlossaryElement],
        existing_elem: Union[BusinessGlossaryConfig, _GlossaryElement],
        defaults: DefaultConfig,
        exclude: Optional[Set[str]] = None,
    ) -> dict:
        # Serialize one element's scalar fields, minimizing the emitted keys.
        # NOTE(review): `defaults` is currently unused here — see the TODO
        # below about filtering out fields that match defaults.
        if isinstance(latest_elem, BusinessGlossaryConfig):
            # Top-level config: emit all non-default, non-None fields.
            return latest_elem.dict(exclude_defaults=True, exclude_none=True)
        assert not isinstance(existing_elem, BusinessGlossaryConfig)

        # Exclude fields that are default values here AND are not set in the existing glossary.
        #
        # In other words, a field will be included if:
        # 1. is set in the existing glossary or
        # 2. the value in the latest glossary is not a default value (and this isn't the top-level config)
        existing_set_keys = existing_elem.__fields_set__
        exclude = (exclude or set()).copy()
        if "id" not in existing_set_keys:
            # Drop ID field when not set in the existing glossary.
            #
            # SUBTLE: We can drop the ID here because we know that existing_elem._urn
            # matches latest_elem._urn. As such, we know the id field is redundant
            # if it is not set in the existing glossary.
            exclude.add("id")
        fields = latest_elem.dict(
            include=existing_set_keys,
            exclude=exclude,
            exclude_defaults=True,
            exclude_none=True,
        )
        # TODO filter out fields that match defaults
        return fields

    def _to_dict(
        latest_node: GlossaryNodeInterface,
        existing_node: GlossaryNodeInterface,
        defaults: DefaultConfig,
    ) -> dict:
        # Recursively serialize a node, aligning child terms/nodes by urn.
        # Process terms.
        terms = []
        for latest_term_elem, existing_term_elem in _align_glossary_elements(
            latest_node.terms or [], existing_node.terms or []
        ):
            if latest_term_elem is None:
                # Term was removed upstream: mark it for deletion from the yaml.
                terms.append(_DELETED_NODE_TOMBSTONE)
            elif existing_term_elem is None:
                # Net-new term: emit it in full (minus default values).
                terms.append(latest_term_elem.dict(exclude_defaults=True))
            else:
                terms.append(
                    _simple_elem_to_dict(latest_term_elem, existing_term_elem, defaults)
                )
        # Process nodes (same alignment scheme as terms, but recursing).
        nodes = []
        for latest_node_elem, existing_node_elem in _align_glossary_elements(
            latest_node.nodes or [], existing_node.nodes or []
        ):
            if latest_node_elem is None:
                nodes.append(_DELETED_NODE_TOMBSTONE)
            elif existing_node_elem is None:
                nodes.append(latest_node_elem.dict(exclude_defaults=True))
            else:
                # TODO update defaults with current owner
                nodes.append(
                    _to_dict(latest_node_elem, existing_node_elem, defaults=defaults)
                )
        # Retain other fields as-is.
        other_fields = _simple_elem_to_dict(
            latest_node, existing_node, defaults, exclude={"terms", "nodes"}
        )
        # Omit empty terms/nodes lists entirely to keep the yaml minimal.
        return {
            **other_fields,
            **({"terms": terms} if terms else {}),
            **({"nodes": nodes} if nodes else {}),
        }

    # NOTE(review): existing_glossary is passed as the defaults provider —
    # presumably BusinessGlossaryConfig extends DefaultConfig; confirm.
    return _to_dict(latest_glossary, existing_glossary, defaults=existing_glossary)
def update_yml_to_match(
    infile: pathlib.Path,
    outfile: pathlib.Path,
    target: dict,
) -> None:
    """Update the YAML document from infile to match target, writing to outfile.

    Uses ruamel round-trip loading so comments, quoting, and key order from
    the original file are preserved where possible. List entries whose target
    value is _DELETED_NODE_TOMBSTONE are removed from the doc.

    Fixes:
    - Shape-mismatch branch for lists called `update_full(doc, target)`, but
      every `update_full` callback takes a single replacement value; this
      raised a TypeError whenever a dict in the doc had become a list (or the
      top-level doc mismatched, where update_full is None).
    - The tombstone check compared `doc[i]` against the sentinel, but the
      sentinel only ever appears in `target` (produced by
      glossary_to_dict_minimize_diffs), never in the loaded YAML — so
      deletions never fired and the sentinel object leaked into the output.
      The check is now an identity test on the target value.
    """
    yaml = YAML()
    # Prevent line wrapping + preserve indentation.
    yaml.preserve_quotes = True  # type: ignore[assignment]
    yaml.width = 2**20  # type: ignore[assignment]
    # TODO: use https://cs.github.com/burkasaurusrex/Plex-Auto-Collections/blob/22bd7da5b306cd5ba309b6a2e4237d7b10aea7db/app/trakt_helpers.py?q=load_yaml_guess_indent#L30
    # to autodetect indentation
    yaml.indent(mapping=2, sequence=4, offset=2)

    doc = yaml.load(infile)

    def _update_doc(doc, target, update_full):
        if isinstance(target, dict):
            if not isinstance(doc, dict):
                # Shape mismatch: replace the whole value.
                update_full(target)
            else:
                for key, value in target.items():
                    if key not in doc:
                        doc[key] = value
                    else:
                        # Bind key as a default arg so the closure doesn't
                        # capture the loop variable late.
                        def _update_value(v, key=key):
                            doc[key] = v

                        _update_doc(doc[key], value, _update_value)
        elif isinstance(target, list):
            if not isinstance(doc, list):
                # Shape mismatch: replace the whole value (update_full takes
                # exactly one argument — the replacement).
                update_full(target)
            else:
                # We assume that the two lists are perfectly aligned, with the exception of deletions.
                elems_to_delete = []
                for i, value in enumerate(target):
                    if value is _DELETED_NODE_TOMBSTONE:
                        # Checked before the append branch so a trailing
                        # tombstone is never appended to the doc.
                        if i < len(doc):
                            elems_to_delete.append(i)
                    elif i >= len(doc):
                        doc.append(value)
                    else:

                        def _update_value(v, i=i):
                            doc[i] = v

                        _update_doc(doc[i], value, _update_value)
                # Delete from the end so earlier indices stay valid.
                for i in reversed(elems_to_delete):
                    del doc[i]
        else:
            # Scalar: replace unconditionally.
            update_full(target)

    _update_doc(doc, target, None)
    yaml.dump(doc, outfile)
def main():
    """Sync the on-disk business glossary yaml with the glossary state in DataHub."""
    file = pathlib.Path("./examples/bootstrap_data/business_glossary.yml")
    enable_auto_ids = True

    # Read the existing biz glossary file.
    existing_glossary = BusinessGlossaryFileSource.load_glossary_config(file)
    materialize_all_node_urns(existing_glossary, enable_auto_id=enable_auto_ids)
    path_to_id_map = populate_path_vs_id(existing_glossary)

    # Fetch the latest glossary from DataHub.
    top_level_nodes, top_level_terms = fetch_datahub_glossary()
    latest_glossary = existing_glossary.copy(
        update=dict(nodes=top_level_nodes, terms=top_level_terms),
        deep=True,
    )

    # Prune the latest glossary to only include file-managed nodes/terms.
    prune_latest_glossary(
        latest_glossary=latest_glossary,
        existing_glossary=existing_glossary,
    )

    # Recursively simplify urn references where possible.
    replace_urn_refs_with_paths(latest_glossary, path_to_id_map=path_to_id_map)

    # Minimize diffs between the two glossaries using the field defaults and default config.
    obj = glossary_to_dict_minimize_diffs(
        latest_glossary=latest_glossary,
        existing_glossary=existing_glossary,
    )

    # Serialize back into yaml.
    update_yml_to_match(file, pathlib.Path("out.yml"), obj)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment