Created
July 16, 2018 16:47
-
-
Save erikrichardlarson/31c535d1a0a33c2bed71009dd3b68ba0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from io import StringIO | |
import pandas as pd | |
import numpy as np | |
import requests | |
import datetime | |
import json | |
import time | |
class SmartListExport:
    """
    A SmartListExport allows the user to export a marketo smart list to a
    csv and/or pandas dataframe for further processing

    Parameters
    ----------
    client_id : Marketo client id listed in admin, required
    client_secret : Marketo client secret listed in admin, required
    base_url : Endpoint listed in Web Services within Marketo Admin, required
        Note that the trailing forward slash is required
        e.g https://123-TEST-123.mktorest.com/
    smart_list_id : str, required
        Pull the int within the url of a given smart list, the number is
        after 'SL' and before the first non-numeric character in the url
        e.g https://marketo.com/#SL12345A1 ---> 12345
    fields : list, default to empty
        list of Marketo fields to extract, API field names required
        e.g ['email', 'AccountId']

    Attributes
    ----------
    access_token : most recent access token, None until one is requested
    export_id : id of the bulk export job, None until the job is created

    Examples
    --------
    This will export all emails within the smart list 1234,
    assuming the client id, client secret, and base url are set

    SmartListExport(CLIENT_ID, CLIENT_SECRET, BASE_URL,
                    '1234', fields=['email']).run()
    """

    # Statuses after which polling stops because the job will not progress.
    _TERMINAL_STATUSES = ('Completed', 'Failed', 'Cancelled')

    def __init__(self, client_id, client_secret, base_url,
                 smart_list_id, fields=None):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url
        self.smart_list_id = smart_list_id
        # Copy the list: a shared default (or the caller's own list) must
        # never be mutated through one instance and observed by another.
        self.fields = list(fields) if fields is not None else []
        # Same names the methods below read and write, so the attributes
        # exist (as None) even before run() is called.
        self.access_token = None
        self.export_id = None

    @staticmethod
    def _payload(response):
        """Return the decoded JSON body, raising on an HTTP error status."""
        response.raise_for_status()
        return response.json()

    @classmethod
    def _first_result(cls, response):
        """Return the first entry of the response's 'result' list.

        Raises
        ------
        RuntimeError
            If the body carries no 'result' entries; the message includes
            the body's 'errors' value (or the whole body if absent).
        """
        payload = cls._payload(response)
        results = payload.get('result')
        if not results:
            raise RuntimeError('Marketo request failed: {}'
                               .format(payload.get('errors', payload)))
        return results[0]

    def _job_url(self, action):
        """Build the endpoint url for `action` on the current export job."""
        return '{}bulk/v1/leads/export/{}/{}.json'.format(self.base_url,
                                                          self.export_id,
                                                          action)

    def _token_params(self):
        """Query parameters carrying the current access token."""
        return {'access_token': self.access_token}

    def _create_access_token(self):
        """Request an access token with the client credentials.

        Returns
        -------
        The 'access_token' value of the identity response.
        """
        # `params` lets requests url-encode the credentials instead of
        # pasting them verbatim into the query string.
        auth = requests.get('{}identity/oauth/token'.format(self.base_url),
                            params={'grant_type': 'client_credentials',
                                    'client_id': self.client_id,
                                    'client_secret': self.client_secret})
        return self._payload(auth)['access_token']

    def _create_bulk_job(self):
        """Create the export job and store its id in `self.export_id`."""
        headers = {'Content-type': 'application/json'}
        sl_body = json.dumps({
            'fields': self.fields,
            'format': 'CSV',
            'filter': {'smartListId': self.smart_list_id}
        })
        creator = requests.post('{}bulk/v1/leads/export/create.json'
                                .format(self.base_url),
                                params=self._token_params(),
                                data=sl_body,
                                headers=headers)
        self.export_id = self._first_result(creator)['exportId']

    def _enqueue_job(self):
        """Enqueue the created job, raising if the request is rejected."""
        response = requests.post(self._job_url('enqueue'),
                                 params=self._token_params())
        # Surface a rejected enqueue now rather than polling a job that
        # was never queued.
        self._first_result(response)

    def _poller(self, interval=120.0, max_attempts=10):
        """Poll the job status until it is terminal or attempts run out.

        Parameters
        ----------
        interval : seconds to sleep between polls, default 120.0
        max_attempts : number of non-terminal polls allowed, default 10

        Returns
        -------
        The last status seen, or None if no poll was made. A 'Failed' job
        is cancelled through `_close_job` before returning.
        """
        status = None
        attempts = 0
        while attempts < max_attempts:
            # A fresh token is requested before every poll, as the wait
            # between polls can be long.
            self.access_token = self._create_access_token()
            job_status = requests.get(self._job_url('status'),
                                      params=self._token_params())
            print('Polling job.. {}'.format(job_status.text))
            status = self._first_result(job_status)['status']
            if status == 'Failed':
                self._close_job()
            if status in self._TERMINAL_STATUSES:
                break
            attempts += 1
            time.sleep(interval)
        return status

    def _frame_from_csv(self, text):
        """Parse exported csv text into a dataframe.

        The text is parsed as real csv, so quoted values containing commas
        stay in one column. Values are kept as strings and 'null' becomes
        NaN. 'export_id' and 'created_date' columns are appended.
        """
        df = pd.read_csv(StringIO(text), dtype=str, na_values=['null'])
        df['export_id'] = self.export_id
        df['created_date'] = datetime.datetime.now()
        return df

    def _export(self, status):
        """Download the finished export.

        Returns
        -------
        A dataframe when `status` is 'Completed', otherwise None.
        """
        if status != 'Completed':
            print('Export {} not downloaded, job status is {}'
                  .format(self.export_id, status))
            return None
        self.access_token = self._create_access_token()
        export = requests.get(self._job_url('file'),
                              params=self._token_params())
        export.raise_for_status()
        df = self._frame_from_csv(export.text)
        print('Pulled {} records'.format(len(df)))
        return df

    def _close_job(self):
        """Cancel the current job; the response is printed, not checked."""
        self.access_token = self._create_access_token()
        r = requests.post(self._job_url('cancel'),
                          params=self._token_params())
        print(r.json())
        print('{} cancelled'.format(self.export_id))

    def run(self, push_to_csv=False):
        """Create, enqueue, poll and download the export.

        Parameters
        ----------
        push_to_csv : bool, default False
            Also write the dataframe to '<export_id>.csv'

        Returns
        -------
        The exported dataframe, or None if the job did not complete.
        """
        self.access_token = self._create_access_token()
        self._create_bulk_job()
        self._enqueue_job()
        status = self._poller()
        export_df = self._export(status)
        # Nothing to write when the job failed, was cancelled or timed out.
        if push_to_csv and export_df is not None:
            export_df.to_csv(self.export_id + '.csv', index=False)
        return export_df
# Example usage. Fill in the four values below before running this file.
CLIENT_ID = ''  # Your client id
CLIENT_SECRET = ''  # Your client secret
BASE_URL = ''  # Your base url, trailing forward slash included
smart_list_id = ''  # Enter your target smart list id

# Guarded so that importing this module never starts an export.
if __name__ == '__main__':
    if not all([CLIENT_ID, CLIENT_SECRET, BASE_URL, smart_list_id]):
        raise SystemExit('Set CLIENT_ID, CLIENT_SECRET, BASE_URL and '
                         'smart_list_id before running this example.')
    dummy_export = SmartListExport(CLIENT_ID, CLIENT_SECRET, BASE_URL,
                                   smart_list_id, fields=['email'])
    dummy_export_df = dummy_export.run(push_to_csv=True)
    print(dummy_export_df)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment