Skip to content

Instantly share code, notes, and snippets.

@elexisvenator
Last active December 20, 2018 00:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elexisvenator/a26035ee2227fc5915608089cbce8b0e to your computer and use it in GitHub Desktop.
Save elexisvenator/a26035ee2227fc5915608089cbce8b0e to your computer and use it in GitHub Desktop.
Basic parts for generating load models from a csv list of tables/columns
public class DbtBaseTransformGenerator : IGenerator
{
private const string SourceSchema = "load";
public string Generate(ConfigurationModel model)
{
var sb = new StringBuilder();
AppendGeneratedRow(sb, model, -1, "NULL");
sb.AppendLine("UNION ALL");
AppendGeneratedRow(sb, model, -2, "UNKNOWN");
sb.AppendLine("UNION ALL");
sb.Append("SELECT ");
sb.Append(model.Columns.Select(c => c.Destination.Name).Aggregate((str, col) => str + ", " + col));
sb.AppendLine(", data_pipeline_change_version, data_pipeline_is_deleted, data_pipeline_timestamp");
sb.AppendFormat("FROM {0}.{1}", SourceSchema, model.LoadTable);
return sb.ToString();
}
private static void AppendGeneratedRow(StringBuilder sb, ConfigurationModel model, int rowId, string rowName)
{
sb.Append("SELECT ");
foreach (var column in model.Columns)
{
if (model.Columns.First() != column)
{
sb.Append(", ");
}
if (column.Destination.PrimaryKey)
{
sb.AppendFormat("{0}::INT AS {1}", rowId, column.Destination.Name);
continue;
}
if (column.Destination.ForeignKey)
{
sb.AppendFormat("{0}::INT AS {1}", rowId, column.Destination.Name);
continue;
}
if (column.Destination.Nullable)
{
switch (column.Destination.Type)
{
case DataType.@string:
sb.AppendFormat("null::CITEXT AS {0}", column.Destination.Name);
break;
case DataType.@int:
sb.AppendFormat("null::INT AS {0}", column.Destination.Name);
break;
case DataType.datetime:
sb.AppendFormat("null::TIMESTAMP AS {0}", column.Destination.Name);
break;
case DataType.numeric:
sb.AppendFormat("null::NUMERIC AS {0}", column.Destination.Name);
break;
case DataType.boolean:
sb.AppendFormat("null::BOOLEAN AS {0}", column.Destination.Name);
break;
case DataType.Unknown:
sb.AppendFormat("null::UNKNOWN AS {0}", column.Destination.Name);
break;
default:
throw new ArgumentOutOfRangeException(nameof(column.Destination.Type), column.Destination.Type, "Unsupported data type found.");
}
continue;
}
switch (column.Destination.Type)
{
case DataType.@string:
sb.AppendFormat("'{0}'::CITEXT AS {1}", rowName, column.Destination.Name);
break;
case DataType.@int:
sb.AppendFormat("0::INT AS {0}", column.Destination.Name);
break;
case DataType.datetime:
sb.AppendFormat("{{{{ default_timestamp() }}}} AS {0}", column.Destination.Name);
break;
case DataType.numeric:
sb.AppendFormat("0::NUMERIC AS {0}", column.Destination.Name);
break;
case DataType.boolean:
sb.AppendFormat("0::BOOLEAN AS {0}", column.Destination.Name);
break;
case DataType.Unknown:
sb.AppendFormat("'{0}'::UNKNOWN AS {0}", column.Destination.Name);
break;
default:
throw new ArgumentOutOfRangeException(nameof(column.Destination.Type), column.Destination.Type, "Unsupported data type found.");
}
}
sb.AppendLine(
@", null::BIGINT AS data_pipeline_change_version, 0::BOOLEAN AS data_pipeline_is_deleted, {{ default_timestamp() }} AS data_pipeline_timestamp");
}
public string GetFileName(ConfigurationModel model)
{
return $"base_{model.SourceTable.Name.ToLowerInvariant()}.sql";
}
public string GetFolderName()
{
return "transform\\models\\base";
}
}
@elexisvenator
Copy link
Author

elexisvenator commented Dec 20, 2018

The basic format of the csv is

TableName,ColumnName,IsNullable
Person,iPersonID,false
Person,sName,true
Person,iFriendID,false

Our source data naming conventions meant you could determine if a column was a primary key, foreign key and the columns datatype just from the column name. If your tables are named differently then add more columns to provide this information. (I am not a fan of this naming convention myself, it's just what we have.

The raw csv is converted to a list and grouped by table. Each table is fed into this generator class to create the sql model. (we have other generators for other parts of our pipeline). The generator then

  1. Adds a -1 (Null) row
  2. Adds a -2 (Unknown) row
  3. Unions a select query from the source table.

A macro is used to generate default timestamps, the macro looks as follows:

{% macro default_timestamp() %} to_timestamp('01 Jan 0000', 'DD Mon YYYY')::TIMESTAMP {% endmacro %}

The other thing you may notice is CITEXT, this is a custom data type which is a case-insensitive version of Postgres' TEXT type. This is because the source system is Mssql and reporting is used to case-insensitive queries.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment