@mdrakiburrahman
Last active March 26, 2024 17:51
Extracting metadata from Azure Purview with Synapse Spark Pools
# Reusable Functions
def azuread_auth(tenant_id: str, client_id: str, client_secret: str, resource_url: str):
    """
    Authenticates the Service Principal against the provided Resource URL and returns the OAuth access token.
    """
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    payload = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&resource={resource_url}"
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    access_token = json.loads(response.text)['access_token']
    return access_token
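
# Hypothetical variant (not in the original gist): the same token request, but it fails loudly on a
# bad secret or tenant instead of raising a KeyError on the missing 'access_token' field.
def azuread_auth_checked(tenant_id: str, client_id: str, client_secret: str, resource_url: str):
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    payload = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "resource": resource_url,
    }
    response = requests.post(url, data=payload)
    response.raise_for_status()  # surfaces 4xx/5xx errors from Azure AD
    return response.json()["access_token"]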
def purview_auth(tenant_id: str, client_id: str, client_secret: str, data_catalog_name: str):
    """
    Authenticates to the Atlas endpoint and returns a PurviewClient object.
    """
    oauth = ServicePrincipalAuthentication(
        tenant_id = tenant_id,
        client_id = client_id,
        client_secret = client_secret
    )
    client = PurviewClient(
        account_name = data_catalog_name,
        authentication = oauth
    )
    return client
def get_all_adls_assets(path: str, data_catalog_name: str, azuread_access_token: str, max_depth=1):
    """
    Retrieves all scanned assets for the specified ADLS storage account container.
    Note: this function intentionally traverses recursively until only assets remain (i.e. only files are returned, no folders).
    """
    # List all entries under the path via the Purview Catalog browse API
    url = f"https://{data_catalog_name}.catalog.purview.azure.com/api/browse"
    headers = {
        'Authorization': f'Bearer {azuread_access_token}',
        'Content-Type': 'application/json'
    }
    payload = json.dumps({
        "limit": 100,
        "offset": None,
        "path": path
    })
    response = requests.request("POST", url, headers=headers, data=payload)
    li = json.loads(response.text)
    # Yield all files (leaf nodes)
    for x in jmespath.search("value", li):
        if jmespath.search("isLeaf", x):
            yield x
    # If the max_depth has not been reached, keep listing
    # files and folders in subdirectories
    if max_depth > 1:
        for x in jmespath.search("value", li):
            if jmespath.search("isLeaf", x):
                continue
            for y in get_all_adls_assets(jmespath.search("path", x), data_catalog_name, azuread_access_token, max_depth - 1):
                yield y
    # If max_depth has been reached, return the folders
    else:
        for x in jmespath.search("value", li):
            if not jmespath.search("isLeaf", x):
                yield x
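
# Hypothetical helper (not in the original gist): sanity-check the crawl output. Each item yielded
# by the browse API is a plain dict; 'name' and 'id' feed get_adls_asset_schema below, while 'path'
# drives the recursion above.
def preview_assets(assets_all: list, limit: int = 5):
    for asset in assets_all[:limit]:
        print(asset["name"], asset["id"], asset["path"])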
def get_adls_asset_schema(assets_all: list, asset: str, purview_client: PurviewClient):
    """
    Returns the asset schema and classifications from Purview.
    """
    # Filter the browse response for our asset of interest
    assets_list = list(filter(lambda i: i['name'] == asset, assets_all))
    # Find the guid for the asset to retrieve the tabular_schema or attachedSchema (based on the asset type)
    match_id = ""
    for entry in assets_list:
        # Retrieve the asset definition from the Atlas client
        response = purview_client.get_entity(entry['id'])
        # The API response differs based on the asset type
        if asset.split('.', 1)[-1] == "json":
            filtered_response = jmespath.search("entities[?source=='DataScan'].[relationshipAttributes.attachedSchema[0].guid]", response)
        else:
            filtered_response = jmespath.search("entities[?source=='DataScan'].[relationshipAttributes.tabular_schema.guid]", response)
        # Update match_id if the source is DataScan
        if filtered_response:
            match_id = filtered_response[0][0]
    # Retrieve the schema based on the matched guid
    response = purview_client.get_entity(match_id)
    asset_schema = jmespath.search("[referredEntities.*.[attributes.name, classifications[0].[typeName][0]]]", response)[0]
    return asset_schema
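
# Hypothetical helper (not in the original gist): print the schema for a single asset name. Per the
# jmespath projection above, get_adls_asset_schema returns [column_name, classification] pairs,
# with classification None when Purview has not classified the column.
def preview_asset_schema(assets_all: list, asset: str, purview_client: PurviewClient):
    for column_name, classification in get_adls_asset_schema(assets_all, asset, purview_client):
        print(column_name, classification)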
def deep_ls(path: str, max_depth=1):
    """
    List all files and folders in the specified path and
    its subfolders within the maximum recursion depth.
    """
    # List all files and folders in the path
    li = mssparkutils.fs.ls(path)
    # Return all files (folders are reported with size 0)
    for x in li:
        if x.size != 0:
            yield x
    # If the max_depth has not been reached, keep listing
    # files and folders in subdirectories
    if max_depth > 1:
        for x in li:
            if x.size != 0:
                continue
            for y in deep_ls(x.path, max_depth - 1):
                yield y
    # If max_depth has been reached, return the folders
    else:
        for x in li:
            if x.size == 0:
                yield x
def convertfiles2df(files):
    """
    Converts a list of FileInfo objects into a Pandas DataFrame to enable display.
    """
    # Disable Arrow-based transfers since the Pandas DataFrame is tiny
    spark.conf.set("spark.sql.execution.arrow.enabled", "false")
    schema = ['path', 'name', 'size']
    df = pd.DataFrame([[getattr(i, j) for j in schema] for i in files], columns=schema).sort_values('path')
    return df
# Example Implementation
# ----------------------
# Library Imports
import os
import requests
import json
import jmespath
import pandas as pd
from notebookutils import mssparkutils
from pprint import pprint
from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import PurviewClient, AtlasEntity, AtlasProcess, TypeCategory
from pyapacheatlas.core.typedef import *
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Authentication
# Service Principal with "Purview Data Source Administrator" permissions on Purview
tenant_id = "your-tenant-id"
client_id = "service-principal-client-id"
client_secret = "service-principal-client-secret"
resource_url = "https://purview.azure.net"
data_catalog_name = "your-purview-service-name"
# Retrieve authentication objects
azuread_access_token = azuread_auth(tenant_id, client_id, client_secret, resource_url)
purview_client = purview_auth(tenant_id, client_id, client_secret, data_catalog_name)
# Asset parameters
storage_account = "your-storage-account"
container = "your-container-name"
# The root level path we want to begin populating assets from
top_path = f"/azure_storage_account#{storage_account}.core.windows.net/azure_datalake_gen2_service#{storage_account}.dfs.core.windows.net/azure_datalake_gen2_filesystem#{container}"
# Retrieve full list of assets
assets_all = list(get_all_adls_assets(top_path, data_catalog_name, azuread_access_token, max_depth=20))
# Azure storage access info
linked_service_name = 'adls-linked-service-name-in-synapse'
# Grab SAS token
adls_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)
# Configure Spark to access from DFS endpoint
root = 'abfss://%s@%s.dfs.core.windows.net/' % (container, storage_account)
spark.conf.set('fs.azure.sas.%s.%s.dfs.core.windows.net' % (container, storage_account), adls_sas_token)
print('Remote adls root path: ' + root)
# Get ADLS files recursively
files = list(deep_ls(root, max_depth=20))
files_df = convertfiles2df(files) # Note this is a Pandas DataFrame
# Generate asset-aligned names: collapse numeric parts so file names line up with the {N}-style
# asset names returned by Purview (e.g. a file like sales_2021.csv maps to sales_{N}.csv)
files_df['asset'] = files_df['name'].str.replace(r'\d+', '{N}', regex=True)
# Append schema row-wise from Purview
files_df['schema'] = files_df.apply(lambda row: get_adls_asset_schema(assets_all, row['asset'], purview_client), axis=1)
# Display Asset DataFrame
display(files_df)
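# Optional follow-up (not in the original gist): flatten the nested 'schema' column into one row
# per column so classifications are easier to filter. Assumes every file resolved to a Purview asset.
rows = []
for _, r in files_df.iterrows():
    for column_name, classification in r['schema']:
        rows.append({'path': r['path'], 'asset': r['asset'], 'column': column_name, 'classification': classification})
classification_df = pd.DataFrame(rows)
display(classification_df)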
@mdrakiburrahman
Author

Made get_all_adls_assets recursive when calling from Purview

@mdrakiburrahman
Author

Updated gist - courtesy of @wjohnson

@BhuiyanMH

How do I install notebookutils? Couldn't find it using pip :(

@mdrakiburrahman
Author

How do I install notebookutils? Couldn't find it using pip :(

This script was written to be run in Azure Synapse Studio; that library is part of the Synapse Python runtime. To run it outside Synapse, you should be able to refactor those bits, since most of the logic uses Pandas.
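
For anyone attempting that refactor, a minimal sketch using the azure-storage-file-datalake SDK; the function name and its arguments are placeholders, not part of this gist:

# pip install azure-storage-file-datalake
from azure.storage.filedatalake import DataLakeServiceClient

def deep_ls_outside_synapse(storage_account: str, container: str, credential):
    """
    Yields (path, name, size) for every file in the container, mirroring what deep_ls + convertfiles2df produce.
    """
    service = DataLakeServiceClient(
        account_url=f"https://{storage_account}.dfs.core.windows.net",
        credential=credential,  # e.g. an account key, SAS token, or an azure-identity credential
    )
    fs = service.get_file_system_client(container)
    for p in fs.get_paths(recursive=True):
        if not p.is_directory:
            yield p.name, p.name.split("/")[-1], p.content_length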

@BhuiyanMH

How do I install notebookutils? Couldn't find it using pip :(

This script was written to be run in Azure Synapse Studio; that library is part of the Synapse Python runtime. To run it outside Synapse, you should be able to refactor those bits, since most of the logic uses Pandas.

Got it, thank you.

@Nickikku

How do I install notebookutils? Couldn't find it using pip :(

This script was written to be run in Azure Synapse Studio; that library is part of the Synapse Python runtime. To run it outside Synapse, you should be able to refactor those bits, since most of the logic uses Pandas.

Got it, thank you.

Can you share this, Mahmudul Hasan Bhuiyan?
