Skip to content

Instantly share code, notes, and snippets.

table_raw = "integrationtest_employees"
table_gold = "gold_integrationtest_employee"
# Trigger the master pipeline that moves data from the Processed zone to the Gold zone
masterpipeline_gold_params = {
"lookUpTables": [{
"SourceTableSchemaName": "dbo",
"SourceTableName": table_raw,
"SinkTableSchemaName": "dbo",
"SinkTableName": table_gold,
.
├── README.md
├── data
│ └── employee_integration_test.parquet
├── dataconnectors
│ └── adls.py
├── requirements.txt
├── tests
│ ├── conftest.py
│ └── test_pipeline.py
def test_source_to_processed_workflow(azure_credential,
synapse_endpoint: str,
pipeline_name: str,
storage_account_name: str,
container_name: str,
base_path: str,
input_sample_file_name: str,
adls_connection_client,
sql_connection_client):
.
├── README.md
├── data
│ └── user.parquet
├── dataconnectors
│ └── adls.py
├── requirements.txt
├── tests
│ ├── conftest.py
│ └── test_pipeline.py
{"registration_dttm":28529000000000,"id":1,"first_name":"Amanda","last_name":"Jordan","email":"ajordan0@com.com","gender":"Female","ip_address":"1.197.201.2","cc":"6759521864920116","country":"Indonesia","birthdate":"3/8/1971","salary":49756.53,"title":"Internal Auditor","comments":"1E+02"}
{"registration_dttm":61443000000000,"id":2,"first_name":"Albert","last_name":"Freeman","email":"afreeman1@is.gd","gender":"Male","ip_address":"218.111.175.34","cc":"","country":"Canada","birthdate":"1/16/1968","salary":150280.17,"title":"Accountant IV","comments":""}
{"registration_dttm":4171000000000,"id":3,"first_name":"Evelyn","last_name":"Morgan","email":"emorgan2@altervista.org","gender":"Female","ip_address":"7.161.136.94","cc":"6767119071901597","country":"Russia","birthdate":"2/1/1960","salary":144972.51,"title":"Structural Engineer","comments":""}
{"registration_dttm":2181000000000,"id":4,"first_name":"Denise","last_name":"Riley","email":"driley3@gmpg.org","gender":"Female","ip_address":"140.35.109.83","cc":"35760
{"registration_dttm":28529000000000,"id":1,"first_name":"Amanda","last_name":"Jordan","email":"ajordan0@com.com","gender":"Female","ip_address":"1.197.201.2","cc":"6759521864920116","country":"Indonesia","birthdate":"3/8/1971","salary":49756.53,"title":"Internal Auditor","comments":"1E+02"}
{"registration_dttm":61443000000000,"id":2,"first_name":"Albert","last_name":"Freeman","email":"afreeman1@is.gd","gender":"Male","ip_address":"218.111.175.34","cc":"","country":"Canada","birthdate":"1/16/1968","salary":150280.17,"title":"Accountant IV","comments":""}
{"registration_dttm":4171000000000,"id":3,"first_name":"Evelyn","last_name":"Morgan","email":"emorgan2@altervista.org","gender":"Female","ip_address":"7.161.136.94","cc":"6767119071901597","country":"Russia","birthdate":"2/1/1960","salary":144972.51,"title":"Structural Engineer","comments":""}
{"registration_dttm":2181000000000,"id":4,"first_name":"Denise","last_name":"Riley","email":"driley3@gmpg.org","gender":"Female","ip_address":"140.35.109.83","cc":"35760
"basePath": base_path,
"filePath": "user_*.parquet",
"targetTable": target_table,
"badDataTable": "bad_users",
"containerName": container_name,
"archivePath": "archive",
"storageAccountName": storage_account_name,
"readFromSparkTables": False,
"database": database,
# Run the pipeline with the raw-zone parameters and require that the
# reported status equals the project's success constant.
pipeline_run_result = pipelineutils.run_and_observe_pipeline(
    azure_credential,
    synapse_endpoint,
    pipeline_name,
    masterpipeline_raw_params,
)
assert pipeline_run_result == constants.PIPELINE_SUCCESS_STATUS

# Read back the parquet output written under target_path in the container.
parquet_dataframe = adls.read_parquet_file_from_ADLS(
    adls_connection_client,
    container_name,
    target_path,
)
num_of_rows = len(parquet_dataframe.index)

# The run only counts as successful if at least one row was written.
print(f"Number of Rows Fetched : { num_of_rows }\n")
assert num_of_rows >= 1
"lookUpTables": [{
"SourceTableSchemaName": "dbo",
"SourceTableName": table_raw,
"SinkTableSchemaName": "dbo",
"SinkTableName": table_processed,
"HasIncrementalData": "false"
}],
"sourceDatabase": database,