Skip to content

Instantly share code, notes, and snippets.

@ojmccall
Last active September 15, 2022 12:22
Show Gist options
  • Save ojmccall/f994be414d67b10c1878e62eb8004045 to your computer and use it in GitHub Desktop.
Save ojmccall/f994be414d67b10c1878e62eb8004045 to your computer and use it in GitHub Desktop.
Slack DE builder.py
def sfmc_api(event,context):
import requests
import json
from google.cloud import storage
from time import gmtime, strftime
import pandas as pd
import xml.etree.ElementTree as ET
import ast
import sys
#I use this to store my token value and API credentials via Secrets Manager
from os import environ
client_id = ""
client_secret = ""
baseURL = ""
slackURL = ""
triggerfilename = format(event["name"])
bucket = format(event["bucket"])
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket)
blob = bucket.blob(triggerfilename)
blob =blob.download_as_string()
body = ast.literal_eval(blob.decode("utf-8"))
print(body)
token = environ["SlackBot_Token"]
token = token.replace("\n","")
print(token)
if "text" in str(body):
slackData = body["event"]["user"]
text = body["event"]["text"]
file_id = body["event"]["files"][0]["id"]
url = f"https://slack.com/api/files.info?file={file_id}"
r = requests.get(url,headers={"Authorization": "Bearer %s" % token})
r.raise_for_status
response = r.json()
print(r.content)
assert response["ok"]
filename = response["file"]["name"]
file_url = response["file"]["url_private"]
file_count = 1
if "Data Extension" not in text:
sys.exit()
if "Data Extension" in text and file_count > 0:
messageBody = {
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"SFMC Data Extension Creation Request"
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"\n*Thanks for your submission to create a Data Extension* <@{slackData}>"
}
},
{
"type": "section",
"text":
{
"type": "mrkdwn",
"text": "\n I'll get back to you in a minute or 2 after I've processed your file :eyes:"
}
},
{
"type": "divider"
}
]
}
url =
header = {"Content-Type":"application/json"}
post = requests.post(url, json = messageBody ,headers = header)
body = post.content
status = post.status_code
#download slack file to temp memory
r = requests.get(file_url, headers={"Authorization": "Bearer %s" % token})
file_data = r.content # get binary content
fullpath= "/tmp/file.csv"
# save file to disk
with open(fullpath , "w+b") as f:
f.write(bytearray(file_data))
#Find DE data in filename
var = filename[5:]
var = var.replace(".csv","")
split =var.split("_")
DEName = split[0]
if len(split)>1:
FolderID = split[1]
else:
FolderID = ""
if len(FolderID) >0:
Folder = """<CategoryID>""" + FolderID + """</CategoryID> """
else:
Folder = ""
#get csv values to create DE
df = pd.read_csv(fullpath,dtype="unicode")
df = df.fillna("")
df_dict = df.to_dict(orient="records")
print("df_array: "+str(df_dict))
print("length: "+str(len(df_dict)))
fields = []
for column in df:
fieldname = column
fieldtype = df.at[0,fieldname]
fieldlength = str(df.at[1,fieldname])
isprimarykey = str(df.at[2,fieldname])
isrequired = str(df.at[3,fieldname])
fieldscale = str(df.at[4,fieldname])
defaultvalue = str(df.at[5,fieldname])
SOAPField = "<Field>"
SOAPField +="<Name>"+fieldname+"</Name>"
SOAPField += "<FieldType>"+fieldtype+"</FieldType>"
if fieldtype == "Text" or fieldtype == "Decimal":
SOAPField += "<MaxLength>"+fieldlength+"</MaxLength>"
if len(isrequired) >0:
SOAPField +="<IsRequired>"+isrequired+"</IsRequired>"
if len(isprimarykey) >0:
SOAPField += "<IsPrimaryKey>"+isprimarykey+"</IsPrimaryKey>"
if len(fieldscale) >0:
SOAPField += "<Scale>"+fieldscale+"</Scale>"
if len(defaultvalue) >0:
SOAPField += "<DefaultValue>"+defaultvalue+"</DefaultValue>"
SOAPField += "</Field>"
fields.append(SOAPField)
Fields = " ".join([str(elem) for elem in fields])
print("string fields: "+Fields)
#remove DE config rows from data
datarows = df_dict[6:len(df_dict)]
#Set Values for DE structure
SOAPEndpoint = "https://"+baseURL+".soap.marketingcloudapis.com/Service.asmx"
#get token
url = "https://"+baseURL+".auth.marketingcloudapis.com/v2/Token"
data = {"grant_type":"client_credentials",
"client_id":client_id,
"client_secret":client_secret
}
r = requests.post(url, data=data)
print("posted")
body = json.loads(r.content)
token = body["access_token"]
TokenStatus = r.status_code
if TokenStatus != 200:
Failure = 1
StatusMessage = "token request invalid"
else:
Failure = 0
#compile XML
XMLString = ("""<?xml version="1.0" encoding="UTF-8"?><s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope" xmlns:a="http://schemas.xmlsoap.org/ws/2004/08/addressing" xmlns:u="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd"><s:Header> <a:Action s:mustUnderstand="1">Create</a:Action><a:To s:mustUnderstand="1">"""+SOAPEndpoint+"""</a:To><fueloauth xmlns="http://exacttarget.com">"""+token+"""</fueloauth></s:Header><s:Body xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema"><CreateRequest xmlns="http://exacttarget.com/wsdl/partnerAPI"><Options/><Objects xsi:type="DataExtension"><Name> """ + DEName +"""</Name> <CustomerKey>""" + DEName + """</CustomerKey>"""+Folder+"""<IsSendable>false</IsSendable> <Fields>""" + Fields + """</Fields></Objects> </CreateRequest> </s:Body> </s:Envelope>""")
XML = bytes(XMLString,"utf-8")
print("XML here: "+str(XML))
#Create DE
header = {"Content-Type": "text/xml" }
post = requests.post(SOAPEndpoint, data = XML ,headers=header)
print("posted")
body = post.content
XMLString = body.decode("utf-8")
print("DE post"+ str(XMLString))
root = ET.fromstring(XMLString)
for elem in root.iter():
if "StatusMessage" in elem.tag:
StatusMessage = elem.text
print(StatusMessage)
if "StatusCode" in elem.tag:
StatusCode = elem.text
print(StatusCode)
if "RequestID" in elem.tag:
#print(elem.text)
RequestId = elem.text
print(RequestId)
if StatusCode != "OK":
Failure = 1
else:
Failure = 0
if Failure != 1:
# IF DE created then post Data to DE via Async API
if len(datarows) >0:
def chunker(seq, size):
return (seq[pos:pos + size] for pos in range(0, len(seq), size))
for group in chunker(datarows, 1000):
#print(group)
payloadraw = {"items":group}
payload = json.dumps(payloadraw)
print(payloadraw)
head = {"Authorization": "Bearer " + token}
url = "https://"+baseURL+".rest.marketingcloudapis.com/data/v1/async/dataextensions/key:"+DEName+"/rows"
response = requests.post(url, json=payloadraw, headers=head)
#Send response back to slack based on DE build response
if Failure ==1:
messageBody = {
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"SFMC DE Creation Request"
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"\n :rotating_light: *Your DE job failed!* <@{slackData}>"
}
},
{
"type": "section",
"text":
{
"type": "mrkdwn",
"text": f"\n There was an error,\n your Data Extension: *{DEName}* wasnt built or there was an issue with the data inside it"
}
},
{
"type": "divider"
}
]
}
else:
messageBody = {
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"SFMC Data Extension Creation Request"
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"\n :tada: *Your Data Extension is ready* <@{slackData}>"
}
},
{
"type": "section",
"text":
{
"type": "mrkdwn",
"text": f"\n Thanks for choosing Oscar's DE builder,\n your Data Extension: *{DEName}* is ready to go!"
}
},
{
"type": "divider"
}
]
}
url = slackURL
header = {"Content-Type":"application/json"}
post = requests.post(url, json = messageBody ,headers = header)
@matheswarwan
Copy link

Ah - makes sense. Missed that. Thanks for pointing it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment