Skip to content

Instantly share code, notes, and snippets.

@bruslim
Last active December 17, 2015 03:39
Show Gist options
  • Save bruslim/5544909 to your computer and use it in GitHub Desktop.
Save bruslim/5544909 to your computer and use it in GitHub Desktop.
Quick-and-dirty, Node.js-powered T-SQL ETL engine
// requires
var Mappr = require('./mappr.js');
var Tsql2008 = require('./tsql2008.js');
// ETL settings: which databases to read from / write to, and how the
// generated script should behave.
var settings = {
  databases: {
    // database the rows are read from (previously 'LiDACDev')
    source: 'lidacdev',
    // database the rows are written to
    destination: 'lidac_data'
  },
  modes: {
    // when true the generated script wraps the import in a transaction
    // that is rolled back at the end
    sqlTest: false,
    // how many times the import body is emitted:
    // usually 1 for production (insert and update), 2 for testing (insert, then update)
    passes: 1
  },
  options: {
    // clear passwords (testing aid); presumably read by the table mappings
    clearPasswords: true
  }
};
var config = Mappr.BuildConfig(settings);

// register every table mapping, in the order the tables were exported
var Tables = require('./tables.js');
for (var tableKey in Tables) {
  var registerTable = Tables[tableKey];
  registerTable.call(null, config);
}

// write the generated SQL script to stdout
Tsql2008.GenerateSql(config);
/// node.js script
/// 2013-05-08 bruslim1@gmail.com
/// TSQL merge/select-insert ETL script generator
/// https://gist.github.com/bruslim/5544909
/// MAPPING OBJECT SCHEMA:
/*
var mappings = {
databases: {
source: 'SourceDbName',
destination: 'DestinationDbName'
},
tables: [
{
destination: 'DestinationTableName',
source: 'ViewName',
truncate: true,
killWhere: '',
columns: {
'DestinationColumnName1' : myMapper.DirectCopy({
sourceColumn: 'SourceColumnName'
}),
'DestinationColumnName2' : myMapper.ForeignKey({
sourceColumn: 'SourceIdColumnName',
parentTable: 'DestinationParentTableName',
parentColumn: 'DestinationParentIdColumnName',
parentType: 'char',
importColumn: 'orig_id',
importType: 'INT'
}),
'DestinationColumnName3' : myMapper.SqlFunction({
name: 'tempFunction',
columns: [ // source column names passed as arguments; literal arguments go in raws: [...]
'SourceColumnName',
'SourceColumnName',
'SourceColumnName',
'SourceColumnName'
]
})
}
}
]
};
*/
/// how to use
/*
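1. create a file named etl.js
2. require the generator modules: Mappr = require('./mappr.js'); Tsql2008 = require('./tsql2008.js');
3. build your config with Mappr.BuildConfig({ databases: {...}, modes: {...}, options: {...} }) and register your table mappings, following the schema above
4. call Tsql2008.GenerateSql(config);
5. via command line: node etl.js > out.sql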
*/
var Crypto = require('crypto');
var Extend = require('node.extend');
/**
 * Creates an empty ETL configuration with default modes.
 * Returns a plain object, so both `Config()` and `new Config()` yield it.
 *
 * Fields: mappr (a fresh Mappr instance), databases, options,
 * modes ({ sqlTest: false, passes: 1 }), tables (list of mapped tables),
 * and useMapping(mapping, mapArguments), which appends
 * mapping.Map(config, mapArguments) to `tables`.
 */
function Config() {
  var cfg = {};
  cfg.mappr = new exports.Mappr();
  cfg.databases = {};
  cfg.options = {};
  cfg.modes = {
    sqlTest: false, // test mode
    passes: 1       // number of passes
  };
  cfg.tables = [];
  cfg.useMapping = function(mapping, mapArguments) {
    var mappedTable = mapping.Map(this, mapArguments);
    this.tables.push(mappedTable);
  };
  return cfg;
}
exports.BuildConfig = function(source, destination, options) {
if (arguments.length == 0) {
throw 'Mappr.BuildConfig requires at least 1 parameter';
}
var config = new Config();
if (arguments.length > 1) {
config.databases.source = source;
config.databases.destination = destiantion;
config.options = options;
} else {
config = Extend(config,arguments[0]);
}
return config;
}
exports.Mappr = function() {
function GenLookupFunctionName(options) {
var temp = [
'dbo.get',
options.parentTable,
options.parentColumn,
'by',
options.importColumn
];
return temp.join('_');
};
function NormalizeOptions(options) {
var opt = Extend({
sourceColumn: '',
value: '',
sql: ''
}, options);
if (!options) return opt;
return opt;
}
var fkLookups = { };
var fkLookupsCount = 0;
var fkFunctionNames = [];
var self = {
CaseMap: function(options) {
var opts = NormalizeOptions(options);
opts.isColumn = true;
if (!opts.sourceColumn) { opts.sourceColumn = options; }
return {
options: opts,
transform: function(tableConfig) {
var qualifiedTarget = self.DirectCopy(options).transform.call(null, tableConfig);
var cases = [];
for(value in options.map) {
cases.push(
"CASE WHEN " + qualifiedTarget + " = " + self.RawValue(value).transform.call(null, tableConfig)
+ " THEN " + self.RawValue(options.map[value]).transform.call(null, tableConfig));
}
var ends = [];
for(var i =0; i < cases.length; i++) {
ends.push("END");
}
return cases.join(' ELSE ')
+ " ELSE " + self.RawValue(options['default']).transform.call(null, tableConfig)
+ ' ' + ends.join(' ');
}
};
},
MergeOn: function(options) {
var opts = NormalizeOptions(options);
var column = null;
var raw = null;
if (options.value !== undefined) {
if (options.value !== null) {
raw = '\'' + options.value + '\'';
}
} else {
column = '[' + (options.sourceColumn || options) + ']';
if (!opts.sourceColumn) { opts.sourceColumn = options; }
opts.isColumn = true;
}
return {
options: opts,
transform: function(tableConfig) {
return {
mergeOn: true,
column: column,
raw: raw
};
}
};
},
InvertBit: function(options) {
var opts = NormalizeOptions(options);
if (!opts.sourceColumn) { opts.sourceColumn = options; }
opts.isColumn = true;
return {
options: opts,
transform: function(tableConfig) {
var qualifiedTarget = self.DirectCopy(options).transform.call(null, tableConfig);
return 'CASE WHEN ' + qualifiedTarget + ' = 1 THEN 0 ELSE 1 END';
}
};
},
TruncateString: function(options) {
var opts = NormalizeOptions(options);
if (!opts.sourceColumn) { opts.sourceColumn = options; }
opts.isColumn = true;
return {
options: opts,
transform: function(tableConfig) {
var qualifiedTarget = self.DirectCopy(options).transform.call(null, tableConfig);
var truncated = 'RTRIM(LEFT(' + qualifiedTarget + ', ' + options.stringLength + '))';
return 'CASE WHEN LEN(' + truncated + ') = 0 THEN NULL ELSE ' + truncated + ' END';
}
};
},
ConcatColumns: function(options) {
var opts = NormalizeOptions(options);
if (!opts.sourceColumn) { opts.sourceColumn = options; }
opts.isColumns = true;
return {
options: opts,
transform: function(tableConfig) {
var select = '';
var joiner = ' + \'' + options.spacer + '\' + ';
for(var i in options.columns)
{
var qualifiedTarget = self.DirectCopy(options.columns[i]).transform.call(null, tableConfig);
select += 'COALESCE(' + qualifiedTarget + ', \'\')' + joiner;
}
var last = select.lastIndexOf(joiner);
select = select.substring(0,last);
return 'RTRIM(LTRIM(' + select.trim() + '))';
}
};
},
StaticHash: function(options) {
var hash = Crypto.createHash(options.algorithm || 'sha512');
hash.update(options.value || options);
var digest = hash.digest(options.encoding || 'base64');
return {
options: NormalizeOptions(options),
transform: function() {
return self.RawValue(digest).transform.apply();
}
};
},
DirectCopy: function(options) {
var opts = NormalizeOptions(options);
if (!opts.sourceColumn) { opts.sourceColumn = options; }
opts.isColumn = true;
return {
options: opts,
transform: function(tableConfig) {
var qualifiedTarget = '[' + (options.sourceColumn || options) + ']';
if(tableConfig && tableConfig.useMerge) {
qualifiedTarget = 'S.' + qualifiedTarget;
}
return qualifiedTarget;
}
};
},
CopyOrClear: function(options) {
var opts = NormalizeOptions(options);
if (!opts.sourceColumn) { opts.sourceColumn = options; }
opts.isColumn = true;
return {
options: opts,
transform: self.CopyOrDefault(opts).transform
}
},
CopyOrDefault: function(options) {
var opts = NormalizeOptions(options);
if (!opts.sourceColumn) { opts.sourceColumn = options; }
opts.isColumn = true;
return {
options: opts,
transform: function(tableConfig) {
var col = self.DirectCopy(options).transform.call(null, tableConfig);
return 'CASE WHEN LEN(' + col + ') = 0 OR ' + col + ' IS NULL THEN ' + self.RawValue(options.value).transform.call(null,tableConfig) +' ELSE ' + col + ' END';
}
};
},
ForeignKey: function(options) {
// creates a unique function which
// queries the destination table by the imported column name
// #getParentTableParentColumnByImportColumn
// select [parentColumn] from [parentTable] where [importColumn] = @value
var opts = NormalizeOptions(options);
opts.isColumn = true;
var fName = GenLookupFunctionName(options);
if (!fkLookups[fName]) {
fkLookupsCount++;
fkLookups[fName] = options;
}
return {
options: opts,
transform: function(tableConfig) {
return self.SqlFunction({
name: fName,
columns: [
options.sourceColumn
],
raws: (options.filterColumn ? [ options.filterValue ] : null)
}).transform.call(null, tableConfig);
}
};
},
SqlFunction: function(options) {
// creates the following select
// functionName([colName],[colName],[colName]...,[rawValue],[rawValue])
var opts = NormalizeOptions(options);
opts.isColumns = true;
return {
options: NormalizeOptions(options),
transform: function(tableConfig) {
var formatted = [];
for (var name in options.columns) {
formatted.push(self.DirectCopy(options.columns[name]).transform.call(null, tableConfig));
}
for (var raw in options.raws) {
formatted.push(self.RawValue(options.raws[raw]).transform.call(null, tableConfig));
}
if (options.name) {
return options.name + '(' + formatted.join(',') + ')';
}
return options + '()';
}
};
},
Sql: function(options) {
var opts = {};
if (!options.sql) {
opts.sql = options;
} else {
opts = options;
}
if (options.columns) {
opts.isColumns = true;
opts.columns = options.columns;
}
if (options.sourceColumn) {
opts.isColumn = true;
opts.sourceColumn = options.sourceColumn;
}
return {
options: opts,
transform: function(){
return opts.sql;
}
};
},
RawValue: function(options) {
return {
options: NormalizeOptions(options),
transform: function() {
if (options !== undefined && options !== null) {
return '\'' + (options.value || options) + '\'';
}
return 'NULL';
}
};
},
getFunctionCreateSql: function() {
// returns the t-sql to create the functions
var tempFuncs = [];
for(var fName in fkLookups) {
var opt = fkLookups[fName];
fkFunctionNames.push(fName);
var sql =
'CREATE FUNCTION ' + fName + '(@value AS ' + opt.importType + (opt.filterColumn ? ', @filter AS ' + opt.filterType :'') + ')\n'
+ 'RETURNS ' + opt.parentType + '\n'
+ 'BEGIN\n'
+ ' DECLARE @ret ' + opt.parentType + ';\n'
+ ' SELECT @ret = ' + opt.parentColumn + '\n'
+ ' FROM [' + opt.parentTable + ']\n'
+ ' WHERE [' + opt.importColumn + '] = @value'
+ (opt.filterColumn ? '\n AND [' + opt.filterColumn + '] = @filter;\n' : ';\n')
+ ' RETURN @ret;\n'
+ 'END;\n'
+ 'GO\n';
tempFuncs.push(sql);
}
return tempFuncs.join('\n');
},
getFunctionDropSql: function() {
var ret = '';
for(var i in fkFunctionNames) {
ret += 'DROP FUNCTION ' + fkFunctionNames[i] + ';\n'
}
return ret;
},
hasLookupFunctions: function() {
return fkLookupsCount > 0;
}
};
return self;
};
/// this file generates T-SQL compatible with SQL Server 2008+
'use strict';
/**
 * Builds a [db].[dbo].[table] name.
 * Names containing '#' (SQL Server temp tables) are returned unchanged.
 */
var GetFullyQualifiedName = function(dbName, tblName) {
  var isTempTable = tblName.indexOf('#') !== -1;
  if (!isTempTable) {
    return ['[' + dbName + ']', '[dbo]', '[' + tblName + ']'].join('.');
  }
  return tblName;
};
// Fully qualified name of a table's destination (tbl.destination in the destination database).
var GetFullyQualifiedDestination = function(config, tbl) {
  var destinationDb = config.databases.destination;
  return GetFullyQualifiedName(destinationDb, tbl.destination);
};
// Fully qualified name of a table's source (tbl.source in the source database).
var GetFulllyQualifiedSource = function(config, tbl) {
  var sourceDb = config.databases.source;
  return GetFullyQualifiedName(sourceDb, tbl.source);
};
// Wraps `message` in a T-SQL PRINT statement, doubling single quotes.
var SqlMsg = function(message) {
  var escaped = message.split("'").join("''");
  return "PRINT('" + escaped + "');";
};
/**
 * Turns `sql` into a T-SQL line comment unless `isNotCommented` is truthy.
 * Every line is prefixed with "-- ". A multi-line statement (e.g. a killWhere
 * clause spanning lines) would otherwise leave its tail uncommented and executable.
 * @param {string} sql - statement text
 * @param {boolean} [isNotCommented] - when truthy, `sql` is returned unchanged
 * @returns {string}
 */
var SqlComment = function(sql, isNotCommented) {
  if (isNotCommented) { return sql; }
  return String(sql).split('\n').map(function(line) {
    return '-- ' + line;
  }).join('\n');
};
/**
 * Emits (to stdout) the setup section of the script:
 *   1. switch to the destination database,
 *   2. create the foreign-key lookup functions, if any,
 *   3. print each distinct tbl.setup command,
 *   4. in test mode, BEGIN TRANSACTION,
 *   5. cleanup: TRUNCATE / DELETE destination tables in reverse table order.
 * Cleanup statements are commented out for tables that use MERGE.
 */
var SqlSetup = (function(){
  var Core = function(config) {
    var mappr = config.mappr;
    console.log(SqlMsg('======== BEGIN SETUP ========'));
    console.log(SqlMsg(''));
    console.log(SqlMsg('SETUP: Switching to [' + config.databases.destination + ']'));
    console.log('USE [' + config.databases.destination + '];');
    if (mappr.hasLookupFunctions()) {
      console.log('-- BEGIN CREATE FUNCTIONS --\n');
      console.log(SqlMsg('SETUP: Creating look-up functions'));
      console.log(SqlMsg(''));
      console.log('GO\n');
      console.log(mappr.getFunctionCreateSql() + '\n');
      console.log('-- END CREATE FUNCTIONS --\n');
    }
    // print each distinct setup command once, in table order
    var setupCmds = [];
    for (var t in config.tables) {
      var tbl = config.tables[t];
      if (!tbl.setup || setupCmds.indexOf(tbl.setup) > -1) { continue; }
      setupCmds.push(tbl.setup);
      console.log(tbl.setup);
    }
    console.log('');
    console.log(SqlMsg('========= END SETUP ========='));
    console.log(SqlMsg(''));
  };

  // In test mode, opens a transaction and prints a banner saying it will be rolled back.
  var Testing = function(config) {
    if (!config.modes.sqlTest) { return; }
    console.log(SqlMsg('==================================================='));
    console.log(SqlMsg('=!!!!! TEST MODE: TRANSACTION IS ROLLED BACK !!!!!='));
    console.log(SqlMsg('==================================================='));
    console.log(SqlMsg(''));
    console.log('BEGIN TRANSACTION;');
  };

  var Cleanup = function(config) {
    console.log('\n-- BEGIN CLEAN UP SQL --\n');
    console.log(SqlMsg('======== BEGIN CLEANUP ========'));
    console.log(SqlMsg(''));
    var emitted = [];
    // delete things in reverse order (children before parents)
    for (var i = config.tables.length - 1; i >= 0; --i) {
      var tbl = config.tables[i];
      var fullDestination = GetFullyQualifiedDestination(config, tbl);
      var cmd = null;
      var message = null;
      // NOTE: a table with both `truncate` and `killWhere` gets no cleanup at all
      if (tbl.truncate && !tbl.killWhere) {
        cmd = 'TRUNCATE TABLE ' + fullDestination + ';';
        message = 'INFO: Truncating table ' + fullDestination;
      } else if (!tbl.truncate && tbl.killWhere) {
        cmd = 'DELETE FROM ' + fullDestination + ' WHERE ' + tbl.killWhere + ';';
        message = 'INFO: Deleting all records in ' + fullDestination + ' where ' + tbl.killWhere;
      }
      // each distinct statement is emitted once; MERGE tables get it commented out
      if (cmd !== null && emitted.indexOf(cmd) < 0) {
        console.log(SqlComment(SqlMsg(message), !tbl.useMerge));
        console.log(SqlComment(cmd, !tbl.useMerge));
        console.log(SqlComment(SqlMsg(''), !tbl.useMerge));
        emitted.push(cmd);
      }
    }
    console.log(SqlMsg('========= END CLEANUP ========='));
    console.log(SqlMsg(''));
    console.log('\n-- END CLEAN UP SQL --\n');
  };

  return function(config) {
    Core(config);
    Testing(config);
    Cleanup(config);
  };
})();
/**
 * Emits (to stdout) the teardown section of the script:
 *   1. in test mode, the rollback banner and `rollback;`,
 *   2. DROP FUNCTION for every lookup function, if any,
 *   3. each distinct tbl.teardown command.
 * The section is omitted only when there is nothing to drop and no teardown commands.
 */
var SqlTeardown = (function(){
  var Core = function(config) {
    var hasFunctions = config.mappr.hasLookupFunctions();
    // collect distinct teardown commands, in table order
    var teardownCmds = [];
    for (var t in config.tables) {
      var tbl = config.tables[t];
      if (!tbl.teardown || teardownCmds.indexOf(tbl.teardown) > -1) { continue; }
      teardownCmds.push(tbl.teardown);
    }
    // previously the whole section was skipped when there were no lookup
    // functions, silently dropping per-table teardown (setup always runs)
    if (!hasFunctions && teardownCmds.length === 0) { return; }
    console.log('GO\n');
    console.log('-- BEGIN DROP FUNCTIONS --\n');
    console.log(SqlMsg('======== BEGIN TEARDOWN ========'));
    console.log(SqlMsg(''));
    if (hasFunctions) {
      console.log(SqlMsg('TEARDOWN: Dropping Look-up Functions'));
      console.log(SqlMsg(''));
      console.log('GO\n');
      console.log(config.mappr.getFunctionDropSql());
    }
    for (var i = 0; i < teardownCmds.length; i++) {
      console.log(teardownCmds[i]);
    }
    console.log(SqlMsg('========= END TEARDOWN ========='));
    console.log('-- END DROP FUNCTIONS --\n');
  };

  // In test mode, rolls back the transaction opened during setup.
  var Testing = function(config) {
    if (!config.modes.sqlTest) { return; }
    console.log(SqlMsg('==================================================='));
    console.log(SqlMsg('=!!!!!! TEST MODE: ROLLING BACK TRANSACTION !!!!!!='));
    console.log(SqlMsg('==================================================='));
    console.log('rollback;');
    console.log(SqlMsg(''));
  };

  return function(config) {
    Testing(config);
    Core(config);
  };
})();
/**
 * Emits (to stdout) the migration body: config.modes.passes passes. Each pass
 * migrates every table in config.tables, using MERGE when tbl.useMerge is set
 * and INSERT ... SELECT otherwise.
 */
var SqlBody = (function() {
  // Emits the migration statement for one table, followed by an empty PRINT.
  var MigrateTable = function(config, tbl) {
    var processed = ProcessConfig(config, tbl);
    if (tbl.useMerge) {
      MigrateWithMerge(tbl, processed);
    } else {
      MigrateWithInsert(tbl, processed);
    }
    console.log(SqlMsg('') + '\n');
  };

  // Appends `item` to `list` unless it is already present.
  var PushUnique = function(list, item) {
    if (list.indexOf(item) < 0) { list.push(item); }
  };

  // Runs every column mapping of `tbl` and collects, in column order:
  // destination column names, source expressions, MERGE ON conditions, and
  // (for grouped tables) the distinct source columns to GROUP BY.
  var ProcessConfig = function(config, tbl) {
    var destColNames = [];
    var srcCols = [];
    var mergeOn = [];
    var groupBys = [];
    for (var colName in tbl.columns) {
      var destCol = '[' + colName + ']';
      destColNames.push(destCol);
      var srcMapping = tbl.columns[colName];
      var srcCol = srcMapping.transform.call(null, tbl);
      if (srcCol.mergeOn) {
        if (srcCol.column) {
          // the S alias only exists in a MERGE USING clause (same rule as DirectCopy);
          // an INSERT ... SELECT has no S and failed with "could not be bound"
          srcCols.push((tbl.useMerge ? 'S.' : '') + srcCol.column);
          mergeOn.push('D.' + destCol + ' = S.' + srcCol.column);
        } else if (srcCol.raw) {
          srcCols.push(srcCol.raw);
          mergeOn.push('D.' + destCol + ' = ' + srcCol.raw);
        } else {
          srcCols.push('NULL');
          mergeOn.push('D.' + destCol + ' IS NULL');
        }
      } else {
        srcCols.push(srcCol);
      }
      // grouping: single-column mappings
      if (tbl.useGrouping && srcMapping.options.isColumn) {
        PushUnique(groupBys, '[' + srcMapping.options.sourceColumn + ']');
      }
      // grouping: multi-column mappings
      if (tbl.useGrouping && srcMapping.options.isColumns) {
        for (var ic in srcMapping.options.columns) {
          PushUnique(groupBys, '[' + srcMapping.options.columns[ic] + ']');
        }
      }
    }
    return {
      fullDestination: GetFullyQualifiedDestination.call(null, config, tbl),
      fullSource: GetFulllyQualifiedSource.call(null, config, tbl),
      destColNames: destColNames,
      srcCols: srcCols,
      mergeOn: mergeOn,
      groupBys: groupBys
    };
  };

  // Emits MERGE <destination> AS D USING <source or filtered/grouped subquery> AS S,
  // with WHEN MATCHED (update all non-pk columns), WHEN NOT MATCHED (insert)
  // and an OUTPUT clause.
  var MigrateWithMerge = function(tbl, p) {
    console.log(SqlMsg('==== Merging ' + p.fullDestination + ' with ' + p.fullSource + ' ====='));
    if (tbl.message) {
      console.log(SqlMsg('INFO: ' + tbl.message));
    }
    console.log('MERGE ' + p.fullDestination + ' AS D');
    var using = 'USING ';
    // a subquery is needed to filter, group or de-duplicate the source
    if (tbl.sourceFilter || tbl.useGrouping || tbl.forceDistinct) {
      using += '(\n SELECT ';
      if (tbl.forceDistinct) {
        using += 'DISTINCT\n ';
      }
      var groupByCols = p.groupBys.join(',\n ');
      using += tbl.useGrouping ? groupByCols : '*';
      using += '\n FROM ' + p.fullSource;
      if (tbl.sourceFilter) {
        using += '\n WHERE ' + tbl.sourceFilter;
      }
      if (tbl.useGrouping) {
        using += '\n GROUP BY ' + groupByCols;
      }
      using += '\n )';
    } else {
      using += p.fullSource;
    }
    using += ' AS S';
    console.log(using);
    console.log(' ON ' + p.mergeOn.join('\n AND '));
    var pkName = '[' + tbl.mergePk + ']';
    var sets = [];
    var inserts = [];
    for (var i = 0; i < p.destColNames.length; i++) {
      // do not update pks!
      if (p.destColNames[i] !== pkName) {
        sets.push(' ' + p.destColNames[i] + ' = ' + p.srcCols[i]);
      }
      inserts.push('Inserted.' + p.destColNames[i]);
    }
    // an UPDATE SET with no assignments is invalid T-SQL
    if (!tbl.insertOnly && sets.length > 0) {
      console.log('WHEN MATCHED THEN');
      console.log(' UPDATE SET');
      console.log(sets.join(',\n'));
    }
    if (!tbl.updateOnly) {
      console.log('WHEN NOT MATCHED THEN');
      console.log(' INSERT (\n ' + p.destColNames.join(',\n ') + '\n ) VALUES (');
      console.log(' ' + p.srcCols.join(',\n ') + '\n )');
    }
    console.log('OUTPUT $action,\n ' + inserts.join(',\n ') + ';');
  };

  // Emits INSERT INTO <destination> (...) SELECT [DISTINCT] ... FROM <source>
  // [WHERE ...] [GROUP BY ...].
  var MigrateWithInsert = function(tbl, p) {
    console.log(SqlMsg('==== Inserting into ' + p.fullDestination + ' with ' + p.fullSource + ' ====='));
    if (tbl.message) {
      console.log(SqlMsg('MSG: ' + tbl.message));
    }
    var sql = 'INSERT INTO ' + p.fullDestination + ' (\n';
    sql += p.destColNames.join(',\n');
    sql += '\n) SELECT ';
    if (tbl.forceDistinct) {
      sql += 'DISTINCT';
    }
    // the select list was previously omitted whenever forceDistinct was set
    sql += '\n' + p.srcCols.join(',\n');
    sql += '\nFROM ' + p.fullSource;
    if (tbl.sourceFilter) {
      sql += '\nWHERE ' + tbl.sourceFilter;
    }
    if (tbl.useGrouping) {
      sql += '\nGROUP BY ' + p.groupBys.join(', ');
    }
    console.log(sql + ';\n');
  };

  return function(config) {
    console.log('\n-- BEGIN INSERT-SELECT SQL --\n');
    for (var pass = 0; pass < config.modes.passes; pass++) {
      console.log(SqlMsg('======== BEGIN PASS ' + (pass + 1) + ' of ' + config.modes.passes + ' ========'));
      console.log(SqlMsg(''));
      // migrate every table, in configuration order
      for (var i in config.tables) {
        MigrateTable(config, config.tables[i]);
      }
      console.log(SqlMsg('======= END OF PASS ' + (pass + 1) + ' of ' + config.modes.passes + ' ========'));
      console.log(SqlMsg(''));
    }
    console.log('-- END INSERT-SELECT SQL -- \n');
  };
})();
exports.GenerateSql = function(config) {
SqlSetup(config);
SqlBody(config);
SqlTeardown(config);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment