Check out the below example User data functions for the following Translytical task flow demo scenarios:
- Modify data record(s)
- Update Discount: Set a new discount value
- Data annotation
- Add Annotation: Add a new datapoint annotation
- Edit Annotation: Update an exisiting annotation
- Delete Annotation: Delete a specific annotation
- Augment data on the fly
- Get Contact Info: Get latest contact info data for a registered partner company
- Dynamic notifications
- Dynamic Marketing Email: For accepted offers send a dynamic marketing email to company's point of contact
- Approval workflows
- Request Discount: Request for an admin to approve a discount
- Approve Discount: Admin can apply the discount
- Custom AI integration
- AI Suggestions: AI suggesitions for collab proposals
Fabric User data functions provide built-in data source connection management for the following sources:
- Fabric SQL Database for read/write operations
- Fabric warehouses for read/write operations
- Fabric lakehouses for read/write operations for Lakehouse files and for read-only operations for the SQL Endpoint
- Fabric mirrored databases for read-only operations
The writeback examples you see here are leveraging a SQL Database in Fabric, which is ideal for heavy read/write operations. The report for these demos uses a DirectQuery connection to this Fabric SQL database so that data is updated immediately in the report after the function executes.
import fabric.functions as fn
import logging
udf = fn.UserDataFunctions()
@udf.connection(argName="sqlDB", alias="Translytical")
@udf.function()
def UpdateDiscount(sqlDB: fn.FabricSqlConnection, quantity: int, risklevel: str, dealexpiration: int, discount: float)->str :
logging.info('Python UDF trigger function processed a request.')
sqlConnection = sqlDB.connect()
cursor = sqlConnection.cursor()
if (discount < 0):
raise fn.UserThrownError("Discount cannot be negative")
if (discount > 1):
discount = discount / 100
if (discount > 0.50):
raise fn.UserThrownError("Discount cannot exceed 50%")
risk = get_risk(risklevel)
if risk == 0:
return f"This opportunity cannot be changed since it is either in Won or Lost state."
days_to_close_sql = get_days_to_close_sql_string(dealexpiration)
query = f"UPDATE [dbo].[Opportunity] SET [Discount] = ? WHERE Rating = ? {days_to_close_sql} AND Quantity = ?"
params = (discount, risk, quantity)
# Convert the query to a properly formatted T-SQL statement
tsql_query = query.replace("?", "{}").format(*[f"'{p}'" if isinstance(p, str) else p for p in params])
# Print the actual T-SQL statement
logging.info(f"Executing SQL Query: {tsql_query}\n")
cursor.execute(query, params)
sqlConnection.commit()
sqlConnection.close()
return f"Opportunities with {risklevel} are updated."
def get_risk(risklevel:str)->int:
match risklevel:
case "High risk":
return 1
case "Medium risk":
return 2
case "Low risk":
return 3
case _:
return 0
def get_days_to_close_sql_string(dealexpiration: int)->str:
match dealexpiration:
case 60:
return "AND Days_To_Close <= 69"
case 300:
return "AND Days_To_Close >= 300"
case _:
return f"AND Days_To_Close >= {dealexpiration} AND Days_To_Close <= {dealexpiration + 9}"
import fabric.functions as fn
import logging
import calendar
udf = fn.UserDataFunctions()
@udf.connection(argName="sqlDB",alias="Translytical")
@udf.function()
def AddAnnotation(sqlDB: fn.FabricSqlConnection, date: str, commentdate: str, comment: str, user: str) -> str:
logging.info('Python UDF trigger function processed a request.')
# month abbr to YYYY-MM-DD
month_num = list(calendar.month_abbr).index(date)
formatted_date = f"{2023}-{month_num:02d}-10"
data = (commentdate, formatted_date, user, comment)
# Establish a connection to the SQL database
connection = sqlDB.connect()
cursor = connection.cursor()
logging.info("Adding comment ... ")
# Insert data into the table
insert_query = "INSERT INTO [dbo].[DataReasoning] ([Date_Created],[Date_Month],[User],[Comment]) VALUES (?, ?, ?, ?);"
cursor.execute(insert_query, data)
logging.info("Comment was added")
# Commit the transaction
connection.commit()
# Close the connection
cursor.close()
connection.close()
return "Comment was successfully added"
import fabric.functions as fn
import logging
import calendar
udf = fn.UserDataFunctions()
@udf.connection(argName="sqlDB",alias="Translytical")
@udf.function()
def EditAnnotation(sqlDB: fn.FabricSqlConnection, comment: str, commentdate: str, user: str, newcomment :str ) -> str:
logging.info('Python UDF trigger function processed a request.')
data = (newcomment, commentdate, user, comment)
# Establish a connection to the SQL database
connection = sqlDB.connect()
cursor = connection.cursor()
# Insert data into the table
logging.info("Updating comment")
update_query = " UPDATE [dbo].[DataReasoning] SET [Comment] = ?, [Date_Created] = ?, [User] = ? WHERE [Comment] = ?;"
cursor.execute(update_query, data)
logging.info("Comment was updated")
# Commit the transaction
connection.commit()
# Close the connection
cursor.close()
connection.close()
return "Comment was successfully updated"
import fabric.functions as fn
import logging
udf = fn.UserDataFunctions()
@udf.connection(argName="sqlDB",alias="Translytical")
@udf.function()
def DeleteAnnotation(sqlDB: fn.FabricSqlConnection, comment: str ) -> str:
logging.info('Python UDF trigger function processed a request.')
# Establish a connection to the SQL database
connection = sqlDB.connect()
cursor = connection.cursor()
# Delete comment
logging.info("Deleting comment ... ")
delete_query = "DELETE FROM [dbo].[DataReasoning] WHERE [Comment] = ?"
cursor.execute(delete_query,comment)
logging.info("Comment was deleted")
# Commit the transaction
connection.commit()
# Close the connection
cursor.close()
connection.close()
return "Comment was successfully deleted"
import fabric.functions as fn
import logging
import requests
udf = fn.UserDataFunctions()
@udf.connection(argName="sqlDB",alias="Translytical")
@udf.function()
def GetContactInfo(sqlDB: fn.FabricSqlConnection, company: str, status: str, date:str, comment: str) -> str:
logging.info('Python UDF trigger function processed a request.')
# Error handling for no status selected or no company input
if(status=="" or len(status) < 1):
raise fn.UserThrownError("The status isn't valid.", {"status:": status})
if(company=="" or len(company) < 1):
raise fn.UserThrownError("The company isn't valid.", {"company:": company})
#Call External API to get contact info within registered partner company
url = "https://dummy-json.mock.beeceptor.com/users"
response = requests.get(url)
if response.status_code == 200:
Jsondata = response.json()
for i in Jsondata:
#Check if company contact is registered in the external system
if i['company'] == company:
# Combine timestamp with comment
comment_value = date + " - " + comment
# Get customer contact and put in format based on if updating an record or adding new record
Existing_Partner_data = (i['name'], i['username'], i['email'], i['address'], i['zip'], i['state'], i['country'], i['phone'], status, comment_value, company)
New_Partner_data = (i['name'], company, i['username'], i['email'], i['address'], i['zip'], i['state'], i['country'], i['phone'], status, comment_value)
# Establish a connection to the SQL database
connection = sqlDB.connect()
cursor = connection.cursor()
#Check if there is a status record for the company
SQL_read_Command = "SELECT * FROM [dbo].[CompanyStatus] WHERE Company = ?"
cursor.execute(SQL_read_Command, company)
if cursor.fetchone():
#if there is a status record for the company, update the status and contact info
SQL_update_command = "UPDATE [dbo].[CompanyStatus] SET [Name] = ?, [Username] = ?, [Email] = ?, [Address] = ?, [Zip] = ?, [State] = ?, [Country] = ?, [Phone] = ?, [Status] = ?, [Comment] = ? WHERE [Company] = ?;"
cursor.execute(SQL_update_command, Existing_Partner_data)
else:
#if there is not a status record for the company, add new record
SQL_insert_command = "INSERT INTO [dbo].[CompanyStatus](Name, Company, Username, Email, Address, Zip, State, Country, Phone, Status, Comment) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
cursor.execute(SQL_insert_command, New_Partner_data)
# Commit the transaction
connection.commit()
# Close the connection
cursor.close()
connection.close()
return "Collab with " + company + " is now " + status.lower() + "."
#if reached here in the code, then the company contact is NOT registered in the external system, throw error
raise fn.UserThrownError("The company is not a registered partner.", {"company:": company})
import fabric.functions as fn
import logging
import requests
from flask import Flask
from flask_mail import Mail, Message
udf = fn.UserDataFunctions()
app = Flask(__name__)
mail = Mail(app) # instantiate the mail class
# configuration of mail
app.config['MAIL_SERVER']='smtp.sendgrid.net'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USERNAME'] = 'apikey'
app.config['MAIL_PASSWORD'] = '<your API key here>'
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_DEFAULT_SENDER'] = '<your email here>'
mail = Mail(app)
html_email = """ <Your HTML email here>
"""
@udf.connection(argName="sqlDB",alias="Translytical")
@udf.function()
def get_data_write_to_sql_db_send_email(sqlDB: fn.FabricSqlConnection, company: str, status: str, date:str, comment: str) -> str:
logging.info('Python UDF trigger function processed a request.')
# Error handling for no status selected or no company input
if(status=="" or len(status) < 1):
raise fn.UserThrownError("The status isn't valid.", {"status:": status})
if(company=="" or len(company) < 1):
raise fn.UserThrownError("The company isn't valid.", {"company:": company})
#Call External API to get Company contact information
url = "https://dummy-json.mock.beeceptor.com/users"
response = requests.get(url)
if response.status_code == 200:
Jsondata = response.json()
for i in Jsondata:
#Check if company contact is registered in the external system
if i['company'] == company:
# Combine timestamp with comment
comment_value = date + " - " + comment
# Get customer contact and put in format based on if updating an record or adding new record
Existing_Partner_data = (i['name'], i['username'], i['email'], i['address'], i['zip'], i['state'], i['country'], i['phone'], status, comment_value, company)
New_Partner_data = (i['name'], company, i['username'], i['email'], i['address'], i['zip'], i['state'], i['country'], i['phone'], status, comment_value)
# Establish a connection to the SQL database
connection = sqlDB.connect()
cursor = connection.cursor()
#Check if there is a status record for the company
SQL_read_Command = "SELECT * FROM [dbo].[CompanyStatus] WHERE Company = ?"
cursor.execute(SQL_read_Command, company)
if cursor.fetchone():
#if there is a status record for the company, update the status and contact info
SQL_update_command = "UPDATE [dbo].[CompanyStatus] SET [Name] = ?, [Username] = ?, [Email] = ?, [Address] = ?, [Zip] = ?, [State] = ?, [Country] = ?, [Phone] = ?, [Status] = ?, [Comment] = ? WHERE [Company] = ?;"
cursor.execute(SQL_update_command, Existing_Partner_data)
else:
#if there is not a status record for the company, add new record
SQL_insert_command = "INSERT INTO [dbo].[CompanyStatus](Name, Company, Username, Email, Address, Zip, State, Country, Phone, Status, Comment) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
cursor.execute(SQL_insert_command, New_Partner_data)
# Commit the transaction
connection.commit()
# Close the connection
cursor.close()
connection.close()
#If status is set to accepted, send an email to company contact
#Note: this is hard-coded to a temp working email for demo purposes
if(status == "Accepted"):
with app.app_context():
msg = Message (company + " x Contoso collab time!",
recipients = ['<Email here>'] #I put a static temp email here but you can make this dynamic
)
msg.html = html_email
mail.send(msg)
return "Accepted collab offer with " + company + ", and " + i['name'] + " will be notified shortly."
return "Collab with " + company + " is now " + status.lower() + "."
#if reached here in the code, then the company contact is NOT registered in the external system, throw error
raise fn.UserThrownError("The company is not a registered partner.", {"company:": company})
import fabric.functions as fn
import logging
import requests
# Initialize the UserDataFunctions
udf = fn.UserDataFunctions()
#This function is designed for sending an approval request message to a specific Microsoft Teams channel using the Microsoft Graph API
@udf.function()
def RequestDiscount(user: str, comment: str, discount: float, revenue: float, URLfilter: str) -> str :
logging.info('Python UDF trigger function processed a request.')
try:
# Define the endpoint URL for Microsoft Graph API to send a message to Teams
team_id = "<team id>" # Replace with your actual team ID
channel_id = "<channel id>" # Replace with your actual channel ID
url = f"https://graph.microsoft.com/v1.0/teams/{team_id}/channels/{channel_id}/messages"
# Define the headers including the authorization token
headers = {
"Authorization": "<auth token here>",
"Content-Type": "application/json"
}
Discountpercent = str(discount) + "%"
Discountdollars = '${:,.2f}'.format(((discount/100) * revenue))
ApprovalURL = "<Report URL here>" + URLfilter + "%20and%20DiscountValues%2FValues%20eq%20" + str(discount) #add your report URL here
# Define the message body with HTML links
#I only configured an approval flow for demo purposes. You will need to add reject action as well or remove this option.
message_body = f"""
<div>
<strong>Discount Request from {user}</strong><br>
<p>Discount percentage: {Discountpercent}</p>
<p>Discount total: {Discountdollars}</p>
<p>Comment: {comment}</p>
<p>
<a href={ApprovalURL} target="_blank">Approve</a> |
<a href="https://example.com/reject" target="_blank">Reject</a>
</p>
</div>
"""
# Define the payload
payload = {
"body": {
"content": message_body,
"contentType": "html"
}
}
# Make the POST request to the Microsoft Graph API
response = requests.post(url, headers=headers, json=payload)
# Check if the request was successful
if response.status_code == 201:
return "Approval request successfully posted in Teams."
else:
raise fn.UserThrownError("Failed to post approval request.", {"Status code": response.status_code}, {"Response": response.text})
except Exception as e:
raise fn.UserThrownError("We ran into an issue.", {"Error:": str(e)})
import fabric.functions as fn
import logging
udf = fn.UserDataFunctions()
@udf.connection(argName="sqlDB", alias="Translytical")
@udf.function()
def ApproveDiscount(sqlDB: fn.FabricSqlConnection, opportunities: str, discountinput: float, comment: str)->str :
logging.info('Python UDF trigger function processed a request.')
connection = sqlDB.connect()
cursor = connection.cursor()
if (discount < 0):
raise fn.UserThrownError("Discount cannot be negative")
if (discount > 1):
discount = discount / 100
if (discount > 0.50):
raise fn.UserThrownError("Discount cannot exceed 50%")
# SQL update command
SQL_update_command = "UPDATE [dbo].[Opportunity] SET [Discount] = " + str(discount) + " WHERE Opportunity_Number IN (" + opportunities + ")"
cursor.execute(SQL_update_command)
# Commit the transaction
connection.commit()
# Close the connection
cursor.close()
connection.close()
try:
# Define the endpoint URL for Microsoft Graph API to send a message to Teams
team_id = "<team id>" # Replace with your actual team ID
channel_id = "<channel id>" # Replace with your actual channel ID
url = f"https://graph.microsoft.com/v1.0/teams/{team_id}/channels/{channel_id}/messages"
# Define the headers including the authorization token
headers = {
"Authorization": "<auth token here>",
"Content-Type": "application/json"
}
# Define the message body with comment
message_body = f"""
<div>
<strong>Approved Discount Request</strong><br>
<p>Comment: {comment}</p>
</div>
"""
# Define the payload
payload = {
"body": {
"content": message_body,
"contentType": "html"
}
}
# Make the POST request to the Microsoft Graph API
response = requests.post(url, headers=headers, json=payload)
# Check if the request was successful
if response.status_code == 201:
return "Discount has been approved and applied. Team has been notified."
else:
raise fn.UserThrownError("Failed to post approval request.", {"Status code": response.status_code}, {"Response": response.text})
except Exception as e:
raise fn.UserThrownError("We ran into an issue.", {"Error:": str(e)})
import fabric.functions as fn
import logging
import openai
udf = fn.UserDataFunctions()
@udf.connection(argName="sqlDB",alias="Translytical")
@udf.function()
def AISuggestion(sqlDB: fn.FabricSqlConnection, company: str) -> str :
logging.info('Python UDF trigger function processed a request.')
# Establish a connection to the SQL database
connection = sqlDB.connect()
cursor = connection.cursor()
#Get offer status for the company
SQL_read_Command = "SELECT * FROM [dbo].[CompanyStatus] WHERE Company = ?"
cursor.execute(SQL_read_Command, company)
record = cursor.fetchone()
customer_name = record[0]
last_comment = record[10]
#Submit prompt to Azure OpenAI and get an AI suggestion
prompt = "Respond with a short plan that is under 240 characters: I work at Contoso Outdoors, and we collaborate with influencers by offering them offers for custom designed bikes. Pretend we want to collab with the following influencer: " + customer_name +" from company: " + company + ". Here's a comment about their latest feedback " + last_comment + "."
deployment = "gpt-4o"
openai_client = openai.AzureOpenAI(
api_key='<API Key here>',
api_version = "2025-01-01-preview",
azure_endpoint = "https://sico-oai-eus2.openai.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2025-01-01-preview"
)
response = openai_client.chat.completions.create(
model=deployment,
messages=[
{"role": "user", "content": prompt}
]
)
result = response.choices[0].message.content
#Check if there is an exisiting AI suggestion for the company
SQL_read_Command = "SELECT * FROM [dbo].[AISuggestions] WHERE Company = ?"
cursor.execute(SQL_read_Command, company)
if cursor.fetchone():
#if there is an existing AI suggestion record for the company, update just the AI suggestion
SQL_update_command = "UPDATE [dbo].[AISuggestions] SET [AI_suggestion] = ? WHERE [Company] = ?;"
Existing_Suggestion = (result, company)
cursor.execute(SQL_update_command, Existing_Suggestion)
else:
#if there is NOT an existing AI suggestion record for the company, add a new record
SQL_insert_command = "INSERT INTO [dbo].[AISuggestions](Name, Company, AI_suggestion) VALUES(?, ?, ?);"
New_Suggestion = (customer_name, company, result)
cursor.execute(SQL_insert_command, New_Suggestion)
# Commit the transaction
connection.commit()
# Close the connection
cursor.close()
connection.close()
return f"Generated Azure OpenAI suggestion for collaboration ideas with " + customer_name + " from " + company + "."
How do you display the reponse from the function within the report in the Custom AI example?