@Sujata994
Last active July 10, 2025 12:31
Example User data functions for Translytical task flows

Translytical task flows examples

Example User data functions

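The example User data functions below cover the following Translytical task flow demo scenarios:

  • Modify data record(s)
  • Data annotation
  • Augment data on the fly
  • Dynamic notifications
  • Approval workflows
  • Custom AI integration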

Additional information about Translytical task flows

Fabric User data functions provide built-in data source connection management for the following sources:

The writeback examples here use a SQL database in Fabric, which is well suited to heavy read/write operations. The report for these demos uses a DirectQuery connection to this database, so the report shows the updated data immediately after the function executes.

Modify data record(s)

import fabric.functions as fn
import logging

udf = fn.UserDataFunctions()

@udf.connection(argName="sqlDB", alias="Translytical")
@udf.function()
def UpdateDiscount(sqlDB: fn.FabricSqlConnection, quantity: int, risklevel: str, dealexpiration: int,  discount: float)->str :
    
    logging.info('Python UDF trigger function processed a request.')

    if discount < 0:
        raise fn.UserThrownError("Discount cannot be negative")
    if discount > 1:
        # Values above 1 are treated as percentages (for example, 25 means 25%)
        discount = discount / 100
    if discount > 0.50:
        raise fn.UserThrownError("Discount cannot exceed 50%")

    risk = get_risk(risklevel)

    if risk == 0:
        return "This opportunity cannot be changed since it is either in Won or Lost state."

    # Connect only after validation so that early exits do not leave a connection open
    sqlConnection = sqlDB.connect()
    cursor = sqlConnection.cursor()

    days_to_close_sql = get_days_to_close_sql_string(dealexpiration)
    query = f"UPDATE [dbo].[Opportunity] SET [Discount] = ? WHERE Rating = ? {days_to_close_sql} AND Quantity = ?"
    params = (discount, risk, quantity)
    
    # Build a readable version of the statement for logging only.
    # The parameterized query below is what actually runs.
    tsql_query = query.replace("?", "{}").format(*[f"'{p}'" if isinstance(p, str) else p for p in params])

    logging.info(f"Executing SQL Query: {tsql_query}\n")

    cursor.execute(query, params)
    
    sqlConnection.commit()
    sqlConnection.close()
    return f"Opportunities with {risklevel} are updated."


def get_risk(risklevel:str)->int:
    match risklevel:
        case "High risk":
            return 1
        case "Medium risk":
            return 2
        case "Low risk":
            return 3
        case _:
            return 0

def get_days_to_close_sql_string(dealexpiration: int)->str:
    match dealexpiration:
        case 60:
            return "AND Days_To_Close <= 69"
        case 300:
            return "AND Days_To_Close >= 300"
        case _:
            return f"AND Days_To_Close >= {dealexpiration} AND Days_To_Close <= {dealexpiration + 9}"
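
The discount checks in UpdateDiscount can be pulled into a pure helper so that they can be exercised without a database connection. The following is a minimal sketch and not part of the original sample: `normalize_discount` and `max_fraction` are hypothetical names, and the helper raises `ValueError` where the function above raises `fn.UserThrownError`.

```python
def normalize_discount(discount: float, max_fraction: float = 0.50) -> float:
    """Return the discount as a fraction between 0 and max_fraction.

    Values greater than 1 are treated as percentages (for example, 25 means 25%).
    """
    if discount < 0:
        raise ValueError("Discount cannot be negative")
    if discount > 1:
        discount = discount / 100
    if discount > max_fraction:
        raise ValueError(f"Discount cannot exceed {max_fraction:.0%}")
    return discount
```

Under this rule an input of exactly 1 is read as 100% and rejected, while 1.5 is read as 1.5%, so callers may want to agree on one convention (fraction or percentage) for the report's input.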

Data annotation

import fabric.functions as fn
import logging
import calendar

udf = fn.UserDataFunctions()

@udf.connection(argName="sqlDB",alias="Translytical")
@udf.function()
def AddAnnotation(sqlDB: fn.FabricSqlConnection, date: str, commentdate: str, comment: str, user: str) -> str:
    
    logging.info('Python UDF trigger function processed a request.') 

    # Convert a month abbreviation (for example, "Mar") to YYYY-MM-DD.
    # The year (2023) and day (10) are hard-coded for this demo.
    month_num = list(calendar.month_abbr).index(date)

    formatted_date = f"2023-{month_num:02d}-10"
    data = (commentdate, formatted_date, user, comment)
 
    # Establish a connection to the SQL database
    connection = sqlDB.connect()
    cursor = connection.cursor()    
 
    logging.info("Adding comment ... ")
    # Insert data into the table
    insert_query = "INSERT INTO [dbo].[DataReasoning] ([Date_Created],[Date_Month],[User],[Comment]) VALUES (?, ?, ?, ?);"
    cursor.execute(insert_query, data)
    logging.info("Comment was added")
 
    # Commit the transaction
    connection.commit()
 
    # Close the connection
    cursor.close()
    connection.close()               
    return "Comment was successfully added"

import fabric.functions as fn
import logging

udf = fn.UserDataFunctions()

@udf.connection(argName="sqlDB",alias="Translytical")
@udf.function()
def EditAnnotation(sqlDB: fn.FabricSqlConnection, comment: str, commentdate: str, user: str, newcomment :str ) -> str:

    logging.info('Python UDF trigger function processed a request.') 

    data = (newcomment, commentdate, user, comment)

    # Establish a connection to the SQL database
    connection = sqlDB.connect()
    cursor = connection.cursor()

    # Update the existing comment
    logging.info("Updating comment")
    update_query = " UPDATE [dbo].[DataReasoning] SET [Comment] = ?, [Date_Created] = ?, [User] = ? WHERE [Comment] = ?;"
    cursor.execute(update_query, data)
    logging.info("Comment was updated")

    # Commit the transaction
    connection.commit()

    # Close the connection
    cursor.close()
    connection.close()               
    return "Comment was successfully updated"
   
import fabric.functions as fn
import logging

udf = fn.UserDataFunctions()

@udf.connection(argName="sqlDB",alias="Translytical")
@udf.function()
def DeleteAnnotation(sqlDB: fn.FabricSqlConnection, comment: str ) -> str:
        
    logging.info('Python UDF trigger function processed a request.') 
    
    # Establish a connection to the SQL database
    connection = sqlDB.connect()
    cursor = connection.cursor()
    
    # Delete comment 
    logging.info("Deleting comment ... ")
    delete_query = "DELETE FROM [dbo].[DataReasoning] WHERE [Comment] = ?"
    cursor.execute(delete_query,comment)
    logging.info("Comment was deleted")

    # Commit the transaction
    connection.commit()

    # Close the connection
    cursor.close()
    connection.close()               
    return "Comment was successfully deleted"
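
AddAnnotation converts a month abbreviation with `list(calendar.month_abbr).index(date)`. That call raises a bare `ValueError` for an unknown abbreviation and returns 0 for an empty string, because index 0 of `calendar.month_abbr` is an empty string. A minimal sketch of a stricter conversion follows; `month_abbr_to_date` is a hypothetical helper, the defaults mirror the hard-coded year and day in the sample, and it assumes an English locale for the abbreviations.

```python
import calendar


def month_abbr_to_date(month_abbr: str, year: int = 2023, day: int = 10) -> str:
    """Convert a month abbreviation such as 'Mar' to a 'YYYY-MM-DD' string."""
    abbrs = list(calendar.month_abbr)  # index 0 is an empty string
    key = month_abbr.strip().capitalize()
    # Reject empty input explicitly, since '' would otherwise match index 0
    if not key or key not in abbrs:
        raise ValueError(f"Unknown month abbreviation: {month_abbr!r}")
    return f"{year}-{abbrs.index(key):02d}-{day:02d}"
```

Inside a User data function, the `ValueError` could be replaced with `fn.UserThrownError` so that the report user sees the message.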

Augment data on the fly

import fabric.functions as fn
import logging
import requests

udf = fn.UserDataFunctions()

@udf.connection(argName="sqlDB",alias="Translytical")
@udf.function()
def GetContactInfo(sqlDB: fn.FabricSqlConnection, company: str, status: str, date:str, comment: str) -> str:
    logging.info('Python UDF trigger function processed a request.')    

    # Error handling for no status selected or no company input
    if(status=="" or len(status) < 1):
        raise fn.UserThrownError("The status isn't valid.", {"status:": status})

    if(company=="" or len(company) < 1):
        raise fn.UserThrownError("The company isn't valid.", {"company:": company})             
    
    #Call External API to get contact info within registered partner company
    url = "https://dummy-json.mock.beeceptor.com/users"
    response = requests.get(url)
    
    if response.status_code == 200:
        Jsondata = response.json()
        for i in Jsondata:
            #Check if company contact is registered in the external system
            if i['company'] == company:

                # Combine timestamp with comment
                comment_value = date + " - " + comment
                
                # Get customer contact and put in format based on if updating an record or adding new record
                Existing_Partner_data = (i['name'], i['username'], i['email'], i['address'], i['zip'], i['state'], i['country'], i['phone'], status, comment_value, company)
                New_Partner_data = (i['name'], company, i['username'], i['email'], i['address'], i['zip'], i['state'], i['country'], i['phone'], status, comment_value)

                # Establish a connection to the SQL database
                connection = sqlDB.connect()
                cursor = connection.cursor()

                #Check if there is a status record for the company 
                SQL_read_Command = "SELECT * FROM [dbo].[CompanyStatus] WHERE Company = ?"
                cursor.execute(SQL_read_Command, company)

                if cursor.fetchone():
                    #if there is a status record for the company, update the status and contact info
                    SQL_update_command = "UPDATE [dbo].[CompanyStatus] SET [Name] = ?, [Username] = ?, [Email] = ?, [Address] = ?, [Zip] = ?, [State] = ?, [Country] = ?, [Phone] = ?, [Status] = ?, [Comment] =  ? WHERE [Company] = ?;"
                    cursor.execute(SQL_update_command, Existing_Partner_data)
                    
                else:
                    #if there is not a status record for the company, add new record
                    SQL_insert_command = "INSERT INTO [dbo].[CompanyStatus](Name, Company, Username, Email, Address, Zip, State, Country, Phone, Status, Comment) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
                    cursor.execute(SQL_insert_command, New_Partner_data)
            
                # Commit the transaction
                connection.commit()

                # Close the connection
                cursor.close()
                connection.close()

                return "Collab with " + company + " is now " + status.lower() + "."

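        # If this point is reached, the company contact is NOT registered in the external system
        raise fn.UserThrownError("The company is not a registered partner.", {"company:": company})

    # The external API did not return a successful response
    raise fn.UserThrownError("Could not retrieve contact information.", {"status code": response.status_code})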
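
The lookup step in GetContactInfo (scan the API response for a record whose company matches) can be isolated so that it is testable without a network call. This is a sketch under assumptions: `find_contact` is a hypothetical helper, and it matches case-insensitively and ignores surrounding whitespace, whereas the function above uses an exact `==` comparison.

```python
from typing import Optional


def find_contact(users: list[dict], company: str) -> Optional[dict]:
    """Return the first user record whose 'company' field matches, else None."""
    wanted = company.strip().casefold()
    for user in users:
        # .get() tolerates records that have no 'company' key
        if str(user.get("company", "")).strip().casefold() == wanted:
            return user
    return None
```

For example, `find_contact(response.json(), company)` would return the matching record or `None`, and the caller can raise `fn.UserThrownError` in the `None` case.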
   

Dynamic notifications

import fabric.functions as fn
import logging
import requests
from flask import Flask
from flask_mail import Mail, Message

udf = fn.UserDataFunctions()
app = Flask(__name__)

# Mail configuration (SendGrid SMTP relay)
app.config['MAIL_SERVER'] = 'smtp.sendgrid.net'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USERNAME'] = 'apikey'
app.config['MAIL_PASSWORD'] = '<your API key here>'
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_DEFAULT_SENDER'] = '<your email here>'

# Instantiate Mail after the configuration is set so that the settings are picked up
mail = Mail(app)

html_email = """ <Your HTML email here>
"""

@udf.connection(argName="sqlDB",alias="Translytical")
@udf.function()
def get_data_write_to_sql_db_send_email(sqlDB: fn.FabricSqlConnection, company: str, status: str, date:str, comment: str) -> str:
    logging.info('Python UDF trigger function processed a request.')    

    # Error handling for no status selected or no company input
    if(status=="" or len(status) < 1):
        raise fn.UserThrownError("The status isn't valid.", {"status:": status})

    if(company=="" or len(company) < 1):
        raise fn.UserThrownError("The company isn't valid.", {"company:": company})          
    
    #Call External API to get Company contact information
    url = "https://dummy-json.mock.beeceptor.com/users"
    response = requests.get(url)

    if response.status_code == 200:
        Jsondata = response.json()

        for i in Jsondata:
            #Check if company contact is registered in the external system
            if i['company'] == company:

                # Combine timestamp with comment
                comment_value = date + " - " + comment

                # Get customer contact and put in format based on if updating an record or adding new record
                Existing_Partner_data = (i['name'], i['username'], i['email'], i['address'], i['zip'], i['state'], i['country'], i['phone'], status, comment_value, company)
                New_Partner_data = (i['name'], company, i['username'], i['email'], i['address'], i['zip'], i['state'], i['country'], i['phone'], status, comment_value)

                # Establish a connection to the SQL database
                connection = sqlDB.connect()
                cursor = connection.cursor()

                #Check if there is a status record for the company 
                SQL_read_Command = "SELECT * FROM [dbo].[CompanyStatus] WHERE Company = ?"
                cursor.execute(SQL_read_Command, company)
                if cursor.fetchone():
                    #if there is a status record for the company, update the status and contact info
                    SQL_update_command = "UPDATE [dbo].[CompanyStatus] SET [Name] = ?, [Username] = ?, [Email] = ?, [Address] = ?, [Zip] = ?, [State] = ?, [Country] = ?, [Phone] = ?, [Status] = ?, [Comment] =  ? WHERE [Company] = ?;"
                    cursor.execute(SQL_update_command, Existing_Partner_data)
                    
                else:
                    #if there is not a status record for the company, add new record
                    SQL_insert_command = "INSERT INTO [dbo].[CompanyStatus](Name, Company, Username, Email, Address, Zip, State, Country, Phone, Status, Comment) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
                    cursor.execute(SQL_insert_command, New_Partner_data)
            
                # Commit the transaction
                connection.commit()

                # Close the connection
                cursor.close()
                connection.close()
                
                #If status is set to accepted, send an email to company contact
                #Note: this is hard-coded to a temp working email for demo purposes
                if(status == "Accepted"):
                  with app.app_context():
                      msg = Message (company + " x Contoso collab time!",
                                      recipients = ['<Email here>'] #I put a static temp email here but you can make this dynamic
                                  )
                      msg.html = html_email
                      mail.send(msg)
                      return "Accepted collab offer with " + company + ", and " + i['name'] + " will be notified shortly."

                return "Collab with " + company + " is now " + status.lower() + "."

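        # If this point is reached, the company contact is NOT registered in the external system
        raise fn.UserThrownError("The company is not a registered partner.", {"company:": company})

    # The external API did not return a successful response
    raise fn.UserThrownError("Could not retrieve contact information.", {"status code": response.status_code})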

Approval workflows

import fabric.functions as fn
import logging
import requests

# Initialize the UserDataFunctions
udf = fn.UserDataFunctions()

#This function is designed for sending an approval request message to a specific Microsoft Teams channel using the Microsoft Graph API
@udf.function()
def RequestDiscount(user: str, comment: str, discount: float, revenue: float, URLfilter: str) -> str :

    logging.info('Python UDF trigger function processed a request.')
    try:
        
        # Define the endpoint URL for Microsoft Graph API to send a message to Teams
        team_id = "<team id>"  # Replace with your actual team ID
        channel_id = "<channel id>"  # Replace with your actual channel ID
        url = f"https://graph.microsoft.com/v1.0/teams/{team_id}/channels/{channel_id}/messages"
         

        # Define the headers including the authorization token
        headers = {
            "Authorization": "<auth token here>",  
            "Content-Type": "application/json"
        }
          

        Discountpercent = str(discount) + "%"
        Discountdollars = '${:,.2f}'.format(((discount/100) * revenue))
       
        ApprovalURL = "<Report URL here>" + URLfilter + "%20and%20DiscountValues%2FValues%20eq%20" + str(discount) #add your report URL here
        
        # Define the message body with HTML links
        #I only configured an approval flow for demo purposes. You will need to add reject action as well or remove this option.
        message_body = f"""
            <div>
                <strong>Discount Request from {user}</strong><br>
                <p>Discount percentage: {Discountpercent}</p> 
                <p>Discount total: {Discountdollars}</p>
                <p>Comment: {comment}</p>
                <p>
                    <a href="{ApprovalURL}" target="_blank">Approve</a>  |   
                    <a href="https://example.com/reject" target="_blank">Reject</a> 
                </p>
            </div>
        """

        # Define the payload
        payload = {
            "body": {
                "content": message_body,
                "contentType": "html"
            }
        }
        
        # Make the POST request to the Microsoft Graph API
        response = requests.post(url, headers=headers, json=payload)
      
        # Check if the request was successful
        if response.status_code == 201:
            return "Approval request successfully posted in Teams."
        else:
            raise fn.UserThrownError("Failed to post approval request.", {"Status code": response.status_code, "Response": response.text})
            
    except Exception as e:
        raise fn.UserThrownError("We ran into an issue.", {"Error:": str(e)})

import fabric.functions as fn
import logging
import requests

udf = fn.UserDataFunctions()

@udf.connection(argName="sqlDB", alias="Translytical")
@udf.function()
def ApproveDiscount(sqlDB: fn.FabricSqlConnection, opportunities: str, discountinput: float, comment: str)->str :
     
    logging.info('Python UDF trigger function processed a request.')
    
    # The parameter is named discountinput; validate and normalize it before connecting
    discount = discountinput
    if discount < 0:
        raise fn.UserThrownError("Discount cannot be negative")
    if discount > 1:
        discount = discount / 100
    if discount > 0.50:
        raise fn.UserThrownError("Discount cannot exceed 50%")

    connection = sqlDB.connect()
    cursor = connection.cursor()

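    # SQL update command. The discount is passed as a parameter. Note that
    # opportunities is still concatenated into the statement, so it must come from a trusted source.
    SQL_update_command = "UPDATE [dbo].[Opportunity] SET [Discount] = ? WHERE Opportunity_Number IN (" + opportunities + ")"
    cursor.execute(SQL_update_command, discount)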
                           
    # Commit the transaction
    connection.commit()

    # Close the connection
    cursor.close()
    connection.close()
    
    try:
        # Define the endpoint URL for Microsoft Graph API to send a message to Teams
        team_id = "<team id>"  # Replace with your actual team ID
        channel_id = "<channel id>"  # Replace with your actual channel ID
        url = f"https://graph.microsoft.com/v1.0/teams/{team_id}/channels/{channel_id}/messages"
         

        # Define the headers including the authorization token
        headers = {
            "Authorization": "<auth token here>",  
            "Content-Type": "application/json"
        }
                  
  
        # Define the message body with comment
        message_body = f"""
            <div>
                <strong>Approved Discount Request</strong><br>
                <p>Comment: {comment}</p>
            </div>
        """

        # Define the payload
        payload = {
            "body": {
                "content": message_body,
                "contentType": "html"
            }
        }
        
        # Make the POST request to the Microsoft Graph API
        response = requests.post(url, headers=headers, json=payload)
      
        # Check if the request was successful
        if response.status_code == 201:
            return "Discount has been approved and applied. Team has been notified."
        else:
            raise fn.UserThrownError("Failed to post approval request.", {"Status code": response.status_code, "Response": response.text})
            
    except Exception as e:
        raise fn.UserThrownError("We ran into an issue.", {"Error:": str(e)})
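
ApproveDiscount builds its `IN (...)` list by inserting the `opportunities` string directly into the SQL text. One common alternative is to generate one `?` placeholder per value and pass the values as parameters. The sketch below is an assumption-laden illustration, not the author's method: `build_discount_update` is a hypothetical helper, and it assumes `opportunities` arrives as a comma-separated list of unquoted opportunity numbers.

```python
def build_discount_update(opportunities: str, discount: float) -> tuple[str, list]:
    """Build a parameterized UPDATE for a comma-separated list of opportunity numbers."""
    # Split the input and drop empty entries such as a trailing comma
    numbers = [part.strip() for part in opportunities.split(",") if part.strip()]
    if not numbers:
        raise ValueError("No opportunity numbers were supplied")
    # One '?' placeholder per opportunity number
    placeholders = ", ".join("?" for _ in numbers)
    query = (
        "UPDATE [dbo].[Opportunity] SET [Discount] = ? "
        f"WHERE Opportunity_Number IN ({placeholders})"
    )
    return query, [discount, *numbers]
```

The result could then be passed to the cursor as `cursor.execute(query, params)`. If the report sends quoted values, the quotes would need to be stripped before the values are used as parameters.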

Custom AI integration

import fabric.functions as fn
import logging
import openai

udf = fn.UserDataFunctions()
@udf.connection(argName="sqlDB",alias="Translytical")
@udf.function()
def AISuggestion(sqlDB: fn.FabricSqlConnection, company: str) -> str :
    logging.info('Python UDF trigger function processed a request.')

    # Establish a connection to the SQL database
    connection = sqlDB.connect()
    cursor = connection.cursor()

    #Get offer status for the company 
    SQL_read_Command = "SELECT * FROM [dbo].[CompanyStatus] WHERE Company = ?"
    cursor.execute(SQL_read_Command, company)
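    record = cursor.fetchone()
    if record is None:
        # No status record exists for this company, so there is nothing to base a suggestion on
        cursor.close()
        connection.close()
        raise fn.UserThrownError("No status record was found for the company.", {"company:": company})
    # Assumes the table's column order puts Name first (index 0) and Comment eleventh (index 10)
    customer_name = record[0]
    last_comment = record[10]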
    
    #Submit prompt to Azure OpenAI and get an AI suggestion
    prompt = "Respond with a short plan that is under 240 characters: I work at Contoso Outdoors, and we collaborate with influencers by offering them offers for custom designed bikes. Pretend we want to collab with the following influencer: " + customer_name +" from company: " + company + ". Here's a comment about their latest feedback " + last_comment + "."
    deployment = "gpt-4o"
    openai_client = openai.AzureOpenAI(
        api_key='<API Key here>',
        api_version="2025-01-01-preview",
        # azure_endpoint is the resource's base URL; the client adds the deployment path itself
        azure_endpoint="https://sico-oai-eus2.openai.azure.com/"
    )
    response = openai_client.chat.completions.create(
        model=deployment,
        messages=[
            {"role": "user", "content": prompt}
        ]
    )
    result = response.choices[0].message.content

    #Check if there is an existing AI suggestion for the company 
    SQL_read_Command = "SELECT * FROM [dbo].[AISuggestions] WHERE Company = ?"
    cursor.execute(SQL_read_Command, company)

    if cursor.fetchone():
        #if there is an existing AI suggestion record for the company, update just the AI suggestion
        SQL_update_command = "UPDATE [dbo].[AISuggestions] SET [AI_suggestion] = ? WHERE [Company] = ?;"
        Existing_Suggestion = (result, company)
        cursor.execute(SQL_update_command, Existing_Suggestion)
                    
    else:
        #if there is NOT an existing AI suggestion record for the company, add a new record
        SQL_insert_command = "INSERT INTO [dbo].[AISuggestions](Name, Company, AI_suggestion) VALUES(?, ?, ?);"
        New_Suggestion = (customer_name, company, result)
        cursor.execute(SQL_insert_command, New_Suggestion)
            
    # Commit the transaction
    connection.commit()
    
    # Close the connection
    cursor.close()
    connection.close()

    return "Generated Azure OpenAI suggestion for collaboration ideas with " + customer_name + " from " + company + "."

@richbenmintz

How do you display the response from the function within the report in the Custom AI example?

@Sujata994
Author

In my example, since the response is short, I can write it into an AISuggestions table in my SQL database. My semantic model has a DirectQuery connection to this database, so I can see the result within my report.

@richbenmintz

Thank you. It would be amazing if there were a visual that not only allowed you to kick off the Fabric Data item, but also displayed the results.

@tanmoycuat

Thanks for these UDFs.

@OlapOffice

Thanks for the great examples.

@elgagnon340

Hi! Came here from the translytical task flow tutorial on Learn--do these examples here also use the AdventureWorks sample data in a Fabric SQL database?

@Sujata994
Author

Sujata994 commented Jun 4, 2025

I'm using different sample data but these templates can be easily repurposed for another sample data or real data use case

@algo-se

algo-se commented Jun 4, 2025

Amazing! It would be even better if we had "table buttons" so that you could create/edit/delete records inside the record itself!

@Senneset

Senneset commented Jun 5, 2025

What's the best way to use secrets in UDFs? Is there a way to read a secret from Azure Key Vault, for instance by using a Service Principal? For context, I'm trying to create a UDF that gets a bearer token in order to use the Admin API to perform a refresh of the dataset.

@elgagnon340

I'm using different sample data but these templates can be easily repurposed for another sample data or real data use case

Thank you for the response! Is the sample data you used available anywhere for demo purposes?

@Mukeshkumar3010

Mukeshkumar3010 commented Jun 15, 2025

Could you share the PBIX file if possible?

@Mukeshkumar3010

Mukeshkumar3010 commented Jun 15, 2025

We can't use it with PBIP (deployment through CI/CD in Azure DevOps) because it gives an error. Do you have a timeline for when the schema will be updated?

@mraapshockwavemedical

mraapshockwavemedical commented Jun 20, 2025

Whats the best way to use secrets in UDFs? Is there a way to read a secret from Azure Key Vault for instance by using a Service Principal?

+1
