Skip to content

Instantly share code, notes, and snippets.

View pietheinstrengholt's full-sized avatar

Piethein Strengholt pietheinstrengholt

View GitHub Profile
#!/usr/bin/env python
# coding: utf-8
# In[1]:
# Set arguments
# Lake-relative root folders: dfDataOriginalPath holds the already-processed
# (SCD type 2) data, dfDataChangedPath holds the newly arrived/changed records.
dfDataOriginalPath = "/processedzone/"
dfDataChangedPath = "/changedzone/"
# Source database name; appended to the paths above when reading Delta tables
# elsewhere in this notebook.
cw_database = "AdventureWorks"
{
"info": {
"_postman_id": "956b4730-2042-49dd-9f39-b65c30c5b192",
"name": "Purview Demo",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
},
"item": [
{
"name": "Authenticate Purview",
"request": {
# Set arguments
# Lake paths and source table identifiers (duplicated from the first cell of
# this notebook; both copies must stay in sync).
dfDataOriginalPath = "/processedzone/"
dfDataChangedPath = "/changedzone/"
cw_database = "AdventureWorks"
cw_table = "SalesLTAddress"
# Azure AD service-principal credentials used to authenticate against Purview.
# NOTE(review): these are placeholders — in real use they should be injected
# from a secret store (e.g. Key Vault), never committed in source.
tenant_id = "xxxxx-xxxx-xxxx-xxxx-xxxxxxx"
client_id = "xxxxx-xxxx-xxxx-xxxx-xxxxxxx"
client_secret = "xxxxxxxxxxxxxxxx"
# Name of the target Purview account (placeholder).
purview_account = "purview_account"
# Prepare for merge, rename columns of newly loaded data, append 'src_'
from pyspark.sql import functions as F
# Rename all columns in dataChanged, prepend src_, and add additional columns
# so the incoming frame can be joined against the existing SCD2 table without
# column-name clashes. `dataChanged` is loaded in another cell of this notebook.
df_new = dataChanged.select([F.col(c).alias("src_"+c) for c in dataChanged.columns])
src_columnNames = df_new.schema.names
# Add the SCD type 2 bookkeeping columns: src_current marks the active row,
# src_effectiveDate is when it became active, src_endDate is an open-ended
# sentinel (9999-12-31) meaning "still current".
# NOTE(review): `lit`, `current_date` (a date value set in a later cell) and
# `date` are expected to come from other cells' star imports — confirm cell
# execution order, since pyspark.sql.functions also exports a `current_date`
# function that this variable would shadow.
df_new2 = df_new.withColumn('src_current', lit(True)).withColumn('src_effectiveDate', lit(current_date)).withColumn('src_endDate', lit(date(9999, 12, 31)))
# Print the resulting schema for visual verification in the notebook output.
df_new2.printSchema()
import hashlib
import json
import os
import jmespath
from sys import exit
from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import PurviewClient, AtlasEntity, AtlasProcess
# Connect to Atlas via a Service Principal
auth = ServicePrincipalAuthentication(
tenant_id = tenant_id,
import great_expectations as ge
import great_expectations.dataset.sparkdf_dataset
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, BooleanType, DateType
from pyspark.sql import Row
from datetime import datetime
# Convert DataFrame schema data types based on metadata from Purview
if asset_schema:
for i in asset_schema:
%%pyspark
from datetime import date
# Today's date, used as the SCD2 effective date for this load.
# NOTE(review): this assignment shadows pyspark.sql.functions.current_date if
# `from pyspark.sql.functions import *` has run in another cell — presumably
# intentional here, but verify cell ordering. `datetime` itself is imported in
# a separate cell of this notebook.
current_date = datetime.today().date()
try:
# Read original data - this is your scd type 2 table holding all data
dataOriginal = spark.read.load(dfDataOriginalPath + "/" + cw_database + "/" + cw_table, format='delta')
except:
# Use first load when no data exists yet
%%pyspark
from pyspark import *
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, BooleanType, DateType
from typing import List
from datetime import datetime
{
"entity": {
"status": "ACTIVE",
"createTime": 1592928847,
"updateTime": 1594120997,
"createdBy": "Superadmin",
"version": 1,
"attributes": {
"name": "SalesLTAddress",
"db": {
{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"name": "data_landing_zone",
"description": "Data Landing Zone",
"createdBy": "admin",
"updatedBy": "admin",