# Use the Fulfil API to download the complete Activity Stream data history.
import requests
import pandas as pd
import numpy as np
import os
import json
from google.cloud import storage, bigquery
import datetime
import time
from multiprocessing.pool import ThreadPool


def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]
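# e.g. list(chunks([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]]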


def get_url(url):
    """GET a URL from the Fulfil API and return the response JSON.
    Returns an empty dict on any error so callers never see an exception."""
    try:
        start_time = time.time()
        headers = {'Accept': 'application/json',
                   'Authorization': 'Bearer bot-AUTHORIZATIONCODE',
                   'Host': 'braceability.fulfil.io'}
        response = requests.get(url, headers=headers, timeout=15)
        response_json = response.json()
        print(url, 'retrieved in', time.time() - start_time, 'seconds')
    except Exception as ex:
        print(ex)
        return {}
    return response_json
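

# upload_csv and upload_csv_from_df are called later in this script but are
# not defined in the gist. The implementations below are a minimal sketch,
# assuming the standard google-cloud-storage client API; the author's
# originals may differ.
def upload_csv(bucket_name, bucket_folder, file_name):
    """Upload a local CSV file to a folder in a Google Cloud Storage bucket."""
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(bucket_folder + file_name)
    blob.upload_from_filename(file_name, content_type='text/csv')


def upload_csv_from_df(bucket_name, bucket_folder, file_name, df):
    """Serialize a DataFrame to CSV in memory and upload it to the bucket."""
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(bucket_folder + file_name)
    blob.upload_from_string(df.to_csv(index=False), content_type='text/csv')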


def multi_thread_fulfil_object_csvs_to_bucket(response_ids_df,
                                              endpoint='stock.shipment.in',
                                              filename_base='fulfil_supplier_shipments_all_',
                                              bucketfolder='fulfil/shipments/in/',
                                              column_subset_list=[],
                                              upload_csvs=False):
    """Download full JSON objects one by one from the Fulfil API using 8 threads.
    If upload_csvs is True, upload the records to Google Cloud Storage as CSVs
    in chunks of 100 at a time; otherwise return the first chunk as a DataFrame
    for inspection."""
    start_time = time.time()
    response_ids = response_ids_df['id'].tolist()
    urls = ['https://braceability.fulfil.io/api/v2/model/' + endpoint + '/' + str(pid)
            for pid in response_ids]
    chunk_size = 100  # each CSV covers roughly 100 records
    url_chunks = chunks(urls, chunk_size)
    for chunk in url_chunks:
        response_records = []
        pool = ThreadPool(8)
        response_generator = pool.imap_unordered(get_url, chunk)
        for response_record in response_generator:
            response_records.append(response_record)
        pool.close()
        pool.join()
        # Process the chunk when most requests succeeded (>= 90 of ~100) or the
        # chunk is complete; the second test keeps the final, smaller chunk
        # from being silently dropped.
        if len(response_records) >= 90 or len(response_records) == len(chunk):
            response_records_df = pd.json_normalize(response_records, sep='_')
            if column_subset_list:
                # Add any missing columns as empty strings so the subset
                # selection below never raises a KeyError.
                response_records_columns_set = set(response_records_df.columns)
                missing_columns = list(set(column_subset_list) - response_records_columns_set)
                for column in missing_columns:
                    response_records_df[column] = ''
                response_records_df = response_records_df[column_subset_list]
            if upload_csvs:
                now_string = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
                file_name = filename_base + now_string + '.csv'
                bucket_name = 'BUCKETNAME'
                bucket_folder = bucketfolder
                response_records_df.to_csv(file_name, index=False)
                upload_csv(bucket_name, bucket_folder, file_name)
                os.remove(file_name)
                response_records = []
            else:
                print("Not uploading", len(response_records), endpoint, "response records")
                print(response_records_df.columns)
                return response_records_df
    print(time.time() - start_time, "seconds for entire process to complete")
    return True


def pull_fulfil_object_ids(endpoint='stock.shipment.in',
                           filename='fulfil_supplier_shipment_in_ids.csv',
                           bucketfolder='fulfil/shipments/in/ids/'):
    """Pull 10,000 object IDs at a time from the Fulfil API.
    Save the complete list of object IDs to the Google Cloud Storage
    bucketfolder as a CSV."""
    response_status_code = 200
    i = 1
    responses = []
    response_json = ['starting']  # sentinel so the loop body runs at least once
    while response_status_code == 200 and len(response_json) > 0:
        try:
            url = 'https://braceability.fulfil.io/api/v2/model/' + endpoint + '?page={0}&per_page=10000'.format(i)
            headers = {'Accept': 'application/json',
                       'Authorization': 'Bearer bot-AUTHORIZATIONCODE',
                       'Host': 'braceability.fulfil.io'}
            response = requests.get(url, headers=headers)
            response_json = response.json()
            response_status_code = response.status_code
            responses.extend(response_json)
            if i % 4 == 0:
                print(i * 10000, endpoint, "records retrieved")
            i += 1
        except Exception as ex:
            # Wait 15 seconds and retry the same page once; give up on a
            # second consecutive failure.
            print(ex)
            time.sleep(15)
            try:
                url = 'https://braceability.fulfil.io/api/v2/model/' + endpoint + '?page={0}&per_page=10000'.format(i)
                headers = {'Accept': 'application/json',
                           'Authorization': 'Bearer bot-AUTHORIZATIONCODE',
                           'Host': 'braceability.fulfil.io'}
                response = requests.get(url, headers=headers)
                response_json = response.json()
                response_status_code = response.status_code
                responses.extend(response_json)
                if i % 4 == 0:
                    print(i * 10000, endpoint, "records retrieved")
                i += 1
            except Exception as ex1:
                print(ex1)
                response_status_code = 500  # force the loop to exit
            continue
    response_ids_df = pd.DataFrame(responses)
    bucket_name = 'BUCKETNAME'
    bucket_folder = bucketfolder
    file_name = filename
    upload_csv_from_df(bucket_name, bucket_folder, file_name, response_ids_df)
    print(len(responses), "records uploaded to Google Cloud Storage folder", bucketfolder)
    return response_ids_df


#### START ACTIVITY STREAM CODE ####
response_ids_df = pull_fulfil_object_ids(endpoint='ir.activity',
                                         filename='fulfil_activity_stream_ids.csv',
                                         bucketfolder='fulfil/journal_entries/ids/')
column_list = [
    'actor_display_string',
    'actor_email',
    'actor_id',
    'application',
    'create_date_iso_string',
    'create_uid',
    'id',
    'object__display_string',
    'object__id',
    'object__model',
    'rec_blurb_title',
    'rec_name',
    'target_display_string',
    'target_id',
    'target_model',
    'title',
    'type',
    'web_user',
    'write_date',
    'write_uid']
activity_stream_df = multi_thread_fulfil_object_csvs_to_bucket(response_ids_df,
                                                               endpoint='ir.activity',
                                                               filename_base='fulfil_activity_stream_all_',
                                                               bucketfolder='fulfil/activity_stream/',
                                                               column_subset_list=column_list,
                                                               upload_csvs=True)