This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# coding: utf-8

# In[1]:

# Set arguments: lake-zone folder paths and the source database name
# used by the downstream load/merge cells.
dfDataOriginalPath = "/processedzone/"   # root path of the current (SCD2) delta data
dfDataChangedPath = "/changedzone/"      # root path of the newly arrived / changed data
cw_database = "AdventureWorks"           # source database name, used to build the storage path
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "info": {
    "_postman_id": "956b4730-2042-49dd-9f39-b65c30c5b192",
    "name": "Purview Demo",
    "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
  },
  "item": [
    {
      "name": "Authenticate Purview",
      "request": {
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Set arguments: lake-zone paths, source table identifiers, and the
# service-principal credentials used to authenticate against Purview.
dfDataOriginalPath = "/processedzone/"   # root path of the current (SCD2) delta data
dfDataChangedPath = "/changedzone/"      # root path of the newly arrived / changed data
cw_database = "AdventureWorks"           # source database name, used to build the storage path
cw_table = "SalesLTAddress"              # source table name, used to build the storage path
# NOTE(review): placeholder secrets. In production these should be loaded from a
# secure store (e.g. Azure Key Vault / linked service), never hard-coded.
tenant_id = "xxxxx-xxxx-xxxx-xxxx-xxxxxxx"
client_id = "xxxxx-xxxx-xxxx-xxxx-xxxxxxx"
client_secret = "xxxxxxxxxxxxxxxx"
purview_account = "purview_account"      # Purview account name for the Atlas endpoint
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Prepare for the SCD type-2 merge: rename every column of the newly loaded
# data with a 'src_' prefix (so source columns cannot collide with target
# columns during the merge) and add the SCD2 bookkeeping columns.
from pyspark.sql import functions as F

# Prefix all columns of `dataChanged` with 'src_'.
# NOTE(review): `dataChanged` is produced by another notebook cell — confirm it
# is loaded before this cell runs.
df_new = dataChanged.select([F.col(c).alias("src_" + c) for c in dataChanged.columns])
src_columnNames = df_new.schema.names  # prefixed column names, reused by later merge cells

# Add SCD2 tracking columns: current-record flag, effective date, and an
# open-ended end date (9999-12-31 marks the active record).
# NOTE(review): `lit`, `date`, and `current_date` come from star-imports /
# assignments in other cells (pyspark.sql.functions, datetime) — verify cell order.
df_new2 = df_new.withColumn('src_current', lit(True)).withColumn('src_effectiveDate', lit(current_date)).withColumn('src_endDate', lit(date(9999, 12, 31)))
df_new2.printSchema()

import hashlib
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import os | |
import jmespath | |
from sys import exit | |
from pyapacheatlas.auth import ServicePrincipalAuthentication | |
from pyapacheatlas.core import PurviewClient, AtlasEntity, AtlasProcess | |
# Connect to Atlas via a Service Principal | |
auth = ServicePrincipalAuthentication( | |
tenant_id = tenant_id, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import great_expectations as ge | |
import great_expectations.dataset.sparkdf_dataset | |
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset | |
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, BooleanType, DateType | |
from pyspark.sql import Row | |
from datetime import datetime | |
# Convert DataFrame schema data types based on metadata from Purview | |
if asset_schema: | |
for i in asset_schema: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%%pyspark | |
from datetime import date | |
current_date = datetime.today().date() | |
try: | |
# Read original data - this is your scd type 2 table holding all data | |
dataOriginal = spark.read.load(dfDataOriginalPath + "/" + cw_database + "/" + cw_table, format='delta') | |
except: | |
# Use first load when no data exists yet |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%%pyspark | |
from pyspark import * | |
from pyspark.sql.window import Window | |
from pyspark.sql.functions import * | |
from pyspark.sql import Row | |
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, BooleanType, DateType | |
from typing import List | |
from datetime import datetime |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "entity": {
    "status": "ACTIVE",
    "createTime": 1592928847,
    "updateTime": 1594120997,
    "createdBy": "Superadmin",
    "version": 1,
    "attributes": {
      "name": "SalesLTAddress",
      "db": {
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "enumDefs": [],
  "structDefs": [],
  "classificationDefs": [],
  "entityDefs": [
    {
      "name": "data_landing_zone",
      "description": "Data Landing Zone",
      "createdBy": "admin",
      "updatedBy": "admin",