@mwc360
Created January 22, 2025 18:15
def enable_vorder_on_direct_lake_tables(scoped_workspaces: list = None):
    '''
    Programmatically enables the V-Order Delta table property (`delta.parquet.vorder.enabled`) on tables used in Direct Lake semantic models.

    The function first traverses each Direct Lake semantic model to its SQL endpoint, and from there to the backing Lakehouse tables. This lets
    V-Order be disabled as a session-level Spark config (see the hedged sketch in the comments after the script) while still being enabled for
    selective tables: any table with downstream Power BI consumption that can benefit from V-Order gets the property applied. We only enable it
    for selective tables because V-Order is generally detrimental to Spark read and write performance.

    In Fabric, the function must be executed in a Python notebook since the delta-rs library is used.

    Params:
        scoped_workspaces: a list of workspace names or GUIDs to scope the operation to; if omitted, the current workspace is used.
    '''
    import sempy.fabric as fabric
    import pandas as pd
    import re
    import uuid
    from deltalake import DeltaTable
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from tenacity import retry, stop_after_attempt
    # get lakehouses in scope for mapping the SQL Endpoint
    fabric_rest = fabric.FabricRestClient()
    workspaces = []
    if not scoped_workspaces:
        # default to the workspace the notebook is running in
        workspaces = [{
            "id": notebookutils.runtime.context['currentWorkspaceId'],
            "displayName": notebookutils.runtime.context['currentWorkspaceName']
        }]
    else:
        workspace_list = fabric_rest.get(path_or_url="/v1/workspaces/").json()['value']
        for scoped_workspace in scoped_workspaces:
            try:
                # treat the value as a workspace GUID and resolve its display name
                uuid.UUID(scoped_workspace)
                workspace_id = scoped_workspace
                workspace_name = [workspace['displayName'] for workspace in workspace_list if workspace['id'] == scoped_workspace][0]
            except (ValueError, IndexError):
                # otherwise treat it as a workspace name and resolve its GUID
                workspace_name = scoped_workspace
                workspace_id = [workspace['id'] for workspace in workspace_list if workspace['displayName'] == scoped_workspace][0]
            workspaces.append({
                'id': workspace_id,
                'displayName': workspace_name
            })
    lakehouses = []
    for workspace in workspaces:
        lakehouses_in_workspace = fabric_rest.get(path_or_url=f"/v1/workspaces/{workspace['id']}/lakehouses").json()
        try:
            lakehouses.extend(lakehouses_in_workspace['value'])
        except KeyError:
            continue
    onelake_domain = lakehouses[0]['properties']['oneLakeTablesPath'].split('//')[1].split('/')[0]
    # get datasets in scope
    datasets = []
    for workspace in workspaces:
        dataset_list = fabric_rest.get(path_or_url=f"/v1/workspaces/{workspace['id']}/semanticModels/").json()['value']
        dataset_list = [dict(item, **{'workspaceName': workspace['displayName']}) for item in dataset_list]
        datasets.extend(dataset_list)
    # map direct lake tables to lakehouse
    data = []
    for dataset in datasets:
        dataset_id = dataset['id']
        dataset_name = dataset['displayName']
        workspace_id = dataset['workspaceId']
        workspace_name = dataset['workspaceName']
        partitions_df = fabric.list_partitions(workspace=workspace_id, dataset=dataset_id)
        partitions_df = partitions_df[partitions_df['Mode'] == 'DirectLake']
        if len(partitions_df) > 0:
            expressions_df = fabric.list_expressions(dataset=dataset_id, workspace=workspace_id)
            expr = expressions_df[expressions_df['Name'] == 'DatabaseQuery']['Expression'].iloc[0]
            sql_endpoint_id = re.findall(r'"([^"]*)"', expr)[1]
            try:
                tables_df = fabric.list_tables(workspace=workspace_id, dataset=dataset_id, additional_xmla_properties=['SourceLineageTag'])
                if not tables_df.empty and not partitions_df.empty:
                    tables_df['lakehouse_name'] = [lh['displayName'] for lh in lakehouses if lh['properties']['sqlEndpointProperties']['id'] == sql_endpoint_id][0]
                    tables_df['lakehouse_id'] = [lh['id'] for lh in lakehouses if lh['properties']['sqlEndpointProperties']['id'] == sql_endpoint_id][0]
                    tables_df['schema_enabled'] = any(
                        lh['properties'].get('defaultSchema') is not None
                        for lh in lakehouses
                        if lh['properties']['sqlEndpointProperties']['id'] == sql_endpoint_id
                    )
                    tables_df['schema_name'] = tables_df['Source Lineage Tag'].str.replace('[', '', regex=False).str.replace(']', '', regex=False).str.split('.').str[0]
                    tables_df['table_name'] = tables_df['Source Lineage Tag'].str.replace('[', '', regex=False).str.replace(']', '', regex=False).str.split('.').str[1]
                    table_details_df = pd.merge(
                        tables_df,
                        partitions_df,
                        how='inner',
                        left_on='Name',
                        right_on='Table Name'
                    )[['lakehouse_id', 'lakehouse_name', 'schema_name', 'table_name', 'schema_enabled']]
                    if not table_details_df.empty:
                        table_details_df['workspace_id'] = workspace_id
                        table_details_df['workspace_name'] = workspace_name
                        table_details_df['directlake_models'] = [
                            [{"dataset_id": dataset_id, "dataset_name": dataset_name}]
                        ] * len(table_details_df)
                        data.append(table_details_df)
            except Exception:
                continue
    final_df = pd.concat(data, ignore_index=True).drop_duplicates(subset=['lakehouse_id', 'lakehouse_name', 'schema_name', 'table_name'])
    # attempt to connect to a delta table since auth fails the first time
    @retry(stop=stop_after_attempt(2))
    def establish_connection():
        table = final_df.iloc[0]
        if table['schema_enabled']:
            table_path = f"abfss://{table['workspace_name']}@{onelake_domain}/{table['lakehouse_name']}.Lakehouse/Tables/{table['schema_name']}/{table['table_name']}/"
        else:
            table_path = f"abfss://{table['workspace_name']}@{onelake_domain}/{table['lakehouse_name']}.Lakehouse/Tables/{table['table_name']}/"
        DeltaTable(table_path)
    establish_connection()
    # apply vorder to scoped tables
    def process_table(table):
        # build the name outside the try block so it is available in the error message
        full_table_name = f"`{table.lakehouse_name}`.`{table.schema_name}`.`{table.table_name}`"
        try:
            print(f"Processing {full_table_name}")
            if table.schema_enabled:
                table_path = f"abfss://{table.workspace_name}@{onelake_domain}/{table.lakehouse_name}.Lakehouse/Tables/{table.schema_name}/{table.table_name}/"
            else:
                table_path = f"abfss://{table.workspace_name}@{onelake_domain}/{table.lakehouse_name}.Lakehouse/Tables/{table.table_name}/"
            delta_table = DeltaTable(table_path)
            properties = delta_table.metadata().configuration
            vorder_enabled = properties.get('delta.parquet.vorder.enabled', 'false').lower() == 'true'
            if vorder_enabled:
                return f"V-Order already enabled for {full_table_name}"
            else:
                delta_table.alter.set_table_properties({"delta.parquet.vorder.enabled": "true"}, raise_if_not_exists=False)
                return f"Set 'delta.parquet.vorder.enabled' to 'true' for {full_table_name}"
        except Exception as e:
            return f"Error processing {full_table_name}: {e}"

    with ThreadPoolExecutor() as executor:
        future_to_table = {executor.submit(process_table, table): table for _, table in final_df.iterrows()}
        for future in as_completed(future_to_table):
            result = future.result()
            print(result)

# execute the function; expect roughly a minute of runtime for a couple of workspaces with a few small Direct Lake semantic models
enable_vorder_on_direct_lake_tables(scoped_workspaces=['workspace_1', 'workspace_2'])
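
# --- Companion session-level setting (hedged sketch, not part of the function above) ---
# The docstring assumes V-Order is disabled at the Spark session level so that only the tables flagged
# by this script carry the table-level property. A minimal sketch of that session setting in a Fabric
# *Spark* notebook (not this Python notebook) follows. Assumption: the config name differs by Fabric
# Spark runtime ('spark.sql.parquet.vorder.enabled' on older runtimes, 'spark.sql.parquet.vorder.default'
# on newer ones); verify the name and precedence behavior against your runtime's documentation.
#
# spark.conf.set("spark.sql.parquet.vorder.enabled", "false")   # assumed config name on older runtimes
# spark.conf.set("spark.sql.parquet.vorder.default", "false")   # assumed config name on newer runtimes
#
# With the session default off, the intent is that writes to tables where this script set
# 'delta.parquet.vorder.enabled' = 'true' are still V-Ordered via the table property.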