Skip to content

Instantly share code, notes, and snippets.

@sharoonthomas
Created November 20, 2018 19:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sharoonthomas/dc6ada7b88835384e9ea53af8efcb985 to your computer and use it in GitHub Desktop.
Save sharoonthomas/dc6ada7b88835384e9ea53af8efcb985 to your computer and use it in GitHub Desktop.
Use the Fulfil API to download the complete Activity Stream history and upload it to Google Cloud Storage as CSV files
import requests
import pandas as pd
from pandas.io.json import json_normalize
import numpy as np
import os
import json
from google.cloud import storage, bigquery
import datetime
import time
from multiprocessing.pool import ThreadPool
def chunks(l, n):
    """Yield successive n-sized chunks from l.

    Args:
        l: A sliceable sequence that supports ``len()`` (list, str, tuple...).
        n: Maximum number of items per chunk; must be at least 1.

    Yields:
        Consecutive slices of ``l`` holding ``n`` items each; the final slice
        may be shorter when ``len(l)`` is not a multiple of ``n``.

    Raises:
        ValueError: If ``n`` is smaller than 1. Because this is a generator,
            the error surfaces when iteration starts, not at call time.
    """
    # A negative step would make range() empty and silently drop every item,
    # and a zero step fails with an unrelated-looking range() error, so reject
    # both up front with a clear message.
    if n < 1:
        raise ValueError("chunk size n must be at least 1, got %r" % (n,))
    for start in range(0, len(l), n):
        yield l[start:start + n]
def get_url(url):
    """Fetch one Fulfil API URL and return its decoded JSON body.

    This is deliberately best-effort so that one bad URL cannot abort a
    thread-pool run: any network error, HTTP error status, or undecodable
    body is reported on stdout and an empty dict is returned instead.

    Args:
        url: Full URL of the Fulfil API resource to retrieve.

    Returns:
        The decoded JSON payload on success, or ``{}`` on any failure.
    """
    headers = {'Accept': 'application/json',
               'Authorization': 'Bearer bot-AUTHORIZATIONCODE',
               'Host': 'braceability.fulfil.io'}
    start_time = time.time()
    try:
        response = requests.get(url, headers=headers, timeout=15)
        # Without this check a 4xx/5xx JSON error payload would be handed
        # back to the caller as if it were a real record.
        response.raise_for_status()
        response_json = response.json()
    except (requests.RequestException, ValueError) as ex:
        # RequestException covers connection, timeout and HTTP status errors;
        # ValueError covers a body that is not valid JSON.
        print(url, 'failed:', ex)
        return {}
    print(url, 'retrieved in', time.time() - start_time, 'seconds')
    return response_json
def multi_thread_fulfil_object_csvs_to_bucket(response_ids_df,
                                              endpoint='stock.shipment.in',
                                              filename_base='fulfil_supplier_shipments_all_',
                                              bucketfolder='fulfil/shipments/in/',
                                              column_subset_list=None,
                                              upload_csvs=False,
                                              chunk_size=100,
                                              n_threads=8):
    """Download full Fulfil records one URL at a time using a thread pool.

    One URL is built per value in ``response_ids_df['id']``. The URLs are
    processed in chunks; each chunk is fetched concurrently with ``get_url``
    and flattened into a DataFrame with ``json_normalize(..., sep='_')``.

    Args:
        response_ids_df: DataFrame with an ``id`` column of record IDs.
        endpoint: Fulfil model name inserted into the request URL.
        filename_base: Prefix of each CSV file name; a timestamp and ``.csv``
            are appended.
        bucketfolder: Destination folder passed through to ``upload_csv``.
        column_subset_list: Optional list of columns to keep, in this order.
            Columns missing from a chunk are added as empty strings. When
            empty or ``None`` all columns are kept.
        upload_csvs: When true, write every chunk to a local CSV, hand it to
            ``upload_csv`` and delete the local file. When false, nothing is
            uploaded and the function returns after the first non-empty chunk.
        chunk_size: Number of URLs fetched (and rows written) per CSV.
        n_threads: Number of worker threads used for the downloads.

    Returns:
        ``True`` after all chunks were processed with ``upload_csvs`` set (or
        when there was nothing to download); otherwise the DataFrame built
        from the first non-empty chunk, as a preview.
    """
    start_time = time.time()
    response_ids = response_ids_df['id'].tolist()
    urls = ['https://braceability.fulfil.io/api/v2/model/' + endpoint + '/' + str(pid)
            for pid in response_ids]
    bucket_name = 'BUCKETNAME'

    # One pool serves every chunk; building a new pool per chunk only added
    # thread start-up/tear-down cost.
    with ThreadPool(n_threads) as pool:
        for chunk in chunks(urls, chunk_size):
            fetched = list(pool.imap_unordered(get_url, chunk))
            # get_url returns {} for a failed request; keep those out of the
            # CSV instead of writing them as blank rows.
            response_records = [record for record in fetched if record]
            n_failed = len(fetched) - len(response_records)
            if n_failed:
                print(n_failed, endpoint, "records could not be retrieved in this chunk")
            if not response_records:
                continue

            # Every non-empty chunk is processed. The previous ">= 90 records"
            # guard silently discarded a short final chunk.
            response_records_df = json_normalize(response_records, sep='_')
            if column_subset_list:
                missing_columns = [column for column in column_subset_list
                                   if column not in response_records_df.columns]
                for column in missing_columns:
                    response_records_df[column] = ''
                response_records_df = response_records_df[column_subset_list]

            if not upload_csvs:
                print("Not uploading ", len(response_records), endpoint, "response records")
                print(response_records_df.columns)
                return response_records_df

            # NOTE(review): the timestamp has one-second resolution, so two
            # chunks finishing within the same second would share a file name;
            # whether that overwrites depends on upload_csv -- confirm.
            now_string = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
            file_name = filename_base + now_string + '.csv'
            response_records_df.to_csv(file_name, index=False)
            try:
                # upload_csv is not defined in the visible part of this file.
                upload_csv(bucket_name, bucketfolder, file_name)
            finally:
                # Never leave the temporary CSV behind, even if the upload fails.
                os.remove(file_name)

    print(time.time() - start_time, "seconds for entire process to complete")
    return True
def pull_fulfil_object_ids(endpoint='stock.shipment.in',
                           filename='fulfil_supplier_shipment_in_ids.csv',
                           bucketfolder='fulfil/shipments/in/ids/',
                           per_page=10000,
                           timeout=120):
    """Page through a Fulfil model listing and collect every returned row.

    Pages are requested starting at page 1 until a page comes back empty, a
    non-200 status is received, or a page fails twice in a row. Whatever was
    collected up to that point is turned into a DataFrame and passed to
    ``upload_csv_from_df`` for upload as a CSV.

    Args:
        endpoint: Fulfil model name inserted into the request URL.
        filename: File name passed to ``upload_csv_from_df``.
        bucketfolder: Destination folder passed to ``upload_csv_from_df``.
        per_page: Page size requested from the API.
        timeout: Seconds to wait for each HTTP request before it is treated
            as failed (previously requests could block forever).

    Returns:
        A DataFrame built from all rows collected across the pages.
    """
    headers = {'Accept': 'application/json',
               'Authorization': 'Bearer bot-AUTHORIZATIONCODE',
               'Host': 'braceability.fulfil.io'}
    base_url = 'https://braceability.fulfil.io/api/v2/model/' + endpoint

    def fetch_page(page):
        """Return ``(status_code, decoded_json)`` for one listing page."""
        url = base_url + '?page={0}&per_page={1}'.format(page, per_page)
        response = requests.get(url, headers=headers, timeout=timeout)
        return response.status_code, response.json()

    responses = []
    page = 1
    while True:
        try:
            status_code, page_records = fetch_page(page)
        except (requests.RequestException, ValueError) as ex:
            # One retry after a pause; a second failure ends the pagination
            # and the rows gathered so far are still uploaded.
            print(ex)
            time.sleep(15)
            try:
                status_code, page_records = fetch_page(page)
            except (requests.RequestException, ValueError) as ex1:
                print(ex1)
                break

        # Check the outcome BEFORE storing anything, so an error payload is
        # never mixed into the collected rows.
        if status_code != 200 or not isinstance(page_records, list):
            print("Stopping at page", page, "of", endpoint,
                  "- unexpected response, status", status_code)
            break
        if not page_records:
            # An empty page marks the end of the listing.
            break

        responses.extend(page_records)
        if page % 4 == 0:
            print(len(responses), endpoint, "records retrieved")
        page += 1

    response_ids_df = pd.DataFrame(responses)
    bucket_name = 'BUCKETNAME'
    # upload_csv_from_df is not defined in the visible part of this file.
    upload_csv_from_df(bucket_name, bucketfolder, filename, response_ids_df)
    print(len(responses), "records uploaded to Google Cloud Bucket Folder", bucketfolder)
    return response_ids_df
#### START ACTIVITY STREAM CODE ####

# Step 1: collect the ID listing for the 'ir.activity' model and store it
# as a CSV in the bucket.
# NOTE(review): the IDs go to 'fulfil/journal_entries/ids/' while the records
# below go to 'fulfil/activity_stream/' -- this looks like a leftover from a
# journal-entries script; confirm the intended folder before changing it.
response_ids_df = pull_fulfil_object_ids(
    endpoint='ir.activity',
    filename='fulfil_activity_stream_ids.csv',
    bucketfolder='fulfil/journal_entries/ids/',
)

# Step 2: the columns to keep in every exported CSV, in output order.
column_list = [
    'actor_display_string', 'actor_email', 'actor_id',
    'application',
    'create_date_iso_string', 'create_uid',
    'id',
    'object__display_string', 'object__id', 'object__model',
    'rec_blurb_title', 'rec_name',
    'target_display_string', 'target_id', 'target_model',
    'title', 'type',
    'web_user',
    'write_date', 'write_uid',
]

# Step 3: download each activity record by ID and upload the results as CSVs.
activity_stream_df = multi_thread_fulfil_object_csvs_to_bucket(
    response_ids_df,
    endpoint='ir.activity',
    filename_base='fulfil_activity_stream_all_',
    bucketfolder='fulfil/activity_stream/',
    column_subset_list=column_list,
    upload_csvs=True,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment