Skip to content

Instantly share code, notes, and snippets.

.
├── README.md
├── data
│ └── user.parquet
├── dataconnectors
│ └── adls.py
├── requirements.txt
├── tests
│ ├── conftest.py
│ └── test_pipeline.py
def test_source_to_processed_workflow(azure_credential,
synapse_endpoint: str,
pipeline_name: str,
storage_account_name: str,
container_name: str,
base_path: str,
input_sample_file_name: str,
adls_connection_client,
sql_connection_client):
.
├── README.md
├── data
│ └── employee_integration_test.parquet
├── dataconnectors
│ └── adls.py
├── requirements.txt
├── tests
│ ├── conftest.py
│ └── test_pipeline.py
table_raw = "integrationtest_employees"
table_gold = "gold_integrationtest_employee"
# Trigger the Master Pipeline for Processed to gold Zone
masterpipeline_gold_params = {
"lookUpTables": [{
"SourceTableSchemaName": "dbo",
"SourceTableName": table_raw,
"SinkTableSchemaName": "dbo",
"SinkTableName": table_gold,
def test_source_to_gold_workflow(azure_credential,
synapse_endpoint: str,
pipeline_name: str,
storage_account_name: str,
container_name: str,
base_path: str,
input_sample_file_name: str,
adls_connection_client,
sql_connection_client):
"""Shared fixtures/configuration for the Synapse pipeline integration tests."""
# Fix: original line read "mimport pytest" — a typo that is a SyntaxError.
# Imports regrouped per PEP 8: stdlib, third-party, then project-local.
import json

import pytest

from dataconnectors import adls
from utils import pipelineutils, constants

# Registries of artifacts created during a test run; tests append to these and
# the autouse cleanup fixture drains them afterwards.
CLEANUP_TABLES = []
CLEANUP_FOLDERS = []

# NOTE(review): presumably the Synapse/SQL database name targeted by the
# pipeline runs — confirm against the pipeline parameters.
database = "default"
@pytest.fixture(autouse=True)
def run_before_and_after_tests(adls_connection_client,
                               base_path: str,
                               container_name: str,
                               azure_credential,
                               synapse_endpoint):
    """Autouse fixture wrapped around every test in this module.

    Does no setup work before the test; after the test completes it removes
    the ADLS folders that the test registered in the module-level
    CLEANUP_FOLDERS list.
    """
    # NOTE(review): base_path, azure_credential and synapse_endpoint are not
    # used in the visible body — confirm whether they are needed or vestigial.
    yield  # the test itself runs here
    print("STARTING TO CLEAN UP .....")
    # Delete every folder recorded during the test from the test container.
    # NOTE(review): CLEANUP_TABLES is declared at module level but is not
    # drained here — verify table cleanup happens elsewhere.
    adls.cleanup_ADLS_files(adls_connection_client,
                            container_name, CLEANUP_FOLDERS)
pipeline_run_result = pipelineutils.run_and_observe_pipeline(
azure_credential, synapse_endpoint, pipeline_name, masterpipeline_curated_params)
assert pipeline_run_result == constants.PIPELINE_SUCCESS_STATUS
# Check for Data in Curated Zone
cursor = sql_connection_client.cursor()
cursor.execute("SELECT COUNT(*) AS COUNT FROM [dbo].[{0}]".format(table_curated))
row = cursor.fetchone()
assert row is not None
# Trigger the Master Pipeline for Processed to Curated Zone
masterpipeline_curated_params = {
"lookUpTables": [{
"SourceTableSchemaName":"dbo",
"SourceTableName":table_processed,
"SinkTableSchemaName":"dbo",
"SinkTableName": table_curated,
"HasIncrementalData":"false"
}],
"sourceDatabase": database,
pipeline_run_result = pipelineutils.run_and_observe_pipeline(
azure_credential, synapse_endpoint, pipeline_name, masterpipeline_raw_params)