@JackPott
Created June 11, 2022 16:29
Import Confluent topic names to Datahub (without schema)
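"""
Import Confluent Cloud topic names into DataHub as datasets, without schemas.

The script connects to a Confluent Cloud cluster with the Kafka AdminClient
(connection details come from environment variables), lists all topics, drops
topics matching the deny-list regex patterns below ('_', 'pksql' and
'cp-connect' prefixes), then for each remaining topic emits a
'datasetProperties' aspect (topic config plus partition count) and, where a
domain can be derived from an 'atech.<domain>.<topic>' style name, a 'domains'
aspect to the DataHub GMS via the REST emitter.
"""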
import logging
import os
import re
from dataclasses import dataclass
from typing import List, Optional
import datahub.emitter.mce_builder as builder
import stackprinter
from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient, ConfigResource, TopicMetadata
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (ChangeTypeClass,
                                              DatasetPropertiesClass,
                                              DomainsClass)
stackprinter.set_excepthook(style='lightbg')
logging.basicConfig(level=logging.INFO)
logging.info(__file__)
DATAHUB_GMS_HOST = os.getenv("DATAHUB_GMS_HOST", "http://localhost:8080")
logging.info(f"DATAHUB_GMS_HOST={DATAHUB_GMS_HOST}")
UAT_CONFLUENT_BOOTSTRAP = os.getenv("UAT_CONFLUENT_BOOTSTRAP")
logging.info(f"UAT_CONFLUENT_BOOTSTRAP={UAT_CONFLUENT_BOOTSTRAP}")
UAT_CONFLUENT_API_KEY_ID = os.getenv("UAT_CONFLUENT_API_KEY_ID")
logging.info(f"UAT_CONFLUENT_API_KEY_ID={UAT_CONFLUENT_API_KEY_ID[:4]}*******")
UAT_CONFLUENT_API_KEY_SECRET = os.getenv("UAT_CONFLUENT_API_KEY_SECRET")
logging.info(
    f"UAT_CONFLUENT_API_KEY_SECRET={UAT_CONFLUENT_API_KEY_SECRET[:4]}*******")
@dataclass
class NoSchemaDataset:
    env: str
    platform: str
    topic_metadata: TopicMetadata
    description: Optional[str] = None
    platform_instance: Optional[str] = None
    domain: Optional[str] = None

class NoSchemaDatasetEmitter:
    def __init__(self, emitter: DatahubRestEmitter):
        self.emitter = emitter

    def emit(self, dataset: NoSchemaDataset):
        logging.info(
            f"Starting metadata ingestion with provided dataset: {dataset.topic_metadata}")
        if dataset.platform_instance:
            dataset_urn = builder.make_dataset_urn_with_platform_instance(
                platform=dataset.platform,
                platform_instance=dataset.platform_instance,
                env=dataset.env,
                name=dataset.topic_metadata.topic)
        else:
            dataset_urn = builder.make_dataset_urn(
                platform=dataset.platform,
                env=dataset.env,
                name=dataset.topic_metadata.topic)

        # Uses the module-level AdminClient `a` to fetch the topic's config
        topic_config = describe_topic(a, dataset.topic_metadata.topic)
        topic_config['partitions'] = str(
            len(dataset.topic_metadata.partitions))

        props = DatasetPropertiesClass(
            customProperties=topic_config
        )
        dataset_properties_mcp = MetadataChangeProposalWrapper(
            entityType="dataset",
            changeType=ChangeTypeClass.UPSERT,
            entityUrn=dataset_urn,
            aspectName="datasetProperties",
            aspect=props,
        )
        logging.info(
            f"Emitting 'datasetProperties' MCP for dataset {dataset_urn}")
        logging.debug(dataset_properties_mcp)
        self.emitter.emit_mcp(dataset_properties_mcp)
        # Domain aspect
        if dataset.domain:
            domains_aspect = DomainsClass(
                [builder.make_domain_urn(dataset.domain)])
            dataset_domain_mcp = MetadataChangeProposalWrapper(
                entityType="dataset",
                changeType=ChangeTypeClass.UPSERT,
                entityUrn=dataset_urn,
                aspectName="domains",
                aspect=domains_aspect,
            )
            logging.info(f"Emitting 'domains' MCP for dataset {dataset_urn}")
            logging.debug(dataset_domain_mcp)
            self.emitter.emit_mcp(dataset_domain_mcp)

def get_cluster_id(a: AdminClient) -> str:
    """Return the Kafka cluster id, used below as the DataHub platform instance."""
    md = a.list_topics(timeout=10)
    return md.cluster_id

def list_topics(a: AdminClient) -> 'list[TopicMetadata]':
    """List all topics visible to the AdminClient."""
    md = a.list_topics(timeout=10)
    logging.info(" {} topics:".format(len(md.topics)))
    for t in md.topics.values():
        logging.debug("{}".format(t))
    return [t for t in md.topics.values()]

def describe_topic(a: AdminClient, topic: str) -> dict:
    """Fetch the config for a single topic as a plain dict of name -> value."""
    resources = [ConfigResource("topic", topic)]
    # Async method, returns a dict of futures keyed by the ConfigResource
    fs = a.describe_configs(resources)
    # We only ever send one topic, so always take the first result
    f = fs[resources[0]]
    try:
        configs = f.result()
        return {v.name: v.value for v in configs.values()}
    except KafkaException as e:
        logging.error("Failed to describe {}: {}".format(topic, e))
        return {}

def filter_deny(ip: 'list[TopicMetadata]', terms: 'list[str]') -> 'list[TopicMetadata]':
    """
    Returns a filtered version of a list, removing topics whose names match any
    of the supplied regex string patterns.
    """
    patterns = [re.compile(t) for t in terms]
    op = ip
    for p in patterns:
        op = [tmd for tmd in op if not p.match(tmd.topic)]
        logging.info(
            f"Removing matches for pattern '{p.pattern}'... filter_deny now has {len(op)} items")
    return op

a = AdminClient({
    'bootstrap.servers': UAT_CONFLUENT_BOOTSTRAP,
    'ssl.endpoint.identification.algorithm': 'https',
    'sasl.mechanism': 'PLAIN',
    'request.timeout.ms': '20000',
    'sasl.username': UAT_CONFLUENT_API_KEY_ID,
    'sasl.password': UAT_CONFLUENT_API_KEY_SECRET,
    'security.protocol': 'SASL_SSL'
})
emitter = DatahubRestEmitter(DATAHUB_GMS_HOST)
cluster_id = get_cluster_id(a)
all_topics = list_topics(a)
after_deny = filter_deny(
    all_topics, ['_.*',
                 'pksql.*',
                 'cp-connect.*'])
# Pattern to extract domain when topic is in 'atech.domain.topicname' format
domain_pattern = re.compile("^atech\.(.*?)\..*")
for t in after_deny:
    found_domain = domain_pattern.match(t.topic)
    if found_domain:
        domain = found_domain.group(1)
        logging.info(
            f"Found a domain name from the topic name: '{domain}'")
    else:
        domain = None

    nsd = NoSchemaDataset(
        platform="kafka",
        platform_instance=cluster_id,
        env="UAT",
        topic_metadata=t,
        domain=domain,
    )
    NoSchemaDatasetEmitter(emitter=emitter).emit(nsd)
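
# Usage sketch (the filename and the placeholder values below are illustrative,
# not part of the original gist):
#
#   export DATAHUB_GMS_HOST=http://localhost:8080
#   export UAT_CONFLUENT_BOOTSTRAP=<bootstrap-host>:9092
#   export UAT_CONFLUENT_API_KEY_ID=<api-key-id>
#   export UAT_CONFLUENT_API_KEY_SECRET=<api-key-secret>
#   python import_confluent_topics.py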