import pytest

from dataconnectors import adls
from utils import pipelineutils, constants


def test_source_to_curated_workflow(azure_credential,
                                    synapse_endpoint: str,
                                    pipeline_name: str,
                                    storage_account_name: str,
                                    container_name: str,
                                    base_path: str,
                                    input_sample_file_name: str,
                                    adls_connection_client,
                                    sql_connection_client):
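    # NOTE: the gist preview omits the arrange/act steps that belong here.
    # Judging from the pattern later in this test, the missing code defines
    # target_path and masterpipeline_raw_params and triggers the
    # source-to-raw run; a hypothetical sketch (names are assumptions):
    #
    #   target_path = f"{base_path}/raw"
    #   masterpipeline_raw_params = {...}  # source-to-raw parameters (not shown)
    #   pipeline_run_result = pipelineutils.run_and_observe_pipeline(
    #       azure_credential, synapse_endpoint, pipeline_name,
    #       masterpipeline_raw_params)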
    assert pipeline_run_result == constants.PIPELINE_SUCCESS_STATUS

    # Check for Data in Raw Zone
    parquet_dataframe = adls.read_parquet_file_from_ADLS(
        adls_connection_client, container_name, target_path)
    num_of_rows = len(parquet_dataframe.index)

    # Assert (assumption: the raw zone should now contain at least one row)
    assert num_of_rows > 0
{"Emp_id":"10030","Emp_name":"Linden, Mathew","Emp_location":"Chicago","Emp_dept":"IT","Emp_designation":"System Engineer","Last_promotion_date":"NULL","Hired_date":"6/15/2017"}
{"Emp_id":"10025","Emp_name":"Beatrice, Courtney","Emp_location":"Seattle","Emp_dept":"IT","Emp_designation":"Sr System Engineer","Last_promotion_date":"3/1/2020","Hired_date":"3/10/2018"}
{"Emp_id":"10029","Emp_name":"Heitzman, Anthony","Emp_location":"Seattle","Emp_dept":"HR","Emp_designation":"Recruiter","Last_promotion_date":"NULL","Hired_date":"6/2/2018"}
{"Emp_id":"10028","Emp_name":"Eaton, Marianne","Emp_location":"Dallas","Emp_dept":"Finance","Emp_designation":"Finance Director","Last_promotion_date":"3/1/2021","Hired_date":"1/28/2019"}
{"Emp_id":"10027","Emp_name":"Forrest, Alex","Emp_location":"Chicago","Emp_dept":"Finance","Emp_designation":"Sr Financial Analyst","Last_promotion_date":"3/1/2021","Hired_date":"10/1/2019"}
{"Emp_id":"10024","Emp_name":"Barbara, Thomas","Emp_location":"Seattle","Emp_dept":"HR","Emp_designation
    # Trigger the Master Pipeline with the raw-zone parameters
    pipeline_run_result = pipelineutils.run_and_observe_pipeline(
        azure_credential, synapse_endpoint, pipeline_name, masterpipeline_raw_params)
    # Trigger the Master Pipeline for Processed to Curated Zone
    masterpipeline_curated_params = {
        "lookUpTables": [{
            "SourceTableSchemaName": "dbo",
            "SourceTableName": table_processed,
            "SinkTableSchemaName": "dbo",
            "SinkTableName": table_curated,
            "HasIncrementalData": "false"
        }],
        "sourceDatabase": database
    }
    pipeline_run_result = pipelineutils.run_and_observe_pipeline(
        azure_credential, synapse_endpoint, pipeline_name, masterpipeline_curated_params)
    assert pipeline_run_result == constants.PIPELINE_SUCCESS_STATUS
    # Check for Data in Curated Zone
    cursor = sql_connection_client.cursor()
    cursor.execute("SELECT COUNT(*) AS COUNT FROM [dbo].[{0}]".format(table_curated))
    row = cursor.fetchone()
    assert row is not None
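
    # A stricter check one might add here, assuming the curated table should
    # end up with exactly the rows read from the raw zone earlier
    # (pyodbc rows support column-name attribute access):
    #   assert row.COUNT == num_of_rows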
@pytest.fixture(autouse=True)
def run_before_and_after_tests(adls_connection_client,
                               base_path: str,
                               container_name: str,
                               azure_credential,
                               synapse_endpoint):
    # Code before the yield would run as setup; cleanup runs as teardown.
    yield
    print("STARTING TO CLEAN UP .....")
    adls.cleanup_ADLS_files(adls_connection_client,
                            container_name, CLEANUP_FOLDERS)

import pytest
import json

from dataconnectors import adls
from utils import pipelineutils, constants

CLEANUP_TABLES = []
CLEANUP_FOLDERS = []
database = "default"
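
# Presumably each test appends whatever folders and tables it creates to
# these lists so the autouse fixture can tear them down; a hypothetical
# example:
#   CLEANUP_FOLDERS.append(f"{base_path}/raw")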
def test_source_to_gold_workflow(azure_credential,
                                 synapse_endpoint: str,
                                 pipeline_name: str,
                                 storage_account_name: str,
                                 container_name: str,
                                 base_path: str,
                                 input_sample_file_name: str,
                                 adls_connection_client,
                                 sql_connection_client):
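    # Body not shown in the preview; presumably it mirrors
    # test_source_to_curated_workflow above, extended through the gold zone:
    # trigger the pipeline runs, then assert on the gold-zone outputs.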