Skip to content

Instantly share code, notes, and snippets.

@erikrichardlarson
Created July 16, 2018 16:47
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save erikrichardlarson/31c535d1a0a33c2bed71009dd3b68ba0 to your computer and use it in GitHub Desktop.
Save erikrichardlarson/31c535d1a0a33c2bed71009dd3b68ba0 to your computer and use it in GitHub Desktop.
from io import StringIO
import pandas as pd
import numpy as np
import requests
import datetime
import json
import time
class SmartListExport:
    """
    A SmartListExport allows the user to export a Marketo smart list to a csv
    and/or pandas dataframe for further processing.

    Parameters
    ----------
    client_id : Marketo client id listed in admin, required
    client_secret : Marketo client secret listed in admin, required
    base_url : Endpoint listed in Web Services within Marketo Admin, required
        Note that the trailing forward slash is required
        e.g https://123-TEST-123.mktorest.com/
    smart_list_id : str, required
        Pull the int within the url of a given smart list, the number is after
        'SL' and before the first non-numeric character in the url
        e.g https://marketo.com/#SL12345A1 ---> 12345
    fields : list, default to empty
        list of Marketo fields to extract, API field names required
        e.g ['email', 'AccountId']

    Returns
    -------
    A specification for a SmartListExport

    Examples
    --------
    This will export all emails within the smart list 1234,
    assuming the client id, client secret, and base url are set

    SmartListExport(CLIENT_ID, CLIENT_SECRET, BASE_URL,
                    '1234', fields=['email'])
    """

    def __init__(self, client_id, client_secret, base_url,
                 smart_list_id, fields=None):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url
        self.smart_list_id = smart_list_id
        # A fresh list per instance; a literal ``[]`` default would be one
        # object shared by every instance created without ``fields``.
        self.fields = [] if fields is None else fields
        # These are the attribute names every other method reads and writes.
        self.access_token = None
        self.export_id = None

    def _create_access_token(self):
        """Request an OAuth access token and return it as a string."""
        auth = requests.get(
            '{}identity/oauth/token'.format(self.base_url),
            # ``params`` lets requests URL-encode the credentials.
            params={'grant_type': 'client_credentials',
                    'client_id': self.client_id,
                    'client_secret': self.client_secret})
        return auth.json()['access_token']

    def _create_bulk_job(self):
        """Create the bulk lead export job and store its id in ``export_id``."""
        headers = {'Content-type': 'application/json'}
        sl_body = json.dumps({
            'fields': self.fields,
            'format': 'CSV',
            'filter': {'smartListId': self.smart_list_id}
        })
        creator = requests.post(
            '{}bulk/v1/leads/export/create.json'.format(self.base_url),
            params={'access_token': self.access_token},
            data=sl_body,
            headers=headers)
        self.export_id = creator.json()['result'][0]['exportId']

    def _enqueue_job(self):
        """Enqueue the previously created export job."""
        requests.post(
            '{}bulk/v1/leads/export/{}/enqueue.json'
            .format(self.base_url, self.export_id),
            params={'access_token': self.access_token})

    def _poller(self, max_attempts=10, interval=120.0):
        """
        Poll the job status until it is 'Completed' or 'Failed'.

        Parameters
        ----------
        max_attempts : int, default 10
            Maximum number of status requests before giving up
        interval : float, default 120.0
            Seconds to sleep after each poll that is not yet finished

        Returns
        -------
        The last status string seen, or None if ``max_attempts`` is 0.
        A 'Failed' job is cancelled via ``_close_job`` before returning.
        """
        status = None
        for _ in range(max_attempts):
            # A new token is requested on every poll, as the wait between
            # polls can be long.
            self.access_token = self._create_access_token()
            job_status = requests.get(
                '{}bulk/v1/leads/export/{}/status.json'
                .format(self.base_url, self.export_id),
                params={'access_token': self.access_token})
            status = job_status.json()['result'][0]['status']
            print('Polling job.. {}'.format(job_status.text))
            if status == 'Failed':
                self._close_job()
            if status in ('Completed', 'Failed'):
                break
            time.sleep(interval)
        return status

    def _export(self, status):
        """
        Download the finished export and return it as a DataFrame.

        Returns None when ``status`` is anything other than 'Completed',
        since there is no file to download in that case.
        """
        if status != 'Completed':
            return None
        self.access_token = self._create_access_token()
        export = requests.get(
            '{}bulk/v1/leads/export/{}/file.json'
            .format(self.base_url, self.export_id),
            params={'access_token': self.access_token})
        return self._frame_from_csv(export.text)

    def _frame_from_csv(self, csv_text):
        """
        Parse exported CSV text into a DataFrame, one column per CSV header.

        Values are kept as strings; the literal 'null' becomes NaN. The
        columns 'export_id' and 'created_date' are appended to every row.
        """
        # Parse with a real CSV reader so quoted values that contain commas
        # stay in one cell. Only the literal 'null' is treated as missing.
        df = pd.read_csv(StringIO(csv_text), dtype=str,
                         keep_default_na=False, na_values=['null'])
        print('Pulled {} records'.format(len(df)))
        df['export_id'] = self.export_id
        df['created_date'] = datetime.datetime.now()
        return df

    def _close_job(self):
        """Cancel the job identified by ``export_id`` and print the reply."""
        self.access_token = self._create_access_token()
        r = requests.post(
            '{}bulk/v1/leads/export/{}/cancel.json'
            .format(self.base_url, self.export_id),
            params={'access_token': self.access_token})
        print(r.json())
        print('{} cancelled'.format(self.export_id))

    def run(self, push_to_csv=False):
        """
        Create, enqueue, poll and download the export.

        Parameters
        ----------
        push_to_csv : bool, default False
            When True and the export completed, also write the data to
            '<export_id>.csv' in the current working directory

        Returns
        -------
        The exported DataFrame, or None if the job did not complete.
        """
        self.access_token = self._create_access_token()
        self._create_bulk_job()
        self._enqueue_job()
        status = self._poller()
        export_df = self._export(status)
        if push_to_csv and export_df is not None:
            export_df.to_csv(self.export_id + '.csv', index=False)
        return export_df
# Replace the placeholder values below with your own Marketo settings.
CLIENT_ID = 'YOUR_CLIENT_ID'  # Your client id
CLIENT_SECRET = 'YOUR_CLIENT_SECRET'  # Your client secret
BASE_URL = 'https://123-TEST-123.mktorest.com/'  # Your base url
smart_list_id = 'YOUR_SMART_LIST_ID'  # Enter your target smart list id

# Only run the export when this file is executed as a script, so importing
# the module does not start a network job.
if __name__ == '__main__':
    dummy_export = SmartListExport(CLIENT_ID, CLIENT_SECRET, BASE_URL,
                                   smart_list_id, fields=['email'])
    dummy_export_df = dummy_export.run(push_to_csv=True)
    print(dummy_export_df)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment