# Reusable Functions
def azuread_auth(tenant_id: str, client_id: str, client_secret: str, resource_url: str):
    """
    Authenticates the Service Principal against the provided Resource URL and returns the OAuth access token.
    """
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    payload = f'grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&resource={resource_url}'
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    access_token = json.loads(response.text)['access_token']
    return access_token
def purview_auth(tenant_id: str, client_id: str, client_secret: str, data_catalog_name: str):
    """
    Authenticates to the Atlas endpoint and returns a client object.
    """
    oauth = ServicePrincipalAuthentication(
        tenant_id = tenant_id,
        client_id = client_id,
        client_secret = client_secret
    )
    client = PurviewClient(
        account_name = data_catalog_name,
        authentication = oauth
    )
    return client
def get_all_adls_assets(path: str, data_catalog_name: str, azuread_access_token: str, max_depth=1):
    """
    Retrieves all scanned assets for the specified ADLS Storage Account Container.
    Note: this function intentionally recurses until only assets remain (i.e. no folders are returned, only files).
    """
    # List all entries in the path via the Purview browse API
    url = f"https://{data_catalog_name}.catalog.purview.azure.com/api/browse"
    headers = {
        'Authorization': f'Bearer {azuread_access_token}',
        'Content-Type': 'application/json'
    }
    payload = """{"limit": 100,
        "offset": null,
        "path": "%s"
    }""" % (path)
    response = requests.request("POST", url, headers=headers, data=payload)
    li = json.loads(response.text)
    # Return all files
    for x in jmespath.search("value", li):
        if jmespath.search("isLeaf", x):
            yield x
    # If max_depth has not been reached, keep listing
    # files and folders in subdirectories
    if max_depth > 1:
        for x in jmespath.search("value", li):
            if jmespath.search("isLeaf", x):
                continue
            for y in get_all_adls_assets(jmespath.search("path", x), data_catalog_name, azuread_access_token, max_depth - 1):
                yield y
    # If max_depth has been reached, return the folders
    else:
        for x in jmespath.search("value", li):
            if not jmespath.search("isLeaf", x):
                yield x
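# The browse call above requests at most 100 entries per folder ("limit": 100),
# so a very large folder could be truncated. The helper below is a minimal,
# untested sketch of paging through results; it assumes the browse API accepts
# an integer "offset" for paging, and browse_all_pages is an illustrative name,
# not part of the original gist:
def browse_all_pages(path: str, data_catalog_name: str, azuread_access_token: str, page_size=100):
    """
    Yields every browse entry under a path, paging by offset.
    """
    url = f"https://{data_catalog_name}.catalog.purview.azure.com/api/browse"
    headers = {
        'Authorization': f'Bearer {azuread_access_token}',
        'Content-Type': 'application/json'
    }
    offset = 0
    while True:
        payload = json.dumps({"limit": page_size, "offset": offset, "path": path})
        response = requests.request("POST", url, headers=headers, data=payload)
        page = jmespath.search("value", json.loads(response.text)) or []
        for entry in page:
            yield entry
        # Stop once a partial (or empty) page comes back
        if len(page) < page_size:
            break
        offset += page_size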
def get_adls_asset_schema(assets_all: list, asset: str, purview_client):
    """
    Returns the asset schema and classifications from Purview.
    purview_client is the PurviewClient returned by purview_auth.
    """
    # Filter the asset list for our asset of interest
    assets_list = list(filter(lambda i: i['name'] == asset, assets_all))
    # Find the guid for the asset to retrieve the tabular_schema or attachedSchema (based on the asset type)
    match_id = ""
    for entry in assets_list:
        # Retrieve the asset definition from the Atlas client
        response = purview_client.get_entity(entry['id'])
        # The API response shape differs based on the asset type
        if asset.split('.', 1)[-1] == "json":
            filtered_response = jmespath.search("entities[?source=='DataScan'].[relationshipAttributes.attachedSchema[0].guid]", response)
        else:
            filtered_response = jmespath.search("entities[?source=='DataScan'].[relationshipAttributes.tabular_schema.guid]", response)
        # Update match_id if the source is DataScan
        if filtered_response:
            match_id = filtered_response[0][0]
    # Retrieve the schema based on the guid match
    response = purview_client.get_entity(match_id)
    # asset_schema is a list of [column_name, classification] pairs
    asset_schema = jmespath.search("[referredEntities.*.[attributes.name, classifications[0].[typeName][0]]]", response)[0]
    return asset_schema
def deep_ls(path: str, max_depth=1):
    """
    List all files and folders in the specified path and
    its subfolders, within the maximum recursion depth.
    """
    # List all entries in the path; mssparkutils reports folders as size 0,
    # so size is used below to distinguish files from folders
    li = mssparkutils.fs.ls(path)
    # Return all files
    for x in li:
        if x.size != 0:
            yield x
    # If max_depth has not been reached, keep listing
    # files and folders in subdirectories
    if max_depth > 1:
        for x in li:
            if x.size != 0:
                continue
            for y in deep_ls(x.path, max_depth - 1):
                yield y
    # If max_depth has been reached, return the folders
    else:
        for x in li:
            if x.size == 0:
                yield x
def convertfiles2df(files):
    """
    Converts FileInfo objects into a Pandas DataFrame to enable display.
    """
    # Disable Arrow-based transfers since the Pandas DataFrame is tiny
    spark.conf.set("spark.sql.execution.arrow.enabled", "false")
    schema = ['path', 'name', 'size']
    df = pd.DataFrame([[getattr(i, j) for j in schema] for i in files], columns=schema).sort_values('path')
    return df
# Example Implementation
# ----------------------
# Library Imports
import os
import requests
import json
import jmespath
import pandas as pd
from notebookutils import mssparkutils
from pprint import pprint
from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import PurviewClient, AtlasEntity, AtlasProcess, TypeCategory
from pyapacheatlas.core.typedef import *
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Authentication
# Service Principal with "Purview Data Source Administrator" permissions on Purview
tenant_id = "your-tenant-id"
client_id = "service-principal-client-id"
client_secret = "service-principal-client-secret"
resource_url = "https://purview.azure.net"
data_catalog_name = "your-purview-service-name"
# Retrieve authentication objects
azuread_access_token = azuread_auth(tenant_id, client_id, client_secret, resource_url)
purview_client = purview_auth(tenant_id, client_id, client_secret, data_catalog_name)
# Asset details
# Asset parameters
storage_account = "your-storage-account"
container = "your-container-name"
# The root-level path we want to begin populating assets from
top_path = f"/azure_storage_account#{storage_account}.core.windows.net/azure_datalake_gen2_service#{storage_account}.dfs.core.windows.net/azure_datalake_gen2_filesystem#{container}"
# Retrieve the full list of assets
assets_all = list(get_all_adls_assets(top_path, data_catalog_name, azuread_access_token, max_depth=20))
# Azure storage access info
linked_service_name = 'adls-linked-service-name-in-synapse'
# Grab the SAS token from the Synapse linked service
adls_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)
# Configure Spark to access the DFS endpoint
root = 'abfss://%s@%s.dfs.core.windows.net/' % (container, storage_account)
spark.conf.set('fs.azure.sas.%s.%s.dfs.core.windows.net' % (container, storage_account), adls_sas_token)
print('Remote adls root path: ' + root)
# Get ADLS files recursively
files = list(deep_ls(root, max_depth=20))
files_df = convertfiles2df(files)  # Note this is a Pandas DataFrame
# Generate asset-aligned names by masking digit runs, e.g. sales_20210101.csv -> sales_{N}.csv
files_df['asset'] = files_df['name'].str.replace(r'\d+', '{N}', regex=True)
# Append schema row-wise from Purview
files_df['schema'] = files_df.apply(lambda row: get_adls_asset_schema(assets_all, row['asset'], purview_client), axis=1)
# Display Asset DataFrame
display(files_df)
Updated gist - courtesy of @wjohnson
How do I install notebookutils? Couldn't find it using pip :(
This script was written to be run in Azure Synapse Studio; that library is part of the Synapse Python runtime. To run it outside Synapse, you should be able to refactor those bits, since most of the logic uses Pandas.
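For instance, here is a minimal, untested sketch of the same recursive listing outside Synapse, assuming the azure-storage-file-datalake package is installed (the deep_ls_sdk name and the credential handling are illustrative, not from the original gist):

from azure.storage.filedatalake import DataLakeServiceClient
import pandas as pd

def deep_ls_sdk(account: str, container: str, credential):
    """
    List all files in an ADLS Gen2 container via the Azure SDK.
    """
    service = DataLakeServiceClient(
        account_url=f"https://{account}.dfs.core.windows.net",
        credential=credential  # e.g. an account key or SAS token
    )
    fs = service.get_file_system_client(container)
    # get_paths walks the container recursively; keep files only, like deep_ls
    rows = [
        {"path": p.name, "name": p.name.rsplit("/", 1)[-1], "size": p.content_length}
        for p in fs.get_paths(recursive=True)
        if not p.is_directory
    ]
    return pd.DataFrame(rows).sort_values("path")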
Got it, thank you.
Can you share this, Mahmudul Hasan Bhuiyan?
Made get_all_adls_assets recursive when calling from Purview