Skip to content

Instantly share code, notes, and snippets.

@mlongoria
Last active March 10, 2017 19:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mlongoria/7931f42c914afb8c85c4c85c1d7ec964 to your computer and use it in GitHub Desktop.
Save mlongoria/7931f42c914afb8c85c4c85c1d7ec964 to your computer and use it in GitHub Desktop.
Biml ADF Pipeline Generation
<#@ import namespace="System.Data" #>
<#@ import namespace="System.Text" #>
<#@ property name="targetTables" type="DataView"#>
<#@ property name="frequency" type="string"#>
<#@ property name="scope" type="string"#>
<#
    /*
     * Pipeline template for Azure Data Factory (2015-09-01 pipeline schema).
     *
     * Emits one pipeline JSON document named PL_Copy_MySourceDBToADLS_[frequency]_[scope]
     * that contains one Copy activity per row of targetTables. Every row must provide
     * the columns SchemaName, TableName, ColumnListForSelect and IncrementalPredicate.
     *
     *   frequency  "Daily" or "Hourly"; selects the scheduler block and the start date.
     *   scope      "Full" or "Deltas"; selects an unfiltered SELECT or a windowed one.
     *
     * Both arguments are validated once, up front. Without this check an unrecognised
     * value fell through every branch below and left a dangling comma after
     * "type": "SqlSource" (invalid JSON) or a scheduler without a "frequency".
     */
    bool isDaily = frequency == "Daily";
    bool isHourly = frequency == "Hourly";
    bool isFull = scope == "Full";
    bool isDeltas = scope == "Deltas";

    if (!isDaily && !isHourly)
    {
        throw new System.ArgumentException("Unsupported frequency '" + frequency + "'. Expected 'Daily' or 'Hourly'.", "frequency");
    }
    if (!isFull && !isDeltas)
    {
        throw new System.ArgumentException("Unsupported scope '" + scope + "'. Expected 'Full' or 'Deltas'.", "scope");
    }
#>
{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
    "name": "PL_Copy_MySourceDBToADLS_<#=frequency#>_<#=scope#>",
    "properties": {
        "description": "<#=frequency#> <#=scope#> copies of data from Source db to the data lake.",
        "activities": [
<# /* isFirst drives the separator: every activity except the first is preceded by a comma. */ #>
<# var isFirst = true; foreach (DataRowView rowView in targetTables) { #>
<# DataRow row = rowView.Row; #>
<# string schemaName = row["SchemaName"].ToString(); #>
<# string tableName = row["TableName"].ToString(); #>
<# string columnList = row["ColumnListForSelect"].ToString(); #>
<# string predicate = row["IncrementalPredicate"].ToString(); #>
            <#=isFirst ? "" : ","#>
            {
                "name": "Copy to Lake - <#=schemaName#>.<#=tableName#>",
                "type": "Copy",
                "inputs": [
                    {
                        "name": "DS_OnPremSQL_MySourceDB_<#=schemaName#>_<#=tableName#>"
                    }
                ],
                "outputs": [
                    {
                        "name": "DS_DataLake_MySourceDB_<#=schemaName#>_<#=tableName#>"
                    }
                ],
                "typeProperties": {
                    "source": {
                        "type": "SqlSource",
<# /* Exactly one sqlReaderQuery is emitted on every path, so the comma above is always followed by a property. */ #>
<# if (isFull) { #>
                        "sqlReaderQuery": "SELECT <#=columnList#>, SYSDATETIME() AS LoadDateTime FROM <#=schemaName#>.[<#=tableName#>]"
<# } else if (isHourly) { #>
<# /* Hourly deltas pass the slice window shifted back five hours (presumably a UTC-to-local adjustment; confirm). The predicate presumably carries the {0}/{1} placeholders Text.Format fills in. */ #>
                        "sqlReaderQuery": "$$Text.Format('SELECT <#=columnList#>, SYSDATETIME() AS LoadDateTime FROM <#=schemaName#>.[<#=tableName#>] WHERE <#=predicate#>', Time.AddHours(WindowStart, -5), Time.AddHours(WindowEnd, -5))"
<# } else { #>
<# /* Daily deltas pass the slice window unshifted. */ #>
                        "sqlReaderQuery": "$$Text.Format('SELECT <#=columnList#>, SYSDATETIME() AS LoadDateTime FROM <#=schemaName#>.[<#=tableName#>] WHERE <#=predicate#>', WindowStart, WindowEnd)"
<# } #>
                    },
                    "sink": {
                        "type": "AzureDataLakeStoreSink"
                    }
                },
                "policy": {
                    "concurrency": 1,
                    "executionPriorityOrder": "OldestFirst",
                    "retry": 3,
                    "timeout": "01:00:00"
                },
                "scheduler": {
<# if (isDaily) { #>
                    "frequency": "Day",
                    "offset": "09:00:00",
<# } else { #>
                    "frequency": "Hour",
<# } #>
                    "style": "EndOfInterval",
                    "interval": 1
                }
            }
<# isFirst = false; } #>
        ],
<# if (isHourly) { #>
        "start": "2017-03-01T01:00:00",
<# } else { #>
        "start": "2017-03-02T00:00:00",
<# } #>
        "end": "9999-09-09"
    }
}
<#@ template tier="10" #>
<#@ import namespace="System.Data" #>
<#@ import namespace="System.Text" #>
<#@ code file="BGHelper.cs" #>
<#@ import namespace="BGHelper" #>
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
</Biml>
<#
    /*
     * Generator script for the Azure Data Factory project files.
     *
     * 1. Reads the target-table metadata worksheet through ExcelReader.ReadExcelQuery.
     * 2. Splits the "Metadata" table into four views by Frequency (Daily/Hourly) and
     *    [Changes Only] (No = full load, Yes = deltas).
     * 3. For every row of every view, writes the on-prem SQL dataset and the data lake
     *    dataset JSON files by calling the two dataset templates.
     * 4. Writes one scheduled pipeline and one one-time pipeline JSON file per view.
     *
     * Progress is appended to the log file at logPath. All paths are hard-coded to the
     * author's workspace and must be adjusted before running elsewhere.
     */
    string mdFilePath = "C:\\Users\\admin\\Source\\Workspaces\\Data Warehouse\\metadata";
    string mdFileName = "TargetTableMetadata.xlsx";
    string mdWorkSheetName = "Metadata$";
    bool mdHasHeader = true;
    string logPath = "C:\\Users\\admin\\Source\\Workspaces\\Data Warehouse\\data_factory\\generate_data_factory_biml\\log.txt";
    string adfProjPath = "C:\\Users\\admin\\Source\\Workspaces\\Data Warehouse\\data_factory\\data_factory\\";

    // Log where the workbook is expected and whether it exists BEFORE reading it, so the
    // diagnostics are already on disk if the read fails. The second entry used to reuse
    // the "File Path" label even though it records the existence flag.
    string mdFullPath = System.IO.Path.Combine(mdFilePath, mdFileName);
    System.IO.File.AppendAllText(logPath, "MetaData File Path: " + mdFullPath + Environment.NewLine);
    System.IO.File.AppendAllText(logPath, "MetaData File Exists: " + System.IO.File.Exists(mdFullPath).ToString() + Environment.NewLine);

    // Assigned directly; the original allocated an empty DataSet that was discarded at once.
    DataSet ds = ExcelReader.ReadExcelQuery(mdFilePath, mdFileName, mdWorkSheetName, mdHasHeader);
    System.IO.File.AppendAllText(logPath, "Dataset table count: " + ds.Tables.Count.ToString() + Environment.NewLine);

    DataTable metadata = ds.Tables["Metadata"];
    DataView dailyFulls = new DataView(metadata, "Frequency = 'Daily' and [Changes Only] = 'No'", "", DataViewRowState.CurrentRows);
    DataView dailyDeltas = new DataView(metadata, "Frequency = 'Daily' and [Changes Only] = 'Yes'", "", DataViewRowState.CurrentRows);
    DataView hourlyFulls = new DataView(metadata, "Frequency = 'Hourly' and [Changes Only] = 'No'", "", DataViewRowState.CurrentRows);
    DataView hourlyDeltas = new DataView(metadata, "Frequency = 'Hourly' and [Changes Only] = 'Yes'", "", DataViewRowState.CurrentRows);

    // Log count of results for each filter.
    System.IO.File.AppendAllText(logPath, "Daily Fulls Count: " + dailyFulls.Count.ToString() + Environment.NewLine);
    System.IO.File.AppendAllText(logPath, "Daily Deltas Count: " + dailyDeltas.Count.ToString() + Environment.NewLine);
    System.IO.File.AppendAllText(logPath, "Hourly Fulls Count: " + hourlyFulls.Count.ToString() + Environment.NewLine);
    System.IO.File.AppendAllText(logPath, "Hourly Deltas Count: " + hourlyDeltas.Count.ToString() + Environment.NewLine);

    // One entry per view, in the order the files are generated.
    //   Label         - log prefix and suffix of the one-time pipeline file name
    //   Frequency     - frequency argument for the pipeline templates
    //   DatasetScope  - scope argument for the data lake dataset template (lower case, as before)
    //   PipelineScope - scope argument for the pipeline templates and part of their file name
    var batches = new[]
    {
        new { Label = "DailyFulls", Tables = dailyFulls, Frequency = "Daily", DatasetScope = "full", PipelineScope = "Full" },
        new { Label = "DailyDeltas", Tables = dailyDeltas, Frequency = "Daily", DatasetScope = "deltas", PipelineScope = "Deltas" },
        new { Label = "HourlyFulls", Tables = hourlyFulls, Frequency = "Hourly", DatasetScope = "full", PipelineScope = "Full" },
        new { Label = "HourlyDeltas", Tables = hourlyDeltas, Frequency = "Hourly", DatasetScope = "deltas", PipelineScope = "Deltas" }
    };

    // Generate datasets: two JSON files per table (source dataset and lake dataset).
    foreach (var batch in batches)
    {
        foreach (DataRowView rowView in batch.Tables)
        {
            DataRow row = rowView.Row;
            string schemaName = row["SchemaName"].ToString();
            string tableName = row["TableName"].ToString();
            // The frequency handed to the dataset templates is the row's own value, as before.
            string frequency = row["Frequency"].ToString();
            string scope = batch.DatasetScope;

            // The prefix now names the view being processed; every loop used to log "DailyFulls".
            System.IO.File.AppendAllText(logPath, batch.Label + " | " + schemaName + "." + tableName + " | " + row["ColumnListForSelect"].ToString() + Environment.NewLine);

            System.IO.File.WriteAllText(
                adfProjPath + "DS_OnPremSQL_MySourceDB_" + schemaName + "_" + tableName + ".json",
                CallBimlScript("DS_OnPremSQL_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency));
            System.IO.File.WriteAllText(
                adfProjPath + "DS_DataLake_MySourceDB_" + schemaName + "_" + tableName + ".json",
                CallBimlScript("DS_DataLake_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency, scope));
        }
    }

    // Generate pipelines: PL_Copy_MySourceDBToADLS_[Frequency]_[Scope].json
    foreach (var batch in batches)
    {
        System.IO.File.WriteAllText(
            adfProjPath + "PL_Copy_MySourceDBToADLS_" + batch.Frequency + "_" + batch.PipelineScope + ".json",
            CallBimlScript("PL_Copy_MySourceDBToADLS.biml", batch.Tables, batch.Frequency, batch.PipelineScope));
    }

    // Generate one-time pipelines: PL_Copy_MySourceDBToADLS_OneTime_[Label].json
    foreach (var batch in batches)
    {
        System.IO.File.WriteAllText(
            adfProjPath + "PL_Copy_MySourceDBToADLS_OneTime_" + batch.Label + ".json",
            CallBimlScript("PL_Copy_MySourceDBToADLS_OneTime.biml", batch.Tables, batch.Frequency, batch.PipelineScope));
    }
#>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment