from typing import List, Optional
import uuid


def enable_vorder_on_direct_lake_tables(scoped_workspaces: Optional[List[str | uuid.UUID]] = None):
    '''
    Programmatically enables the V-Order Delta table property (`delta.parquet.vorder.enabled`) on tables used in Direct Lake Semantic Models.

    The function first traverses each Direct Lake Semantic Model to its SQL Endpoint, and from there to the backing Lakehouse, so that V-Order
    can remain disabled as a session-level config while still being enabled for selective tables. This ensures that tables with downstream
    Power BI consumption, which can benefit from V-Order, have the property applied. Enabling it only for selective tables is preferable
    because V-Order is generally detrimental to Spark read and write performance.

    In Fabric, the function must be executed in a Python notebook since the Delta-rs (`deltalake`) library is used.

    Params:
        scoped_workspaces: scopes to the current workspace if no list is provided; a list of workspace names or GUIDs can be passed to scope the operation to multiple workspaces.
    '''
    import sempy.fabric as fabric
    import pandas as pd
    import re
    from deltalake import DeltaTable
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from tenacity import retry, stop_after_attempt
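    # note: `notebookutils` is intentionally not imported; the Fabric notebook session injects it
    # automatically, so running this function outside Fabric raises a NameError where it is referenced below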
    # resolve workspaces in scope; fall back to the current workspace when none are provided
    fabric_rest = fabric.FabricRestClient()
    workspaces = []
    if not scoped_workspaces:
        workspaces = [{
            "id": notebookutils.runtime.context['currentWorkspaceId'],
            "displayName": notebookutils.runtime.context['currentWorkspaceName']
        }]
    else:
        # list all workspaces once, then resolve each scoped entry to an id/name pair
        workspace_list = fabric_rest.get(path_or_url="/v1/workspaces/").json()['value']
        for scoped_workspace in scoped_workspaces:
            scoped_workspace = str(scoped_workspace)
            try:
                # a parsable GUID is treated as a workspace id, anything else as a display name
                uuid.UUID(scoped_workspace)
                workspace_id = scoped_workspace
                workspace_name = [workspace['displayName'] for workspace in workspace_list if workspace['id'] == scoped_workspace][0]
            except ValueError:
                workspace_name = scoped_workspace
                workspace_id = [workspace['id'] for workspace in workspace_list if workspace['displayName'] == scoped_workspace][0]
            workspaces.append({
                'id': workspace_id,
                'displayName': workspace_name
            })
    # get lakehouses in scope for mapping the SQL Endpoint
    lakehouses = []
    for workspace in workspaces:
        lakehouses_in_workspace = fabric_rest.get(path_or_url=f"/v1/workspaces/{workspace['id']}/lakehouses").json()
        try:
            lakehouses.extend(lakehouses_in_workspace['value'])
        except KeyError:
            # workspace returned no lakehouse list; skip it
            continue
    if not lakehouses:
        raise ValueError("No lakehouses found in the scoped workspaces.")
    onelake_domain = lakehouses[0]['properties']['oneLakeTablesPath'].split('//')[1].split('/')[0]
    # get datasets in scope
    datasets = []
    for workspace in workspaces:
        dataset_list = fabric_rest.get(path_or_url=f"/v1/workspaces/{workspace['id']}/semanticModels/").json()['value']
        dataset_list = [dict(item, **{'workspaceName': workspace['displayName']}) for item in dataset_list]
        datasets.extend(dataset_list)
    # map Direct Lake tables to their backing lakehouse
    data = []
    for dataset in datasets:
        dataset_id = dataset['id']
        dataset_name = dataset['displayName']
        workspace_id = dataset['workspaceId']
        workspace_name = dataset['workspaceName']
        partitions_df = fabric.list_partitions(workspace=workspace_id, dataset=dataset_id)
        partitions_df = partitions_df[partitions_df['Mode'] == 'DirectLake']
        if len(partitions_df) > 0:
            # the second quoted value in the DatabaseQuery expression is the SQL Endpoint id
            expressions_df = fabric.list_expressions(dataset=dataset_id, workspace=workspace_id)
            expr = expressions_df[expressions_df['Name'] == 'DatabaseQuery']['Expression'].iloc[0]
            sql_endpoint_id = re.findall(r'"([^"]*)"', expr)[1]
            try:
                tables_df = fabric.list_tables(workspace=workspace_id, dataset=dataset_id, additional_xmla_properties=['SourceLineageTag'])
                if not tables_df.empty and not partitions_df.empty:
                    tables_df['lakehouse_name'] = [lh['displayName'] for lh in lakehouses if lh['properties']['sqlEndpointProperties']['id'] == sql_endpoint_id][0]
                    tables_df['lakehouse_id'] = [lh['id'] for lh in lakehouses if lh['properties']['sqlEndpointProperties']['id'] == sql_endpoint_id][0]
                    tables_df['schema_enabled'] = any(
                        lh['properties'].get('defaultSchema') is not None
                        for lh in lakehouses
                        if lh['properties']['sqlEndpointProperties']['id'] == sql_endpoint_id
                    )
                    # SourceLineageTag is formatted as [schema].[table]
                    tables_df['schema_name'] = tables_df['Source Lineage Tag'].str.replace('[', '', regex=False).str.replace(']', '', regex=False).str.split('.').str[0]
                    tables_df['table_name'] = tables_df['Source Lineage Tag'].str.replace('[', '', regex=False).str.replace(']', '', regex=False).str.split('.').str[1]
                    table_details_df = pd.merge(
                        tables_df,
                        partitions_df,
                        how='inner',
                        left_on='Name',
                        right_on='Table Name'
                    )[['lakehouse_id', 'lakehouse_name', 'schema_name', 'table_name', 'schema_enabled']]
                    if not table_details_df.empty:
                        table_details_df['workspace_id'] = workspace_id
                        table_details_df['workspace_name'] = workspace_name
                        table_details_df['directlake_models'] = [
                            [{"dataset_id": dataset_id, "dataset_name": dataset_name}]
                        ] * len(table_details_df)
                        data.append(table_details_df)
            except Exception as e:
                # skip models whose lakehouse is outside the scoped workspaces or that fail to enumerate
                print(f"Skipping dataset '{dataset_name}': {e}")
                continue
    if not data:
        print("No Direct Lake tables found in the scoped workspaces.")
        return
    final_df = pd.concat(data, ignore_index=True).drop_duplicates(subset=['lakehouse_id', 'lakehouse_name', 'schema_name', 'table_name'])
    # attempt an initial connection to a Delta table since auth fails the first time
    @retry(stop=stop_after_attempt(2))
    def establish_connection():
        table = final_df.iloc[0]
        if table['schema_enabled']:
            table_path = f"abfss://{table['workspace_name']}@{onelake_domain}/{table['lakehouse_name']}.Lakehouse/Tables/{table['schema_name']}/{table['table_name']}/"
        else:
            table_path = f"abfss://{table['workspace_name']}@{onelake_domain}/{table['lakehouse_name']}.Lakehouse/Tables/{table['table_name']}/"
        DeltaTable(table_path)

    establish_connection()
    # apply V-Order to the scoped tables in parallel
    def process_table(table):
        full_table_name = f"`{table.lakehouse_name}`.`{table.schema_name}`.`{table.table_name}`"
        try:
            print(f"Processing {full_table_name}")
            if table.schema_enabled:
                table_path = f"abfss://{table.workspace_name}@{onelake_domain}/{table.lakehouse_name}.Lakehouse/Tables/{table.schema_name}/{table.table_name}/"
            else:
                table_path = f"abfss://{table.workspace_name}@{onelake_domain}/{table.lakehouse_name}.Lakehouse/Tables/{table.table_name}/"
            delta_table = DeltaTable(table_path)
            properties = delta_table.metadata().configuration
            vorder_enabled = properties.get('delta.parquet.vorder.enabled', 'false').lower() == 'true'
            if vorder_enabled:
                return f"V-Order already enabled for {full_table_name}"
            else:
                delta_table.alter.set_table_properties({"delta.parquet.vorder.enabled": "true"}, raise_if_not_exists=False)
                return f"Set 'delta.parquet.vorder.enabled' to 'true' for {full_table_name}"
        except Exception as e:
            return f"Error processing {full_table_name}: {e}"

    with ThreadPoolExecutor() as executor:
        future_to_table = {executor.submit(process_table, table): table for _, table in final_df.iterrows()}
        for future in as_completed(future_to_table):
            result = future.result()
            print(result)
# execute the function; expect roughly a minute of runtime for a couple of workspaces containing a few small Direct Lake Semantic Models
enable_vorder_on_direct_lake_tables(scoped_workspaces=['workspace_1', 'workspace_2'])
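
# --- Optional follow-ups (illustrative sketches, not part of the original function) ---
# 1) Scope to the current workspace only by omitting the argument:
#    enable_vorder_on_direct_lake_tables()
#
# 2) Spot-check that the property landed on a table (the path below is hypothetical):
#    from deltalake import DeltaTable
#    dt = DeltaTable("abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>.Lakehouse/Tables/<schema>/<table>/")
#    print(dt.metadata().configuration.get("delta.parquet.vorder.enabled"))
#
# 3) In the Spark notebooks that write these tables, V-Order can remain disabled at the
#    session level (the config name varies by Fabric Spark runtime version; verify it
#    against the Fabric documentation before relying on it), e.g.:
#    spark.conf.set("spark.sql.parquet.vorder.enabled", "false")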