Skip to content

Instantly share code, notes, and snippets.

@rendicott
Created September 29, 2015 18:29
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rendicott/7a0a0630fe228e289f03 to your computer and use it in GitHub Desktop.
Save rendicott/7a0a0630fe228e289f03 to your computer and use it in GitHub Desktop.
Quick example of how to pull a report from Salesforce in CSV format via the API using Python-Requests. Then take the CSV data and parse it into an object model for sifting sorting and timedelta.
'''
GATSFA - Gateway to Salesforce API
The CSV report this is pulling down is querying the Salesforce event log.
Each line in the CSV is a Salesforce event log entry that has the prefix
of the name of our API application. So the report is a log of the
activity between the gateway API and the Salesforce API
'''
import csv
import requests
from datetime import datetime
from xml.etree import ElementTree
# first set up creds and config as globals or import them
instanceurl = 'https://na9.salesforce.com/'
report_id = '00OE0000002vZ9m'
username = 'username@company.com'
password = '(salesforceuserpassword)'
token = '(thetokenforthisuser)'
login_url = 'https://login.salesforce.com/services/Soap/u/33.0'
xml = """<?xml version="1.0" encoding="utf-8" ?>
<env:Envelope xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:env="http://schemas.xmlsoap.org/soap/envelope/">
<env:Body>
<n1:login xmlns:n1="urn:partner.soap.sforce.com">
<n1:username>%s</n1:username>
<n1:password>%s%s</n1:password>
</n1:login>
</env:Body>
</env:Envelope>"""
''' define a __getitem__ method for the datetime object
This way we can choose how datetime objects are displayed
as we iterate through the list of objects and display as a table'''
class MyDatetime(datetime):
def __getitem__(self,other):
return(self.strftime('%Y%m%d-%H%M'))
class Mold():
''' Custom class that initializes itself with attributes
provided by the dictionary passed to it on init.
From there we make assumptions about what attributes exist
but we don't have to do a lot of boilerplate to define the base.
This way if the report ever comes in with more fields we don't have
to mess with our object model much.
'''
def __init__(self,dictionary):
self.containedexception = ''
for key in dictionary:
# loop though the keys in dict
# and clean them up to remove spaces etc.
key_clean = self.attr_clean(key)
# each key will be an object attribute
# all except 'exceptionmessage' which is
# too messy so we just say, "yeah there were exceptions"
if key_clean == 'exceptionmessage':
if len(dictionary.get(key)) > 0:
self.containedexception = 'yes'
else:
# set the attribute as the clean key name and the value of the dict lookup
setattr(self,key_clean,dictionary.get(key))
# now convert the 'self.requestsenttime' string to a datetime object
self.convert_timestring()
def dumpself(self):
''' Returns a list of key/value pairs in a list
'''
msgs = []
for attr in dir(self):
if ('__' not in attr and
'instancemethod' not in str(type(getattr(self,attr)))
):
msgs.append([attr,getattr(self,attr)])
return(msgs)
def dumpself_row(self,colwidth):
''' returns just the attribute values in a list
for the purpose of displaying in row format with a header
'''
row = []
for attr in dir(self):
if ('__' not in attr and
'instancemethod' not in str(type(getattr(self,attr)))
):
# only pull the number of chars to match colwidth
row.append(getattr(self,attr)[:int(colwidth)-2])
return row
def attr_clean(self,x):
# delete spaces, colons, and lowercase everything
x = x.replace(' ','')
x = x.replace(':','')
return x.lower()
def theader(self):
''' When called returns a list of the
attribute names for this object. Useful
for quickly pulling a list of strings to
be used as a table header
'''
headerlist = []
for obj in self.dumpself():
headerlist.append(obj[0])
return headerlist
def convert_timestring(self):
self.requestsenttime = MyDatetime.strptime(self.requestsenttime,'%m/%d/%Y %I:%M %p')
# now self.requestsenttime is a datetime object
# define an __eq__ and __hash__ method so we can remove duplicates with set()
# we're defining a duplicate as any event that has the same requestsenttime
def __eq__(self, other):
return self.requestsenttime==other.requestsenttime
def __hash__(self):
return hash(('requestsenttime', self.requestsenttime))
def removedupes_requestsenttime(lister):
''' Remove duplicates by considering anything with the same
requestsenttime as a unique event.
set(list) will call on the __eq__ and __hash__ methods of the
objects in the list to compare them and determine whether
or not they're a duplicate.
'''
tempset = set(lister)
lister = []
# since a set is not a list we iterate through it and make a list
for thing in tempset:
lister.append(thing)
return(lister)
def print_table(lister,top=None,colwidth=None,title=None):
if colwidth is None:
colwidth = '20'
if title is None:
title = "INSERT REPORT TITLE HERE"
# ugly way of making a formatted table
tformat_full = '{0:%s}{1:%s}{2:%s}{3:%s}{4:%s}{5:%s}{6:%s}{7:%s}' % ( colwidth,
colwidth,
colwidth,
colwidth,
colwidth,
colwidth,
colwidth,
colwidth)
if top is None:
top = len(lister)
print '\t\t\t\t\t----- ' + title + ' -----'
# pull a header from the first item in the lister
print tformat_full.format(*lister[0].theader())
for i,obj in enumerate(lister):
if i > top:
break
print tformat_full.format(*obj.dumpself_row(colwidth))
print("Number of results: " + str(len(lister)))
def proc_csv(csv_blobstring):
moldlist = []
''' use the DictReader functionality of the csv library to
parse the csv blob into a list of dictionaries which gives
it key/value pairs.
'''
listed_dictionary = list(csv.DictReader(csv_blobstring.split('\n')))
# if reading from file do like this
# listed_dictionary = list(csv.DictReader(open(f, 'r')))
# then go through the listed_dictionary and initialize
# Mold() objects and add them to a list
for line in listed_dictionary:
moldlist.append(Mold(line))
# remove duplicate entries
moldlist = removedupes_requestsenttime(moldlist)
return(moldlist)
def hits_in_past_x_seconds(lister,x=None):
''' Takes a list of Mold() objects and returns
a list of those objects with a requestsenttime of
less than x seconds ago.
'''
if x < 0 or x is None:
x = 3600 # default to one hour
lit = []
now = datetime.now()
for obj in biglist:
diff = now - obj.requestsenttime
if diff.total_seconds() < x:
lit.append(obj)
return(lit)
def pull_report_salesforce(desiredReport):
''' Logs into Salesforce via API then pulls the 'desiredReport'
as CSV then returns the entire Requests.Response object in which
the requests.Response.content contains a big string blob of CSV.
Many thanks to the 'Alex' dude in this thread:
http://salesforce.stackexchange.com/questions/47414/download-a-report-using-python
for holding my hand through this part.
'''
xml_complete = xml % (username,password,token)
login_headers = {'Content-Type': 'text/xml; charset=utf-8','SOAPAction': 'login'}
login = requests.post(login_url,headers=login_headers,data=xml_complete)
# the response will have the sessionID in the body as XML
# so we loop through the XML until we find it
tree = ElementTree.fromstring(login.text)
sid = ''
cookie = ''
for thing in tree:
for sub in thing:
for a in sub:
for b in a:
if b.tag == '{urn:partner.soap.sforce.com}sessionId':
sid = b.text # set this tag's text as the sessionID
# now that we have sid and token we can pull what we want, right?
headers = {'Bearer':token}
cookie = {'sid':sid}
report_url = '%s%s?export=1&enc=UTF-8&xf=csv' % (instanceurl,desiredReport)
query = requests.get(report_url,headers=headers,cookies=cookie)
return(query)
results = pull_report_salesforce(report_id) # get the results from Salesforce call
biglist = proc_csv(results.content) # process the CSV and put it into big list of Mold() objects
print_table(moldlist,50,'20')
print_table(hits_in_past_x_seconds(biglist,3600,),title="Hits in Past 1 hour")
print_table(hits_in_past_x_seconds(biglist,43200,),title="Hits in Past 12 hours")
print_table(hits_in_past_x_seconds(biglist,604800,),title="Hits in Past 7 days")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment