Unbounce API Connector
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2021 Ronnie Joshua
# Email: ron.juden@gmail.com
# Linkedin: https://www.linkedin.com/in/ronnie-joshua/
# Visit: http://www.webanalyticsinfo.com/
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Generic/Built-in Libs
import ssl
from pprint import pprint
from typing import Dict, List, Callable
import os
from functools import partial
import concurrent.futures
from multiprocessing.pool import Pool
import traceback
import asyncio
import time
import functools
from itertools import repeat
import datetime as dt

# Other Libs
from dotenv import load_dotenv
import requests
from requests.exceptions import RequestException
from requests.auth import HTTPBasicAuth
import backoff
import aiohttp
import pymongo
from pymongo import InsertOne
from pymongo.errors import BulkWriteError
# Owned Libs

# Developer & Maintainer
__author__ = "Ronnie Joshua"
__copyright__ = "Copyright 2021, Unbounce API Connector"
__credits__ = ["Ronnie Joshua"]
__license__ = ""
__version__ = "0.1.0"
__maintainer__ = "Ronnie Joshua"
__email__ = "ron.juden@gmail.com"
__status__ = "Dev"
# Clearing the env variable so load_dotenv() picks up a fresh value
# (pop with a default so this does not raise KeyError when the variable is unset)
os.environ.pop("mongo_db_pwd", None)

# Loading Env Variables & Credentials
load_dotenv()
unbounce_user = os.getenv("unbounce_api_key")
unbounce_pwd = os.getenv("unbounce_pwd")
unbounce_credentials = HTTPBasicAuth(unbounce_user, unbounce_pwd)

mongo_db_user = os.getenv("mongo_db_user")
mongo_db_pwd = os.getenv("mongo_db_pwd")
database = "unbounce"
client = pymongo.MongoClient(
    f"mongodb+srv://{mongo_db_user}:{mongo_db_pwd}@cluster0.vhlzr.gcp.mongodb.net/{database}?retryWrites=true&w=majority",
    ssl_cert_reqs=ssl.CERT_NONE,
)
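
# A minimal sketch of the expected .env file, inferred from the os.getenv()
# calls above (the key names come from this script; the values are placeholders):
#
#   unbounce_api_key=<your-unbounce-api-key>
#   unbounce_pwd=<your-unbounce-password>
#   mongo_db_user=<your-mongodb-user>
#   mongo_db_pwd=<your-mongodb-password>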
# Utility Functions

# Using a decorator to time the function execution time
def timer(func: Callable):
    """Print the runtime of the decorated function"""

    @functools.wraps(func)
    def wrapper_timer(*args, **kwargs):
        start_time = time.perf_counter()
        value = func(*args, **kwargs)
        end_time = time.perf_counter()
        run_time = end_time - start_time
        print(f"Finished {func.__name__!r} in {run_time:.4f} secs")
        return value

    return wrapper_timer
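
# Illustrative usage of @timer (slow_add is a made-up example function):
#
#   @timer
#   def slow_add(a, b):
#       time.sleep(1)
#       return a + b
#
#   slow_add(1, 2)  # prints something like: Finished 'slow_add' in 1.0012 secs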
# Use of Recursion
def flatten_json(y):
    """
    Demonstrates the concept of both recursion and closures
    """
    out = {}

    def flatten(x, name=""):
        if isinstance(x, dict):
            for a in x:
                flatten(x[a], name + a + "_")
        elif isinstance(x, list):
            for i, a in enumerate(x):
                flatten(a, name + str(i) + "_")
        else:
            out[name[:-1]] = x

    flatten(y)
    # print(flatten.__closure__[0].cell_contents)
    return out
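
# Worked example (illustrative input): nested keys are joined with "_" and
# list items are indexed by position, so
#   flatten_json({"form_data": {"email": ["a@b.com"]}, "id": "x1"})
# returns
#   {"form_data_email_0": "a@b.com", "id": "x1"}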
# Transforming the data object before loading it to MongoDB
def transform_data_from_unbounce(all_data) -> List[Dict]:
    leads_to_upload = list()
    for data_obj in all_data:
        # Normalize: a single response dict is treated as a one-element list
        if isinstance(data_obj, list):
            data_dicts = data_obj
        elif isinstance(data_obj, dict):
            data_dicts = [data_obj]
        else:
            continue
        for data_dict in data_dicts:
            leads_list = data_dict["leads"]
            if leads_list:
                for lead_data_dict in leads_list:
                    lead_data_dict = flatten_json(lead_data_dict)
                    # Reuse the Unbounce lead id as the MongoDB primary key
                    lead_data_dict["_id"] = lead_data_dict.pop("id")
                    lead_data_dict["insert_time_stamp"] = dt.datetime.now().isoformat()
                    leads_to_upload.append(lead_data_dict)
    return leads_to_upload
# Bulk-write data to MongoDB; throws an error in case of duplication
# https://docs.mongodb.com/manual/reference/method/db.collection.bulkWrite/
# pymongo: https://pymongo.readthedocs.io/en/stable/examples/bulk.html
def load_data_mongodb(transform_data):
    db = client.unbounce
    unbounce_leads = db.leads
    if transform_data:
        # Local name avoids shadowing the requests module
        insert_requests = list(map(InsertOne, transform_data))
        try:
            # ordered=False keeps inserting past failures (e.g. duplicate _id
            # values on re-runs); the failures are reported in bwe.details
            unbounce_leads.bulk_write(insert_requests, ordered=False)
        except BulkWriteError as bwe:
            pprint(bwe.details["writeErrors"])
# Generating Unbounce API Endpoints [A Different Approach]
BASE_URL = "https://api.unbounce.com"
account_id = None
sub_account_id = None
sub_account_ids = None
domain_id = None
domain_ids = None
page_id = None
page_ids = None

api_endpoints = {
    "get_all_accounts": f"{BASE_URL}/accounts",
    "get_an_account": f"{BASE_URL}/accounts/{account_id}",
    "get_all_sub_accounts_for_an_account": f"{BASE_URL}/accounts/{account_id}/sub_accounts",
    "get_all_pages_for_an_account": f"{BASE_URL}/accounts/{account_id}/pages",
    "get_an_sub_account": f"{BASE_URL}/sub_accounts/{sub_account_id}",
    "get_all_domains_for_an_sub_account": f"{BASE_URL}/sub_accounts/{sub_account_id}/domains",
    "get_all_page_groups_for_an_sub_account": f"{BASE_URL}/sub_accounts/{sub_account_id}/page_groups",
    "get_all_pages_for_an_sub_account": f"{BASE_URL}/sub_accounts/{sub_account_id}/pages",
    "get_details_for_a_domain": f"{BASE_URL}/domains/{domain_id}",
    "get_all_pages_for_a_domain": f"{BASE_URL}/domains/{domain_id}/pages",
    "get_all_pages": f"{BASE_URL}/pages",
    "get_a_page": f"{BASE_URL}/pages/{page_id}",
    "get_form_fields_for_a_page": f"{BASE_URL}/pages/{page_id}/form_fields",
    "get_all_leads_for_a_page": f"{BASE_URL}/pages/{page_id}/leads",
}
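
# Because the *_id variables above are None, each f-string bakes the literal
# substring "None" into its URL; get_resources() below patches the real id
# back in with str.replace(). Illustrative example ("abc123" is a placeholder):
#
#   url = api_endpoints["get_a_page"]   # "https://api.unbounce.com/pages/None"
#   url.replace("None", "abc123")       # "https://api.unbounce.com/pages/abc123"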
@backoff.on_exception(backoff.expo, RequestException, max_tries=5)
def get_resources(api_endpoint: str, uri_parameter: str = None, **kwargs):
    """Fetch all pages of a resource from the Unbounce API.

    Args:
        api_endpoint (str): Should be a valid API endpoint key - refer to
            the api_endpoints dict above.
        uri_parameter (str, optional): any one of these values
            [account_id, sub_account_id, domain_id, page_id]. Defaults to None.

    Payload parameters (all optional, passed via **kwargs):
        sort_order, count, from_date, to_date, offset, limit,
        with_stats, include_sub_pages

    Returns:
        List[Dict]: one parsed JSON response per page of results.
    """
    resource_endpoint = api_endpoints.get(api_endpoint)
    assert resource_endpoint is not None, "Resource Endpoint Does Not Exist"
    if uri_parameter is not None:
        resource_endpoint = resource_endpoint.replace("None", uri_parameter)
    assert (
        resource_endpoint.find("/None") == -1
    ), f"Check the Resource Endpoint URI - Contains Undefined URI: {resource_endpoint}"

    # requests silently drops parameters whose value is None
    payload = {
        "sort_order": kwargs.get("sort_order"),
        "count": kwargs.get("count"),
        "from": kwargs.get("from_date"),
        "to": kwargs.get("to_date"),
        "offset": kwargs.get("offset"),
        "limit": kwargs.get("limit"),
        "with_stats": kwargs.get("with_stats"),
        "include_sub_pages": kwargs.get("include_sub_pages"),
    }
    response_objects = list()

    # A Session object persists certain parameters across requests. If several
    # requests are made to the same host, the underlying TCP connection is
    # reused, which can result in a significant performance increase. A Session
    # has all the methods of the main requests API.
    session = requests.Session()
    with session.get(resource_endpoint, auth=unbounce_credentials, params=payload) as response:
        # HTTPError is a RequestException, so backoff retries failed statuses
        response.raise_for_status()
        response_objects.append(response.json())
    # Follow the pagination links until there is no "next" page
    while True:
        try:
            resource_endpoint = response.json()["metadata"]["next"]["href"]
            with session.get(resource_endpoint, auth=unbounce_credentials) as response:
                response.raise_for_status()
                response_objects.append(response.json())
        except KeyError:
            break
    return response_objects
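
# Illustrative calls (the page id "abc123" is a placeholder):
#
#   pages = get_resources("get_all_pages", sort_order="asc", limit=1000)
#   leads = get_resources("get_all_leads_for_a_page", "abc123",
#                         from_date="2021-01-01T00:00:00.000Z", limit=1000)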
@timer
def get_all_page_ids(page_id_payload):
    try:
        # Keep only the page ids from the first response object
        return [
            [page_dd["id"] for page_dd in response["pages"]]
            for response in get_resources("get_all_pages", **page_id_payload)
        ][0]
    except Exception:
        # print_exc() already writes the traceback to stderr
        traceback.print_exc()
@timer
def run_synchronous_process(api_endpoint, uri_parameters, payload):
    data_object = list()
    for uri_parameter in uri_parameters:
        try:
            data_object.append(get_resources(api_endpoint, uri_parameter, **payload))
        except Exception:
            # TODO : Something with the exception
            continue
    return data_object
@timer
def synchronous_upload_to_mongo_db(page_id_payload, leads_payload):
    print("Uploading Data to MongoDB")
    list_page_ids = get_all_page_ids(page_id_payload)
    extract_data = run_synchronous_process("get_all_leads_for_a_page", list_page_ids, leads_payload)
    transform_data = transform_data_from_unbounce(extract_data)
    load_data_mongodb(transform_data)
# Example of a partial() implementation; can be used for multi-threading
def create_partial_functions(function, api_endpoint, payload):
    """Create a partial function to be used for multi-threading"""
    return partial(function, api_endpoint=api_endpoint, **payload)
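
# Illustrative usage (leads_payload is defined in the __main__ block below;
# "abc123" is a placeholder page id). The endpoint and payload are bound once,
# so the resulting callable only needs the uri_parameter:
#
#   get_leads = create_partial_functions(
#       get_resources, "get_all_leads_for_a_page", leads_payload
#   )
#   get_leads(uri_parameter="abc123")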
# For illustration purposes, a deliberately simple implementation
def worker_wrapper(*args):
    api_endpoint, uri_parameter, payload = args
    return get_resources(api_endpoint, uri_parameter, **payload)


def modified_worker_wrapper(*args):
    # Extract, transform and load in a single worker
    api_endpoint, uri_parameter, payload = args
    extract_data = get_resources(api_endpoint, uri_parameter, **payload)
    transform_data = transform_data_from_unbounce(extract_data)
    load_data_mongodb(transform_data)
# A thread is a unit of execution within a process. Multithreading refers to
# concurrently executing multiple threads by rapidly switching the control of
# the CPU between threads (called context switching).
@timer
def run_multi_threading(api_endpoint, uri_parameters, payload):
    with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        executor.map(
            lambda arg_tup: worker_wrapper(*arg_tup),
            zip(repeat(api_endpoint), uri_parameters, repeat(payload)),
        )
    # The "with" block already waits for pending futures on exit, so no
    # explicit executor.shutdown(wait=True) is needed.
# Multiprocessing refers to the ability of a system to support more than one
# processor at the same time. Applications in a multiprocessing system are
# broken into smaller routines that run independently. The operating system
# allocates these routines to the processors, improving performance of the system.
@timer
def run_multi_processing(api_endpoint, uri_parameters, payload):
    with Pool(processes=os.cpu_count()) as pool:
        # Like map(), except that the elements of the iterable are expected to
        # be iterables that are unpacked as arguments.
        # Hence an iterable of [(1, 2), (3, 4)] results in [func(1, 2), func(3, 4)].
        pool.starmap(worker_wrapper, tuple(zip(repeat(api_endpoint), uri_parameters, repeat(payload))))


@timer
def run_multi_processing_upload(api_endpoint, uri_parameters, payload):
    with Pool(processes=os.cpu_count()) as pool:
        pool.starmap(modified_worker_wrapper, tuple(zip(repeat(api_endpoint), uri_parameters, repeat(payload))))
# https://realpython.com/async-io-python/#a-full-program-asynchronous-requests
# https://www.datacamp.com/community/tutorials/asyncio-introduction
# https://realpython.com/async-io-python/
# https://www.youtube.com/watch?v=R4Oz8JUuM4s
async def async_get_resources(
    session, api_endpoint: str, uri_parameter: str = None, **kwargs
):
    unbounce_credentials = aiohttp.BasicAuth(unbounce_user, unbounce_pwd)
    resource_endpoint = api_endpoints.get(api_endpoint)
    assert resource_endpoint is not None, "Resource Endpoint Does Not Exist"
    if uri_parameter is not None:
        resource_endpoint = resource_endpoint.replace("None", uri_parameter)
    assert (
        resource_endpoint.find("/None") == -1
    ), f"Check the Resource Endpoint URI - Contains Undefined URI: {resource_endpoint}"

    # aiohttp requires query parameter values to be strings; drop unset
    # parameters instead of sending the literal string "None"
    payload = {
        key: str(value)
        for key, value in {
            "sort_order": kwargs.get("sort_order"),
            "count": kwargs.get("count"),
            "from": kwargs.get("from_date"),
            "to": kwargs.get("to_date"),
            "offset": kwargs.get("offset"),
            "limit": kwargs.get("limit"),
            "with_stats": kwargs.get("with_stats"),
            "include_sub_pages": kwargs.get("include_sub_pages"),
        }.items()
        if value is not None
    }
    response_objects = list()
    response = await session.get(
        resource_endpoint, auth=unbounce_credentials, params=payload, ssl=False
    )
    response_objects.append(await response.json())
    # Follow the pagination links until there is no "next" page
    while True:
        try:
            resource_endpoint = (await response.json())["metadata"]["next"]["href"]
            response = await session.get(
                resource_endpoint, auth=unbounce_credentials, ssl=False
            )
            response_objects.append(await response.json())
        except KeyError:
            break
    return response_objects
async def get_data(api_endpoint, uri_parameters, **kwargs):
    async with aiohttp.ClientSession() as session:
        data = []
        # Awaiting inside the loop fetches one uri_parameter at a time;
        # see the asyncio.gather() sketch below for a concurrent variant
        for uri_parameter in uri_parameters:
            response = await async_get_resources(
                session, api_endpoint, uri_parameter, **kwargs
            )
            data.append(response)
        return data
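
# The loop in get_data() awaits each request sequentially. Below is a minimal
# sketch of a concurrent variant using asyncio.gather - an alternative added
# for illustration, not part of the original flow:
async def get_data_concurrent(api_endpoint, uri_parameters, **kwargs):
    async with aiohttp.ClientSession() as session:
        tasks = [
            async_get_resources(session, api_endpoint, uri_parameter, **kwargs)
            for uri_parameter in uri_parameters
        ]
        # gather schedules all the coroutines and awaits them together
        return await asyncio.gather(*tasks)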
@timer
def main_async():
    # Relies on list_page_ids and leads_payload defined in the __main__ block
    asyncio.run(get_data("get_all_leads_for_a_page", list_page_ids, **leads_payload))
if __name__ == "__main__":
    page_id_payload = {
        "sort_order": "asc",
        "count": False,
        "from_date": "2021-01-01T00:00:00.000Z",
        "to_date": "2021-12-31T00:00:00.000Z",
        "offset": 0,
        "limit": 1000,
        "with_stats": True,
    }

    leads_payload = {
        "sort_order": "asc",
        "count": False,
        "from_date": "2021-01-01T00:00:00.000Z",
        "to_date": "2021-12-31T00:00:00.000Z",
        "offset": 0,
        "limit": 1000,
    }

    list_page_ids = get_all_page_ids(page_id_payload)

    print("STARTED: Single Thread - Get All Leads")
    extract_data = run_synchronous_process("get_all_leads_for_a_page", list_page_ids, leads_payload)

    print("STARTED: Multi Thread - Get All Leads")
    run_multi_threading("get_all_leads_for_a_page", list_page_ids, leads_payload)

    print("STARTED: Multi Processing - Get All Leads")
    run_multi_processing("get_all_leads_for_a_page", list_page_ids, leads_payload)

    print("STARTED: ASYNCIO - Get All Leads")
    main_async()

    # Uploading Leads to MongoDB Synchronously
    synchronous_upload_to_mongo_db(page_id_payload, leads_payload)

    # Uploading Leads to MongoDB Using MultiProcessing
    list_page_ids = get_all_page_ids(page_id_payload)
    run_multi_processing_upload("get_all_leads_for_a_page", list_page_ids, leads_payload)