Skip to content

Instantly share code, notes, and snippets.

@caligo-mentis
Created March 3, 2016 09:03
Show Gist options
  • Save caligo-mentis/b3af40edb81d586b967d to your computer and use it in GitHub Desktop.
Save caligo-mentis/b3af40edb81d586b967d to your computer and use it in GitHub Desktop.
Parallel Queries
var seq = require('nd-seq');
var utils = require('nd-utils');
var mysql = require('mysql');
var map = utils.map;
var identity = utils.identity;
var ensureFunction = utils.ensureFunction;
// Connection settings handed to mysql.createPool: which database to open,
// as which user, and the local socket file used to reach the server.
var config = {
  database: 'noodoo_test',
  user: 'noodoo',
  socketPath: '/tmp/mysql.sock'
};

// Shared connection pool; transaction() checks its connections out of here.
var pool = mysql.createPool(config);
// Statement groups, one per project. Each group is an array whose first entry
// is a plain SQL string inserting the project, followed by functions that
// receive the `ids` array collected so far and return SQL which references the
// project's id (ids[0]): one DELETE clearing the project's tasks, then one
// INSERT per task.
var statements = (function buildStatements() {
  var PROJECT_COUNT = 10;
  var TASKS_PER_PROJECT = 3;

  // SQL that inserts the project row itself.
  function insertProject(projectNumber) {
    return "INSERT `projects` (`projects`.`title`) VALUES ('Project " + projectNumber + "')";
  }

  // Statement builder that removes every task attached to the new project.
  function deleteTasks(ids) {
    return "DELETE FROM `tasks` WHERE `tasks`.`project_id` = " + ids[0];
  }

  // Returns a statement builder inserting task "<project>.<task>" for the project.
  function insertTask(projectNumber, taskNumber) {
    var title = 'Task ' + projectNumber + '.' + taskNumber;
    return function(ids) {
      return "INSERT `tasks` (`tasks`.`title`, `tasks`.`project_id`) VALUES ('" + title + "', " + ids[0] + ")";
    };
  }

  var groups = [];
  for (var projectNumber = 1; projectNumber <= PROJECT_COUNT; projectNumber++) {
    var group = [insertProject(projectNumber), deleteTasks];
    for (var taskNumber = 1; taskNumber <= TASKS_PER_PROJECT; taskNumber++) {
      group.push(insertTask(projectNumber, taskNumber));
    }
    groups.push(group);
  }
  return groups;
})();
// Wrap every statement group in a transactional task (see query) and hand the
// whole list to seq together with a completion callback.
var inserts = map(statements, query);
seq(inserts, function(err) {
  if (err) {
    // Report the failure on stderr and through the exit status so a calling
    // shell or CI job can tell that the run did not succeed.
    console.error(err);
    process.exit(1);
  }
  console.log('Done');
  // Exit explicitly: the pool would otherwise keep the process alive.
  process.exit(0);
});
/**
 * Build a task that executes one group of statements inside a transaction.
 *
 * Each entry of `statements` is passed through ensureFunction(entry, identity)
 * and the resulting function is called with `ids`, the array of insertIds
 * collected so far (ids[i] is the insertId reported for the i-th statement),
 * so later statements can reference rows created by earlier ones.
 *
 * NOTE(review): assumes seq(fn1, ..., fnN, last) runs the functions in order
 * and calls `last` with the error, if any — confirm against nd-seq.
 *
 * @param {Array} statements SQL strings and/or functions of `ids` returning SQL.
 * @returns {Function} function(next): runs the group, then calls next(err).
 *   The transaction is committed on success and rolled back on failure; in
 *   both cases the pooled connection is given back before next is called.
 */
function query(statements) {
  return function(next) {
    transaction(function(err, connection) {
      if (err) return next(err);

      var ids = [];
      var queue = map(statements, invoke);
      queue.push(commit);
      seq.apply(this, queue);

      // Turn one statement into a step that runs it and records its insertId.
      function invoke(statement, index) {
        return function(done) {
          var build = ensureFunction(statement, identity);
          var sql = build(ids);
          connection.query(sql, function(err, result) {
            if (err) {
              console.log('Error statement', sql);
              return done(err);
            }
            ids[index] = result.insertId;
            done();
          });
        };
      }

      // Final step: commit when every statement succeeded, otherwise roll back.
      function commit(err) {
        if (err) return rollback(err);
        connection.query('COMMIT;', function(err) {
          if (err) return rollback(err);
          // Return the connection to the pool; without this every group
          // permanently consumes one pooled connection.
          connection.release();
          next();
        });
      }

      // Undo the transaction and report the original error.
      function rollback(err) {
        connection.query('ROLLBACK;', function(rollbackErr) {
          if (rollbackErr) {
            // The transaction state is unknown, so do not hand this
            // connection to another caller: drop it from the pool.
            connection.destroy();
          } else {
            connection.release();
          }
          next(err);
        });
      }
    });
  };
}
/**
 * Check a connection out of the pool and open a transaction on it.
 *
 * @param {Function} next Called as next(err) when no connection could be
 *   obtained or START TRANSACTION failed, otherwise as
 *   next(undefined, connection). On success the caller owns the connection
 *   and is responsible for giving it back to the pool.
 */
function transaction(next) {
  pool.getConnection(function(err, connection) {
    if (err) return next(err);
    connection.query('START TRANSACTION;', function(err) {
      if (err) {
        // The caller never receives this connection, so hand it back here;
        // otherwise it would stay checked out of the pool for good.
        connection.release();
        return next(err);
      }
      next(undefined, connection);
    });
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment