Unbounce API Connector
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2021 Ronnie Joshua
# Email: ron.juden@gmail.com
# Linkedin: https://www.linkedin.com/in/ronnie-joshua/
# Visit: http://www.webanalyticsinfo.com/
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Generic/Built-in Libs
import ssl
from pprint import pprint
from typing import Dict, List, Callable
import os
import logging
from functools import partial
import concurrent.futures
import traceback
import asyncio
import time
import functools
from itertools import repeat
import datetime as dt
# Other Libs
from dotenv import load_dotenv
import requests
from requests.exceptions import RequestException
import backoff
from requests.auth import HTTPBasicAuth
import aiohttp
from multiprocessing.pool import Pool
import pymongo
from pymongo import InsertOne
from pymongo.errors import BulkWriteError
# Owned Libs
# Developer & Maintainer
__author__ = "Ronnie Joshua"
__copyright__ = "Copyright 2021, Unbounce API Connector"
__credits__ = ["Ronnie Joshua"]
__license__ = ""
__version__ = "0.1.0"
__maintainer__ = "Ronnie Joshua"
__email__ = "ron.juden@gmail.com"
__status__ = "Dev"
# Clearing any stale cached env variable before loading from .env
os.environ.pop("mongo_db_pwd", None)
# Loading Env Variable & Credentials
load_dotenv()
unbounce_user = os.getenv("unbounce_api_key")
unbounce_pwd = os.getenv("unbounce_pwd")
unbounce_credentials = HTTPBasicAuth(unbounce_user, unbounce_pwd)
mongo_db_user = os.getenv("mongo_db_user")
mongo_db_pwd = os.getenv("mongo_db_pwd")
database = "unbounce"
client = pymongo.MongoClient(
    f"mongodb+srv://{mongo_db_user}:{mongo_db_pwd}@cluster0.vhlzr.gcp.mongodb.net/{database}?retryWrites=true&w=majority",
    ssl_cert_reqs=ssl.CERT_NONE,
)
# Utility Functions
# Using a decorator to time the function execution time
def timer(func: Callable):
    """Print the runtime of the decorated function"""

    @functools.wraps(func)
    def wrapper_timer(*args, **kwargs):
        start_time = time.perf_counter()
        value = func(*args, **kwargs)
        end_time = time.perf_counter()
        run_time = end_time - start_time
        print(f"Finished {func.__name__!r} in {run_time:.4f} secs")
        return value

    return wrapper_timer
# Use of Recursion
def flatten_json(y):
    """
    Demonstrates the concept of both recursion and closures
    """
    out = {}

    def flatten(x, name=""):
        if isinstance(x, dict):
            for a in x:
                flatten(x[a], name + a + "_")
        elif isinstance(x, list):
            for i, a in enumerate(x):
                flatten(a, name + str(i) + "_")
        else:
            out[name[:-1]] = x

    flatten(y)
    # print(flatten.__closure__[0].cell_contents)
    return out
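
# Illustrative usage (a sketch; the input dict is made up for this example):
# flatten_json({"a": {"b": 1}, "c": [2, 3]})
# returns {"a_b": 1, "c_0": 2, "c_1": 3}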
# Transforming the data object before loading it to Mongo DB
def transform_data_from_unbounce(all_data) -> List[Dict]:
    leads_to_upload = list()

    def collect_leads(data_dict):
        leads_list = data_dict["leads"]
        if leads_list:
            for lead_data_dict in leads_list:
                lead_data_dict = flatten_json(lead_data_dict)
                # Use the Unbounce lead id as the MongoDB document _id
                lead_data_dict["_id"] = lead_data_dict.pop("id")
                lead_data_dict["insert_time_stamp"] = dt.datetime.now().isoformat()
                leads_to_upload.append(lead_data_dict)

    for data_obj in all_data:
        if isinstance(data_obj, list):
            for data_dict in data_obj:
                collect_leads(data_dict)
        elif isinstance(data_obj, dict):
            collect_leads(data_obj)
    return leads_to_upload
# Bulk write data to MongoDB; throws an error in case of duplication
# https://docs.mongodb.com/manual/reference/method/db.collection.bulkWrite/
# pymongo: https://pymongo.readthedocs.io/en/stable/examples/bulk.html
def load_data_mongodb(transform_data):
    db = client.unbounce
    unbounce_leads = db.leads
    if transform_data:
        # Named to avoid shadowing the imported `requests` module
        operations = list(map(InsertOne, transform_data))
        try:
            unbounce_leads.bulk_write(operations, ordered=False)
        except BulkWriteError as bwe:
            pprint(bwe.details["writeErrors"])
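
# Usage sketch (assumes a reachable cluster; the document below is made up):
# with ordered=False a duplicate "_id" raises BulkWriteError, but the
# remaining inserts still go through and only the write errors are printed.
# load_data_mongodb([{"_id": "lead-123", "email": "someone@example.com"}])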
# Generating Unbounce API Endpoints [A Different Approach]
BASE_URL = "https://api.unbounce.com"
account_id = None
sub_account_id = None
sub_account_ids = None
domain_id = None
domain_ids = None
page_id = None
page_ids = None
api_endpoints = {
    "get_all_accounts": f"{BASE_URL}/accounts",
    "get_an_account": f"{BASE_URL}/accounts/{account_id}",
    "get_all_sub_accounts_for_an_account": f"{BASE_URL}/accounts/{account_id}/sub_accounts",
    "get_all_pages_for_an_account": f"{BASE_URL}/accounts/{account_id}/pages",
    "get_a_sub_account": f"{BASE_URL}/sub_accounts/{sub_account_id}",
    "get_all_domains_for_a_sub_account": f"{BASE_URL}/sub_accounts/{sub_account_id}/domains",
    "get_all_page_groups_for_a_sub_account": f"{BASE_URL}/sub_accounts/{sub_account_id}/page_groups",
    "get_all_pages_for_a_sub_account": f"{BASE_URL}/sub_accounts/{sub_account_id}/pages",
    "get_details_for_a_domain": f"{BASE_URL}/domains/{domain_id}",
    "get_all_pages_for_a_domain": f"{BASE_URL}/domains/{domain_id}/pages",
    "get_all_pages": f"{BASE_URL}/pages",
    "get_a_page": f"{BASE_URL}/pages/{page_id}",
    "get_form_fields_for_a_page": f"{BASE_URL}/pages/{page_id}/form_fields",
    "get_all_leads_for_a_page": f"{BASE_URL}/pages/{page_id}/leads",
}
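
# Illustrative substitution (the page id "abc123" below is made up): with
# page_id unset, each template renders the literal string "None", e.g.
#   api_endpoints["get_all_leads_for_a_page"]
#   -> "https://api.unbounce.com/pages/None/leads"
# get_resources() below swaps that "None" for a real id:
#   -> "https://api.unbounce.com/pages/abc123/leads"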
@backoff.on_exception(backoff.expo, RequestException, max_tries=5)
def get_resources(api_endpoint: str, uri_parameter: str = None, **kwargs):
    """Fetch all paginated responses for an Unbounce API endpoint.

    Args:
        api_endpoint (str): A key of the api_endpoints dict - refer to the documentation.
        uri_parameter (str, optional): Any one of these values:
            [account_id, sub_account_id, domain_id, page_id]. Defaults to None.

    Payload parameters (passed via kwargs):
        sort_order, count, from_date, to_date, offset, limit,
        with_stats, include_sub_pages

    Returns:
        List[Dict]: One decoded JSON response per page of results.
    """
    resource_endpoint = api_endpoints.get(api_endpoint)
    assert resource_endpoint is not None, "Resource Endpoint Does Not Exist"
    if uri_parameter is not None:
        # The f-string templates render unset ids as the literal string "None"
        resource_endpoint = resource_endpoint.replace("None", uri_parameter)
    assert (
        resource_endpoint.find("/None") == -1
    ), f"Check the Resource Endpoint URI - Contains Undefined URI: {resource_endpoint}"
    payload = {
        "sort_order": kwargs.get("sort_order"),
        "count": kwargs.get("count"),
        "from": kwargs.get("from_date"),
        "to": kwargs.get("to_date"),
        "offset": kwargs.get("offset"),
        "limit": kwargs.get("limit"),
        "with_stats": kwargs.get("with_stats"),
        "include_sub_pages": kwargs.get("include_sub_pages"),
    }
    response_objects = list()
    # A Session object persists certain parameters across requests. If several
    # requests are made to the same host, the underlying TCP connection is
    # reused, which can yield a significant performance increase. A Session
    # exposes all the main methods of the requests API.
    session = requests.Session()
    with session.get(resource_endpoint, auth=unbounce_credentials, params=payload) as response:
        # TODO: implement a response status check
        response_objects.append(response.json())
    # Follow the "next" links until the API stops returning one
    while True:
        try:
            resource_endpoint = response.json()["metadata"]["next"]["href"]
            with session.get(resource_endpoint, auth=unbounce_credentials) as response:
                response_objects.append(response.json())
        except KeyError:
            break
    return response_objects
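
# Example calls (the page id "abc123" is made up for illustration):
# pages = get_resources("get_all_pages", sort_order="asc", limit=1000)
# leads = get_resources("get_all_leads_for_a_page", "abc123", limit=1000)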
@timer
def get_all_page_ids(page_id_payload):
    try:
        # Collect page ids across every paginated response
        return [
            page_dd["id"]
            for response in get_resources("get_all_pages", **page_id_payload)
            for page_dd in response["pages"]
        ]
    except Exception:
        traceback.print_exc()
@timer
def run_synchronous_process(api_endpoint, uri_parameters, payload):
    data_object = list()
    for uri_parameter in uri_parameters:
        try:
            data_object.append(get_resources(api_endpoint, uri_parameter, **payload))
        except Exception:
            # TODO: do something with the exception
            continue
    return data_object
@timer
def synchronous_upload_to_mongo_db(page_id_payload, leads_payload):
    print("Uploading Data to MongoDB")
    list_page_ids = get_all_page_ids(page_id_payload)
    extract_data = run_synchronous_process("get_all_leads_for_a_page", list_page_ids, leads_payload)
    transform_data = transform_data_from_unbounce(extract_data)
    load_data_mongodb(transform_data)
# Example of a partial() implementation, which can be used for multi-threading
def create_partial_functions(function, api_endpoint, payload):
    """Create a partial function to be used for multi-threading"""
    return partial(function, api_endpoint=api_endpoint, **payload)
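
# Illustrative usage (a sketch, not wired into main; `some_page_ids` is a
# hypothetical list of page ids): bind the endpoint and payload once, then
# map the partial over page ids with a thread pool.
# get_leads = create_partial_functions(get_resources, "get_all_leads_for_a_page", {"limit": 1000})
# with concurrent.futures.ThreadPoolExecutor() as executor:
#     results = list(executor.map(lambda pid: get_leads(uri_parameter=pid), some_page_ids))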
# For illustration purposes, a simple implementation of the worker function
def worker_wrapper(*args: tuple):
    api_endpoint, uri_parameter, payload = args
    return get_resources(api_endpoint, uri_parameter, **payload)

def modified_worker_wrapper(*args: tuple):
    # Extracts, transforms and loads the data for a single page id
    api_endpoint, uri_parameter, payload = args
    extract_data = get_resources(api_endpoint, uri_parameter, **payload)
    transform_data = transform_data_from_unbounce(extract_data)
    load_data_mongodb(transform_data)
# A thread is a unit of execution within a process. Multithreading refers to
# concurrently executing multiple threads by rapidly switching CPU control
# between threads (called context switching).
@timer
def run_multi_threading(api_endpoint, uri_parameters, payload):
    with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        executor.map(
            lambda arg_tup: worker_wrapper(*arg_tup),
            zip(repeat(api_endpoint), uri_parameters, repeat(payload)),
        )
        # The with-block shuts the executor down and waits for workers on exit
# Multiprocessing refers to the ability of a system to support more than one
# processor at the same time. Applications in a multiprocessing system are
# broken into smaller routines that run independently. The operating system
# allocates these routines to the processors, improving performance of the system.
@timer
def run_multi_processing(api_endpoint, uri_parameters, payload):
    with Pool(processes=os.cpu_count()) as pool:
        # Like map() except that the elements of the iterable are expected to be
        # iterables that are unpacked as arguments.
        # Hence an iterable of [(1, 2), (3, 4)] results in [func(1, 2), func(3, 4)].
        pool.starmap(worker_wrapper, tuple(zip(repeat(api_endpoint), uri_parameters, repeat(payload))))

@timer
def run_multi_processing_upload(api_endpoint, uri_parameters, payload):
    with Pool(processes=os.cpu_count()) as pool:
        pool.starmap(modified_worker_wrapper, tuple(zip(repeat(api_endpoint), uri_parameters, repeat(payload))))
# https://realpython.com/async-io-python/#a-full-program-asynchronous-requests
# https://www.datacamp.com/community/tutorials/asyncio-introduction
# https://realpython.com/async-io-python/
# https://www.youtube.com/watch?v=R4Oz8JUuM4s
async def async_get_resources(
    session, api_endpoint: str, uri_parameter: str = None, **kwargs
):
    async_credentials = aiohttp.BasicAuth(unbounce_user, unbounce_pwd)
    resource_endpoint = api_endpoints.get(api_endpoint)
    assert resource_endpoint is not None, "Resource Endpoint Does Not Exist"
    if uri_parameter is not None:
        resource_endpoint = resource_endpoint.replace("None", uri_parameter)
    assert (
        resource_endpoint.find("/None") == -1
    ), f"Check the Resource Endpoint URI - Contains Undefined URI: {resource_endpoint}"
    # aiohttp requires string parameter values; drop unset keys so the literal
    # string "None" is not sent to the API
    payload = {
        key: str(value)
        for key, value in {
            "sort_order": kwargs.get("sort_order"),
            "count": kwargs.get("count"),
            "from": kwargs.get("from_date"),
            "to": kwargs.get("to_date"),
            "offset": kwargs.get("offset"),
            "limit": kwargs.get("limit"),
            "with_stats": kwargs.get("with_stats"),
            "include_sub_pages": kwargs.get("include_sub_pages"),
        }.items()
        if value is not None
    }
    response_objects = list()
    async with session.get(
        resource_endpoint, auth=async_credentials, params=payload, ssl=False
    ) as response:
        data = await response.json()
    response_objects.append(data)
    # Follow the "next" links until the API stops returning one
    while True:
        try:
            resource_endpoint = data["metadata"]["next"]["href"]
        except KeyError:
            break
        async with session.get(
            resource_endpoint, auth=async_credentials, ssl=False
        ) as response:
            data = await response.json()
        response_objects.append(data)
    return response_objects
async def get_data(api_endpoint, uri_parameters, **kwargs):
    async with aiohttp.ClientSession() as session:
        data = []
        for uri_parameter in uri_parameters:
            response = await async_get_resources(
                session, api_endpoint, uri_parameter, **kwargs
            )
            data.append(response)
        return data

@timer
def main_async():
    asyncio.run(get_data("get_all_leads_for_a_page", list_page_ids, **leads_payload))
if __name__ == "__main__":
    page_id_payload = {
        "sort_order": "asc",
        "count": False,
        "from_date": "2021-01-01T00:00:00.000Z",
        "to_date": "2021-12-31T00:00:00.000Z",
        "offset": 0,
        "limit": 1000,
        "with_stats": True,
    }
    leads_payload = {
        "sort_order": "asc",
        "count": False,
        "from_date": "2021-01-01T00:00:00.000Z",
        "to_date": "2021-12-31T00:00:00.000Z",
        "offset": 0,
        "limit": 1000,
    }
    list_page_ids = get_all_page_ids(page_id_payload)

    print("STARTED: Single Thread - Get All Leads")
    extract_data = run_synchronous_process("get_all_leads_for_a_page", list_page_ids, leads_payload)

    print("STARTED: Multi Thread - Get All Leads")
    run_multi_threading("get_all_leads_for_a_page", list_page_ids, leads_payload)

    print("STARTED: Multi Processing - Get All Leads")
    run_multi_processing("get_all_leads_for_a_page", list_page_ids, leads_payload)

    print("STARTED: ASYNCIO - Get All Leads")
    main_async()

    # Uploading Leads to MongoDB Synchronously
    synchronous_upload_to_mongo_db(page_id_payload, leads_payload)

    # Uploading Leads to MongoDB Using MultiProcessing
    list_page_ids = get_all_page_ids(page_id_payload)
    run_multi_processing_upload("get_all_leads_for_a_page", list_page_ids, leads_payload)