Skip to content

Instantly share code, notes, and snippets.

@sungchun12
Created January 8, 2024 22:55
Show Gist options
  • Save sungchun12/615379b63fad70046051d091a1200980 to your computer and use it in GitHub Desktop.
Save sungchun12/615379b63fad70046051d091a1200980 to your computer and use it in GitHub Desktop.
Demo script to run a Datafold Cloud xdb data diff between Databricks and Snowflake with simple API calls
"""
Demo script to run an xdb (cross-database) data diff between Databricks and Snowflake with simple API calls
"""
import os
import time
from pydantic import BaseModel
from typing import Any, List, Optional
import requests
from tabulate import tabulate
from termcolor import colored
from halo import Halo
# ---------------------------------------------------------------------------
# Connection settings
# ---------------------------------------------------------------------------
# TODO: replace with your own Datafold API key
# Swap the host for 'staging.datafold.io', or an on-premise host URL.
host = "app.datafold.com"
# Read from the DATAFOLD_API_KEY environment variable; None when it is unset.
datafold_api_key = os.getenv("DATAFOLD_API_KEY")

# ---------------------------------------------------------------------------
# Data diff settings
# ---------------------------------------------------------------------------
# TODO: replace with your own data diff configs
# Replace both ids with your own data source ids.
data_source1_id = 6716
data_source2_id = 4932

# Each table is given as ["database", "schema", "table/view"].
table1 = ["hive_metastore", "dbt_sung", "dim_orgs"]
table2 = ["DEMO", "CORE", "DIM_ORGS"]

# Replace with your own primary key columns.
pk_columns = ["org_id"]

# Replace with your own column names.
cols = ["created_at", "num_users", "sub_created_at", "sub_plan", "sub_price"]
class DataDiffConfigs(BaseModel):
    """Settings for one data diff between two tables.

    In this script the model is converted to a dict and sent as the JSON
    body of the request that creates the diff.
    """

    # Ids of the two data sources being compared.
    data_source1_id: int
    data_source2_id: int
    # Each table is a path given as ["database", "schema", "table/view"].
    table1: List[str]
    table2: List[str]
    # Primary key column names.
    pk_columns: List[str]
    # Column names to compare. The explicit ``None`` default keeps the field
    # optional on every pydantic version: without it, pydantic v2 treats a
    # bare ``Optional[...]`` annotation as a required field.
    # NOTE(review): how the API treats a null value here is not visible in
    # this file - confirm before relying on omitting it.
    columns_to_compare: Optional[List[str]] = None
# Bundle the module-level settings above into a single validated payload.
data_diff_configs = DataDiffConfigs(
    # First side of the comparison.
    data_source1_id=data_source1_id,
    table1=table1,
    # Second side of the comparison.
    data_source2_id=data_source2_id,
    table2=table2,
    # Key and compared columns.
    pk_columns=pk_columns,
    columns_to_compare=cols,
)
class DataDiff:
    """Small client that creates a data diff over HTTP and polls its summary.

    All requests go through one ``requests.Session`` that carries the
    ``Authorization`` header built from the API key.
    """

    def __init__(
        self,
        host: str,
        datafold_api_key: str,
        timeout: float = 30.0,
    ):
        """Set up the authenticated session.

        Args:
            host: Host name used to build every ``https://{host}/...`` URL.
            datafold_api_key: Key sent as ``Authorization: Key <key>``.
            timeout: Seconds passed as ``timeout`` to every HTTP request, so
                a stalled connection raises instead of hanging forever.
        """
        self.session = requests.Session()
        self.host = host
        self.timeout = timeout
        self.session.headers["Authorization"] = f"Key {datafold_api_key}"

    def create_diff(self, data_diff_configs: "DataDiffConfigs") -> int:
        """Create a diff from the given configs and return its id.

        POSTs the configs as JSON to ``/api/v1/datadiffs``, prints a link to
        the new diff, and returns the ``id`` field of the response.

        Raises:
            requests.HTTPError: If the response has an error status code.
        """
        resp = self.session.post(
            f"https://{self.host}/api/v1/datadiffs",
            json=data_diff_configs.dict(),
            timeout=self.timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        url = colored(f"https://{self.host}/datadiffs/{data['id']}", "blue")
        print(f"Started XDB Diff: {url}")
        return data["id"]

    def get_diff_summary(self, id: int) -> dict[str, Any]:
        """Fetch and return the parsed ``summary_results`` JSON for a diff.

        Raises:
            requests.HTTPError: If the response has an error status code.
        """
        resp = self.session.get(
            f"https://{self.host}/api/v1/datadiffs/{id}/summary_results",
            timeout=self.timeout,
        )
        resp.raise_for_status()
        return resp.json()

    def wait_for_results(self, id: int) -> dict[str, Any]:
        """Poll the summary once per second until the diff finishes.

        A terminal spinner shows the elapsed time while waiting. Polling
        stops when the summary's ``status`` is ``"success"`` or ``"error"``,
        and that summary is returned.
        """
        spinner = Halo(text="Running", spinner="dots", color="green")
        start_time = time.time()
        try:
            spinner.start()
            while True:
                summary = self.get_diff_summary(id)
                status = summary["status"]
                if status in ("success", "error"):
                    elapsed_time = round(time.time() - start_time, 2)
                    message = (
                        f"Completed with status: {status}. "
                        f"Total run time: {elapsed_time} seconds"
                    )
                    # Mark a failed run as a failure instead of showing the
                    # success symbol for every terminal status.
                    if status == "success":
                        spinner.succeed(message)
                    else:
                        spinner.fail(message)
                    return summary
                elapsed_seconds = int(time.time() - start_time)
                spinner.text = f"Running... {elapsed_seconds} seconds"
                time.sleep(1)
        finally:
            # Always release the spinner, including when a request raises.
            spinner.stop()

    def print_results(
        self, results: dict[str, Any], data_diff_configs: "DataDiffConfigs"
    ):
        """Print the primary key summary of a finished diff as a grid table.

        Args:
            results: Summary returned by ``wait_for_results``.
            data_diff_configs: Configs of the diff; the two table paths
                become the column headers.
        """
        pks = results.get("pks")
        if pks is None:
            # Without a "pks" section there is nothing to tabulate; report
            # the status instead of failing with a KeyError.
            # NOTE(review): presumably the case for "error" runs - confirm
            # against the API response schema.
            print(
                "No primary key summary available "
                f"(status: {results.get('status')})."
            )
            return

        # Show each table as a dotted name rather than a raw Python list.
        headers_pks = [
            "Stats",
            ".".join(data_diff_configs.table1),
            ".".join(data_diff_configs.table2),
        ]
        # Assumes each value is a list with one entry per table - TODO confirm.
        rows_pks = [[key] + value for key, value in pks.items()]
        print("Data Diff Primary Keys Summary:")
        print(tabulate(rows_pks, headers=headers_pks, tablefmt="grid"))

    def run_xdb_diff(self, data_diff_configs: "DataDiffConfigs"):
        """Create the diff, wait for it to finish, and print its summary."""
        diff_id = self.create_diff(data_diff_configs)
        results = self.wait_for_results(diff_id)
        self.print_results(results, data_diff_configs)
if __name__ == "__main__":
    # Fail fast with a clear message: without a key the Authorization header
    # would be sent as the literal "Key None".
    if not datafold_api_key:
        raise SystemExit(
            "Datafold API key is missing: set the DATAFOLD_API_KEY "
            "environment variable."
        )
    datadiff = DataDiff(host, datafold_api_key)
    datadiff.run_xdb_diff(data_diff_configs)
@sungchun12
Copy link
Author

Run with `python3 xdb_diff.py` to get this kind of result.
image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment