Biml ADF Pipeline Generation
<#@ import namespace="System.Data" #> | |
<#@ import namespace="System.Text" #> | |
<#@ property name="targetTables" type="DataView"#> | |
<#@ property name="frequency" type="string"#> | |
<#@ property name="scope" type="string"#> | |
{ | |
"$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json", | |
"name": "PL_Copy_MySourceDBToADLS_<#=frequency#>_<#=scope#>", | |
"properties": { | |
"description": "<#=frequency#> <#=scope#> copies of data from Source db to the data lake.", | |
"activities": [ | |
<# var isFirst = true; foreach( DataRowView rowView in targetTables) {#> | |
<# DataRow row = rowView.Row; #> | |
<# string schemaName = row["SchemaName"].ToString();#> | |
<# string tableName = row["TableName"].ToString();#> | |
<# string columnList = row["ColumnListForSelect"].ToString(); #> | |
<# string predicate = row["IncrementalPredicate"].ToString(); #> | |
<#=isFirst ? "" : ","#> | |
{ | |
"name": "Copy to Lake - <#=schemaName#>.<#=tableName#>", | |
"type": "Copy", | |
"inputs": [ | |
{ | |
"name": "DS_OnPremSQL_MySourceDB_<#=schemaName#>_<#=tableName#>" | |
} | |
], | |
"outputs": [ | |
{ | |
"name": "DS_DataLake_MySourceDB_<#=schemaName#>_<#=tableName#>" | |
} | |
], | |
"typeProperties": { | |
"source": { | |
"type": "SqlSource", | |
<# if (scope == "Full") {#> | |
"sqlReaderQuery": "SELECT <#=columnList#>, SYSDATETIME() AS LoadDateTime FROM <#=schemaName#>.[<#=tableName#>]" | |
<#} else if (scope == "Deltas" && frequency == "Hourly") {#> | |
"sqlReaderQuery": "$$Text.Format('SELECT <#=columnList#>, SYSDATETIME() AS LoadDateTime FROM <#=schemaName#>.[<#=tableName#>] WHERE <#=predicate#>', Time.AddHours(WindowStart, -5), Time.AddHours(WindowEnd, -5))" | |
<#} else if (scope == "Deltas" && frequency == "Daily") {#> | |
"sqlReaderQuery": "$$Text.Format('SELECT <#=columnList#>, SYSDATETIME() AS LoadDateTime FROM <#=schemaName#>.[<#=tableName#>] WHERE <#=predicate#>', WindowStart, WindowEnd)" | |
<# } #> | |
}, | |
"sink": { | |
"type": "AzureDataLakeStoreSink" | |
} | |
}, | |
"policy": { | |
"concurrency": 1, | |
"executionPriorityOrder": "OldestFirst", | |
"retry": 3, | |
"timeout": "01:00:00" | |
}, | |
"scheduler": { | |
<# if (frequency == "Daily") {#> | |
"frequency": "Day", | |
"offset": "09:00:00", | |
<#} else if (frequency == "Hourly") {#> | |
"frequency": "Hour", | |
<# } #> | |
"style": "EndOfInterval", | |
"interval": 1 | |
} | |
} | |
<# isFirst = false; }#> | |
], | |
<# if (frequency == "Hourly") {#> | |
"start": "2017-03-01T01:00:00", | |
<#}else {#> | |
"start": "2017-03-02T00:00:00", | |
<#}#> | |
"end": "9999-09-09" | |
} | |
} |
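The template above is presumably saved as PL_Copy_MySourceDBToADLS.biml, the name the driver below passes to CallBimlScript; everything from the <#@ template tier="10" #> directive onward is that second, tier-10 driver file. One assumption the template makes: the IncrementalPredicate metadata column holds a WHERE clause containing {0} and {1} placeholders, which Data Factory's $$Text.Format fills with the slice window at runtime. An illustrative (made-up) value would be:

[ModifiedDate] >= \\'{0:yyyy-MM-dd HH:mm}\\' AND [ModifiedDate] < \\'{1:yyyy-MM-dd HH:mm}\\'

with the single quotes escaped so the literal survives both the JSON string and the Text.Format expression. The Time.AddHours(WindowStart, -5) / Time.AddHours(WindowEnd, -5) arguments in the hourly branch appear to shift the UTC slice window into the source system's local time; adjust or drop that offset to suit your environment.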
<#@ template tier="10" #> | |
<#@ import namespace="System.Data" #> | |
<#@ import namespace="System.Text" #> | |
<#@ code file="BGHelper.cs" #> | |
<#@ import namespace="BGHelper" #> | |
<Biml xmlns="http://schemas.varigence.com/biml.xsd"> | |
</Biml> | |
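<#/* The Biml element above is intentionally empty: this tier-10 file emits no Biml assets itself and only runs the control block below to write the ADF JSON files to disk. */#>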
<#
string mdFilePath = "C:\\Users\\admin\\Source\\Workspaces\\Data Warehouse\\metadata";
string mdFileName = "TargetTableMetadata.xlsx";
string mdWorkSheetName = "Metadata$";
bool mdHasHeader = true;
string logPath = "C:\\Users\\admin\\Source\\Workspaces\\Data Warehouse\\data_factory\\generate_data_factory_biml\\log.txt";
string adfProjPath = "C:\\Users\\admin\\Source\\Workspaces\\Data Warehouse\\data_factory\\data_factory\\";

// Read the metadata worksheet into a DataSet (table name "Metadata").
DataSet ds = ExcelReader.ReadExcelQuery(mdFilePath, mdFileName, mdWorkSheetName, mdHasHeader);

System.IO.File.AppendAllText(@logPath, "MetaData File Path: " + System.IO.Path.Combine(mdFilePath, mdFileName) + Environment.NewLine);
System.IO.File.AppendAllText(@logPath, "MetaData File Exists: " + System.IO.File.Exists(System.IO.Path.Combine(mdFilePath, mdFileName)).ToString() + Environment.NewLine);
System.IO.File.AppendAllText(@logPath, "Dataset table count: " + ds.Tables.Count.ToString() + Environment.NewLine);

// Split the metadata into the four frequency/scope combinations.
DataView dailyFulls = new DataView(ds.Tables["Metadata"], "Frequency = 'Daily' and [Changes Only] = 'No'", "", DataViewRowState.CurrentRows);
DataView dailyDeltas = new DataView(ds.Tables["Metadata"], "Frequency = 'Daily' and [Changes Only] = 'Yes'", "", DataViewRowState.CurrentRows);
DataView hourlyFulls = new DataView(ds.Tables["Metadata"], "Frequency = 'Hourly' and [Changes Only] = 'No'", "", DataViewRowState.CurrentRows);
DataView hourlyDeltas = new DataView(ds.Tables["Metadata"], "Frequency = 'Hourly' and [Changes Only] = 'Yes'", "", DataViewRowState.CurrentRows);

// Log the row count for each filter.
System.IO.File.AppendAllText(@logPath, "Daily Fulls Count: " + dailyFulls.Count.ToString() + Environment.NewLine);
System.IO.File.AppendAllText(@logPath, "Daily Deltas Count: " + dailyDeltas.Count.ToString() + Environment.NewLine);
System.IO.File.AppendAllText(@logPath, "Hourly Fulls Count: " + hourlyFulls.Count.ToString() + Environment.NewLine);
System.IO.File.AppendAllText(@logPath, "Hourly Deltas Count: " + hourlyDeltas.Count.ToString() + Environment.NewLine);
// Generate a source (on-prem SQL) and a sink (data lake) dataset definition per table.
foreach (DataRowView rowView in dailyFulls)
{
    DataRow row = rowView.Row;
    string schemaName = row["SchemaName"].ToString();
    string tableName = row["TableName"].ToString();
    string frequency = row["Frequency"].ToString();
    string scope = "full";
    System.IO.File.AppendAllText(@logPath, "DailyFulls | " + row["SchemaName"].ToString() + "." + row["TableName"].ToString() + " | " + row["ColumnListForSelect"].ToString() + Environment.NewLine);
    System.IO.File.WriteAllText(@adfProjPath + "DS_OnPremSQL_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_OnPremSQL_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency));
    System.IO.File.WriteAllText(@adfProjPath + "DS_DataLake_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_DataLake_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency, scope));
}
foreach (DataRowView rowView in dailyDeltas)
{
    DataRow row = rowView.Row;
    string schemaName = row["SchemaName"].ToString();
    string tableName = row["TableName"].ToString();
    string frequency = row["Frequency"].ToString();
    string scope = "deltas";
    System.IO.File.AppendAllText(@logPath, "DailyDeltas | " + row["SchemaName"].ToString() + "." + row["TableName"].ToString() + " | " + row["ColumnListForSelect"].ToString() + Environment.NewLine);
    System.IO.File.WriteAllText(@adfProjPath + "DS_OnPremSQL_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_OnPremSQL_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency));
    System.IO.File.WriteAllText(@adfProjPath + "DS_DataLake_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_DataLake_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency, scope));
}
foreach (DataRowView rowView in hourlyFulls)
{
    DataRow row = rowView.Row;
    string schemaName = row["SchemaName"].ToString();
    string tableName = row["TableName"].ToString();
    string frequency = row["Frequency"].ToString();
    string scope = "full";
    System.IO.File.AppendAllText(@logPath, "HourlyFulls | " + row["SchemaName"].ToString() + "." + row["TableName"].ToString() + " | " + row["ColumnListForSelect"].ToString() + Environment.NewLine);
    System.IO.File.WriteAllText(@adfProjPath + "DS_OnPremSQL_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_OnPremSQL_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency));
    System.IO.File.WriteAllText(@adfProjPath + "DS_DataLake_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_DataLake_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency, scope));
}
foreach (DataRowView rowView in hourlyDeltas)
{
    DataRow row = rowView.Row;
    string schemaName = row["SchemaName"].ToString();
    string tableName = row["TableName"].ToString();
    string frequency = row["Frequency"].ToString();
    string scope = "deltas";
    System.IO.File.AppendAllText(@logPath, "HourlyDeltas | " + row["SchemaName"].ToString() + "." + row["TableName"].ToString() + " | " + row["ColumnListForSelect"].ToString() + Environment.NewLine);
    System.IO.File.WriteAllText(@adfProjPath + "DS_OnPremSQL_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_OnPremSQL_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency));
    System.IO.File.WriteAllText(@adfProjPath + "DS_DataLake_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_DataLake_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency, scope));
}
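
// Note: the dataset file names written above (DS_OnPremSQL_MySourceDB_<schema>_<table> and
// DS_DataLake_MySourceDB_<schema>_<table>) must match the dataset names the pipeline template
// references in its "inputs" and "outputs" arrays.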
// Generate pipelines
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_Daily_Full.json", CallBimlScript("PL_Copy_MySourceDBToADLS.biml", dailyFulls, "Daily", "Full"));
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_Daily_Deltas.json", CallBimlScript("PL_Copy_MySourceDBToADLS.biml", dailyDeltas, "Daily", "Deltas"));
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_Hourly_Full.json", CallBimlScript("PL_Copy_MySourceDBToADLS.biml", hourlyFulls, "Hourly", "Full"));
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_Hourly_Deltas.json", CallBimlScript("PL_Copy_MySourceDBToADLS.biml", hourlyDeltas, "Hourly", "Deltas"));

// Generate one-time pipelines
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_OneTime_DailyFulls.json", CallBimlScript("PL_Copy_MySourceDBToADLS_OneTime.biml", dailyFulls, "Daily", "Full"));
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_OneTime_DailyDeltas.json", CallBimlScript("PL_Copy_MySourceDBToADLS_OneTime.biml", dailyDeltas, "Daily", "Deltas"));
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_OneTime_HourlyFulls.json", CallBimlScript("PL_Copy_MySourceDBToADLS_OneTime.biml", hourlyFulls, "Hourly", "Full"));
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_OneTime_HourlyDeltas.json", CallBimlScript("PL_Copy_MySourceDBToADLS_OneTime.biml", hourlyDeltas, "Hourly", "Deltas"));
#>
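
BGHelper.cs is referenced by the <#@ code file="BGHelper.cs" #> directive but is not included here. Below is a minimal, hypothetical sketch of what its ExcelReader.ReadExcelQuery helper could look like, assuming the ACE OLE DB provider is installed and that the worksheet should land in a DataTable named "Metadata" (the name the DataView filters above expect). It is not the original helper, only an illustration of the contract the driver relies on.

// Hypothetical BGHelper.cs sketch, not the original file.
using System.Data;
using System.Data.OleDb;
using System.IO;

namespace BGHelper
{
    public static class ExcelReader
    {
        // Reads one worksheet from an .xlsx file into a DataSet containing a table named "Metadata".
        public static DataSet ReadExcelQuery(string filePath, string fileName, string workSheetName, bool hasHeader)
        {
            string fullPath = Path.Combine(filePath, fileName);
            string hdr = hasHeader ? "YES" : "NO";
            string connectionString =
                "Provider=Microsoft.ACE.OLEDB.12.0;Data Source=" + fullPath +
                ";Extended Properties=\"Excel 12.0 Xml;HDR=" + hdr + ";IMEX=1\"";

            var ds = new DataSet();
            using (var connection = new OleDbConnection(connectionString))
            using (var adapter = new OleDbDataAdapter("SELECT * FROM [" + workSheetName + "]", connection))
            {
                // The table name must match what the driver's DataView filters look up.
                adapter.Fill(ds, "Metadata");
            }
            return ds;
        }
    }
}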