@MatMoore
Created December 12, 2023 10:28
EMFS glossary spreadsheet to DataHub script
"""
Can't capture exactly:
- condionally required fields
- validation rules
- format
"""
import csv
from os import environ
import datahub.emitter.mce_builder as builder
from datahub.api.entities.dataproduct.dataproduct import DataProduct
from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import (
    AuditStampClass,
    BooleanTypeClass,
    DatasetPropertiesClass,
    DateTypeClass,
    EnumTypeClass,
    NumberTypeClass,
    OtherSchemaClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    SchemaMetadataClass,
    StringTypeClass,
    TimeTypeClass,
)
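# entities maps entity name -> {"desc": <definition>, "columns": [SchemaFieldClass, ...]}.
# A row with an empty Attribute Name introduces a new entity; subsequent rows
# define that entity's attributes.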
entities = {}

with open("input_v3.0.csv", newline="") as input_csvfile:
    reader = csv.DictReader(input_csvfile)
    for row in reader:
        entity = row["Entity"].strip()
        if not entity:
            continue
        if entity.startswith("TBC with supplier"):
            continue

        attribute = row["Attribute Name"].strip()
        desc = row["Definition"].strip()
        datatype = row["Datatype"].strip()
        reference_data = row["Reference data"].strip()
        mandatory = row["Mandatory"].strip()
        mandatory_conditions = row["Mandatory Conditions"].strip()
        validation_rules = row["Validation rules"].strip()
        comments = row["Comments for suppliers"].strip()
        # Tags are derived here but not yet attached to the emitted metadata
        tags = []
        if mandatory_conditions == "Automatically created by system":
            tags.append("automatically-created-by-system")
        if mandatory_conditions == "Needed when an Order is created":
            tags.append("needed-for-order")
        if mandatory == "Conditionally Mandatory":
            tags.append("conditionally-mandatory")
        # Expand abbreviated entity names so header rows and attribute rows
        # resolve to the same key
        entity = {
            "EAR Service Request": "EAR (External Agency Request) Service Request",
            "FMO Visit": "FMO (Field Monitoring Officer) Visit",
        }.get(entity, entity)

        if attribute:
            # Map spreadsheet datatypes to DataHub types, defaulting to text
            mapped_datatype = {
                "Text": StringTypeClass,
                "Categorical": EnumTypeClass,
                "Date": DateTypeClass,
                "Time": TimeTypeClass,
            }.get(datatype, StringTypeClass)()
            # setdefault guards against an attribute row appearing before
            # (or without) its entity's header row
            entities.setdefault(entity, {"desc": "", "columns": []})["columns"].append(
                SchemaFieldClass(
                    fieldPath=attribute,
                    type=SchemaFieldDataTypeClass(type=mapped_datatype),
                    nativeDataType=datatype,
                    nullable=mandatory != "Mandatory",
                    description=desc,
                    label=attribute,
                    lastModified=AuditStampClass(
                        time=1640692800000, actor="urn:li:corpuser:ingestion"
                    ),
                )
            )
        else:
            entities[entity] = {"desc": desc, "columns": []}
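# Optional sanity check (illustrative, commented out): inspect the parse
# before anything is emitted
# for name, parsed in entities.items():
#     print(name, len(parsed["columns"]))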
# Create an emitter to DataHub over REST
emitter = DatahubRestEmitter(
gms_server="https://datahub.apps-tools.development.data-platform.service.justice.gov.uk/api/gms",
token=environ["GMS_AUTH_TOKEN"],
extra_headers={},
)
# Test the connection
emitter.test_connection()
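# test_connection() should fail fast here if the GMS URL or token is wrong,
# before any metadata is emitted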
def callback_func(e: Exception, s: str):
    # Log any errors reported back by the emitter
    print(e)
    print(s)
def add_entity(entity_name, entity):
    # Construct a dataset properties object
    dataset_properties = DatasetPropertiesClass(
        description=entity["desc"],
        name=entity_name,
        customProperties={},
    )

    # Build a dataset URN: lowercase the name, replace spaces with dashes,
    # and drop parentheses, e.g. "FMO (Field Monitoring Officer) Visit"
    # becomes "fmo-field-monitoring-officer-visit"
    urn_name = entity_name.lower().translate(str.maketrans(" ", "-", "()"))
    entity_urn = builder.make_dataset_urn("future", f"future.{urn_name}")

    # Emit the dataset properties as a MetadataChangeProposal
    metadata_event = MetadataChangeProposalWrapper(
        entityUrn=entity_urn,
        aspect=dataset_properties,
    )
    emitter.emit(metadata_event, callback=callback_func)

    # Emit the schema (one field per glossary attribute)
    dataset_schema_properties = SchemaMetadataClass(
        schemaName="customer",  # not used
        platform=make_data_platform_urn("future"),  # platform must be an urn
        version=1,  # set when the source system versions its schemas, otherwise leave as 0
        hash="",  # set when the source system identifies schemas by hash, otherwise leave empty
        platformSchema=OtherSchemaClass(rawSchema="__insert raw schema here__"),
        lastModified=AuditStampClass(
            time=1640692800000, actor="urn:li:corpuser:ingestion"
        ),
        fields=entity["columns"],
    )
    metadata_event = MetadataChangeProposalWrapper(
        entityUrn=entity_urn,
        aspect=dataset_schema_properties,
    )
    emitter.emit(metadata_event, callback=callback_func)

for entity_name, entity in entities.items():
    # Skip entries that produced neither a description nor any columns
    if not entity["columns"] and not entity["desc"]:
        continue
    add_entity(entity_name, entity)
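
To run this against the DataHub instance above (a sketch; the filename is
illustrative, and the script assumes the acryl-datahub package is installed):

GMS_AUTH_TOKEN=<token> python emfs_glossary_to_datahub.py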