Created
December 12, 2023 10:28
-
-
Save MatMoore/b4a99f8b93c278480fc712d808fc9b41 to your computer and use it in GitHub Desktop.
EMFS glossary spreadsheet to Datahub script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Can't capture exactly: | |
- condionally required fields | |
- validation rules | |
- format | |
""" | |
import csv | |
from os import environ | |
import datahub.emitter.mce_builder as builder | |
from datahub.api.entities.dataproduct.dataproduct import DataProduct | |
from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn | |
from datahub.emitter.mcp import MetadataChangeProposalWrapper | |
from datahub.emitter.rest_emitter import DatahubRestEmitter | |
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph | |
from datahub.metadata.schema_classes import ( | |
AuditStampClass, | |
BooleanTypeClass, | |
DatasetPropertiesClass, | |
DateTypeClass, | |
EnumTypeClass, | |
NumberTypeClass, | |
OtherSchemaClass, | |
SchemaFieldClass, | |
SchemaFieldDataTypeClass, | |
SchemaMetadataClass, | |
StringTypeClass, | |
TimeTypeClass, | |
) | |
entities = {}
# Parse the glossary CSV. Rows with an empty "Attribute Name" cell declare an
# entity and carry its description; rows with an attribute add one column to
# that entity's schema.
with open("input_v3.0.csv", newline="") as input_csvfile:
    reader = csv.DictReader(input_csvfile)
    for row in reader:
        entity = row["Entity"].strip()
        if not entity:
            continue
        if entity.startswith("TBC with supplier"):
            continue
        attribute = row["Attribute Name"].strip()
        desc = row["Definition"].strip()
        datatype = row["Datatype"].strip()
        reference_data = row["Reference data"].strip()
        mandatory = row["Mandatory"].strip()
        mandatory_conditions = row["Mandatory Conditions"].strip()
        validation_rules = row["Validation rules"].strip()
        comments = row["Comments for suppliers"].strip()
        # NOTE(review): tags are computed but never attached to the emitted
        # metadata -- TODO wire these up as DataHub tags or remove.
        tags = []
        if mandatory_conditions == "Automatically created by system":
            tags.append("automatically-created-by-system")
        if mandatory_conditions == "Needed when an Order is created":
            tags.append("needed-for-order")
        if mandatory == "Conditionally Mandatory":
            tags.append("conditionally-mandatory")
        # Expand abbreviated entity names on every row so header and attribute
        # rows agree on the same dict key. (Previously applied only to
        # attribute rows, which could split one entity across two keys.)
        entity = {
            "EAR Service Request": "EAR (External Agency Request) Service Request",
            "FMO Visit": "FMO (Field Monitoring Officer) Visit",
        }.get(entity, entity)
        if attribute:
            # .get() already falls back to StringTypeClass, so the result is
            # never None (the old `is not None` guard was dead code).
            mapped_datatype = {
                "Text": StringTypeClass,
                "Categorical": EnumTypeClass,
                "Date": DateTypeClass,
                "Time": TimeTypeClass,
            }.get(datatype, StringTypeClass)()
            # setdefault guards against an attribute row arriving before its
            # entity header row (previously a KeyError).
            entities.setdefault(entity, {"desc": "", "columns": []})["columns"].append(
                SchemaFieldClass(
                    fieldPath=attribute,
                    type=SchemaFieldDataTypeClass(type=mapped_datatype),
                    nativeDataType=datatype,
                    nullable=mandatory != "Mandatory",
                    description=desc,
                    label=attribute,
                    lastModified=AuditStampClass(
                        time=1640692800000, actor="urn:li:corpuser:ingestion"
                    ),
                )
            )
        else:
            # Entity header row: record the description without clobbering any
            # columns already collected (previously overwrote the whole entry).
            entities.setdefault(entity, {"columns": []})["desc"] = desc
# Create an emitter to DataHub over REST.
# Requires the GMS_AUTH_TOKEN environment variable (environ[...] raises
# KeyError if it is unset).
emitter = DatahubRestEmitter(
    gms_server="https://datahub.apps-tools.development.data-platform.service.justice.gov.uk/api/gms",
    token=environ["GMS_AUTH_TOKEN"],
    extra_headers={},
)
# Test the connection up front (presumably raises if the server is
# unreachable or the token is rejected -- TODO confirm).
emitter.test_connection()
def callback_func(e: Exception, s: str):
    """Emit-failure callback: print the exception, then the status string."""
    for item in (e, s):
        print(item)
def add_entity(entity_name, entity):
    """Emit one glossary entity to DataHub: dataset properties, then schema."""
    # Dataset-level properties: display name plus free-text description.
    props = DatasetPropertiesClass(
        description=entity["desc"],
        name=entity_name,
        customProperties={},
    )
    # Slugify the name for the URN: lowercase, spaces -> hyphens, strip "()".
    slug = entity_name.lower().translate(str.maketrans(" ", "-", "()"))
    urn = builder.make_dataset_urn("future", f"future.{slug}")
    emitter.emit(
        MetadataChangeProposalWrapper(entityUrn=urn, aspect=props),
        callback=callback_func,
    )
    # Schema aspect listing the entity's attributes as fields.
    schema = SchemaMetadataClass(
        schemaName="customer",  # not used
        # important <- platform must be an urn
        platform=make_data_platform_urn("future"),
        # when the source system has a notion of versioning of schemas, insert
        # this in, otherwise leave as 0
        version=1,
        # when the source system has a notion of unique schemas identified via
        # hash, include a hash, else leave it as empty string
        hash="",
        # platformSchema: Union["EspressoSchemaClass", "OracleDDLClass", "MySqlDDLClass", "PrestoDDLClass", "KafkaSchemaClass", "BinaryJsonSchemaClass", "OrcSchemaClass", "SchemalessClass", "KeyValueSchemaClass", "OtherSchemaClass"]
        platformSchema=OtherSchemaClass(rawSchema="__insert raw schema here__"),
        lastModified=AuditStampClass(
            time=1640692800000, actor="urn:li:corpuser:ingestion"
        ),
        fields=entity["columns"],
    )
    emitter.emit(
        MetadataChangeProposalWrapper(entityUrn=urn, aspect=schema),
        callback=callback_func,
    )
# Push every entity that has at least a description or one column.
for name, details in entities.items():
    if details["columns"] or details["desc"]:
        add_entity(name, details)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment