Last active
August 29, 2015 14:20
-
-
Save kynx/37429efa154531c67cc5 to your computer and use it in GitHub Desktop.
MySQL Proxy script for simultaneously populating tenant and master data warehouses
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--[[ | |
MySQL Proxy script for ETL process in multi-tenant datawarehouse | |
@author Matt Kynaston <http://kynx.github.io> | |
@license MIT (http://opensource.org/licenses/MIT) | |
Requirement: | |
- One database per tenant | |
- Multi-tenant database containing all tenant data | |
- Surrogate keys are unique across all tenant databases | |
Solution: | |
- Intercept "SELECT MAX(id) FROM foo" queries and direct them to the multi-tenant
  database so that surrogate keys are unique
- Other SELECTs sent to tenant database | |
- INSERT, UPDATE, DELETE sent to both tenant and multi-tenant databases | |
Conventions: | |
- Tenants have two databases, one for UAT and one for production | |
- Tenant UAT database names always end in 'test' | |
- Two multi-tenant databases are called dwproduction and dwstaging | |
- Tenant and multi-tenant databases are on same MySQL server | |
- Dimension lookup / updates MUST use "Use table maximum + 1" to
  create technical key (i.e. no auto_increment)
Prepared statements: | |
- See http://dev.mysql.com/doc/internals/en/com-stmt-execute.html for | |
wire protocol - we're only interested in stmt_id (bytes 1-4) | |
MySQL Proxy Setup: | |
For performance, MySQL Proxy should run on the same machine as the MySQL server. | |
- $ sudo yum install mysql-proxy | |
- Edit /etc/mysql-proxy.cfg, setting the following lines:
proxy-lua-script=/path/to/mt-proxy.lua | |
proxy-backend-address=/var/lib/mysql/mysql.sock | |
- $ sudo chkconfig mysql-proxy on | |
- $ sudo service mysql-proxy start | |
- Ensure ETL scripts use port 4040 for connections to tenant DW | |
--]] | |
-- Script-level state:
--   is_debug   - when true, routing decisions are printed
--   current_db - tenant database captured from the last COM_QUERY /
--                COM_STMT_PREPARE, so it can be restored after an error
--                (false until the first such query)
--   mt_stmt_id - raw stmt_id of the statement just prepared on the
--                multi-tenant db, waiting to be paired ('' when none)
--   stmt_ids   - tenant stmt_id -> corresponding multi-tenant stmt_id
local is_debug, current_db, mt_stmt_id, stmt_ids = false, false, '', {}
--
-- Picks the multi-tenant database paired with the client's current database:
-- a name ending in "test" (tenant UAT) maps to "dwstaging", anything else
-- maps to "dwproduction".
--
-- Edit this if your naming conventions differ
--
function get_mt_db()
  local tenant_db = proxy.connection.client.default_db
  if string.sub(tenant_db, -4) == "test" then
    return "dwstaging"
  end
  return "dwproduction"
end
-- leading keywords rejected outright: DDL is too dangerous to mirror
local DDL_COMMANDS = { create = true, alter = true, drop = true, rename = true }
-- leading keywords mirrored to both the multi-tenant and tenant databases
local DML_COMMANDS = { insert = true, update = true, delete = true }

--
-- Analyses a client packet, sending it to the correct db(s).
--
-- Injection ids used (see read_query_result):
--   1 - runs on tenant db, result goes to the client
--   2 - runs on multi-tenant db (or is a COM_INIT_DB switch)
--   3 - COM_STMT_PREPARE on multi-tenant db
--   4 - COM_STMT_PREPARE on tenant db
--
-- @param packet raw MySQL command packet; first byte is the command type
-- @return proxy.PROXY_SEND_QUERY when queries were queued,
--         proxy.PROXY_SEND_RESULT when the packet was rejected, or nil to let
--         the packet run unchanged on the tenant db
--
function read_query( packet )
  local cmd_type = packet:byte()

  if cmd_type == proxy.COM_STMT_EXECUTE or cmd_type == proxy.COM_STMT_CLOSE then
    -- stmt_id is the 4 bytes following the command byte
    local stmt_id = string.sub(packet, 2, 5)
    if is_debug then
      print("[read_query]")
      print(" db = " .. tostring(current_db))
      print(" executing prepared stmt: " .. bytes_to_int32(stmt_id))
    end
    local mapped_id = stmt_ids[stmt_id]
    if mapped_id then
      -- same packet, re-addressed to the multi-tenant copy of the statement
      proxy.queries:append(2, string.char(cmd_type) .. mapped_id .. string.sub(packet, 6), { resultset_is_needed = true })
      proxy.queries:append(1, packet)
      if cmd_type == proxy.COM_STMT_CLOSE then
        stmt_ids[stmt_id] = nil
      end
      return proxy.PROXY_SEND_QUERY
    end
    -- unmapped statement: fall through to execute on tenant
    return
  end

  if cmd_type ~= proxy.COM_QUERY and cmd_type ~= proxy.COM_STMT_PREPARE then
    -- any other command runs on the tenant db untouched
    return
  end

  local query = string.sub(packet, 2)
  current_db = proxy.connection.client.default_db
  if is_debug then
    print("[read_query]")
    print(" db = " .. tostring(current_db))
    print(" query = " .. query)
  end

  -- First word of the statement, ignoring /* ... */ comments. A statement that
  -- does not start with a word (e.g. "(SELECT 1)" or an empty packet) yields ""
  -- rather than crashing string.lower on nil; such queries fall through.
  local stripped = string.gsub(query, "/%*.-%*/", "")
  local command = string.match(stripped, "^%s*(%w+)")
  command = command and string.lower(command) or ""

  -- "SELECT MAX(id) FROM foo" goes to multi-tenant db
  -- NOTE(review): a COM_STMT_PREPARE matching this pattern is also prepared on
  -- the multi-tenant db only, and no stmt_ids mapping is recorded, so a later
  -- COM_STMT_EXECUTE with that id falls through to the tenant db. Confirm
  -- whether the ETL ever prepares these lookups.
  if string.find(string.lower(query), "^%s*select%s+max%(`?.*id`?%)%s+from%s+`?[%w_]*`?[%s;]*$") then
    if is_debug then print(" sending query to " .. get_mt_db()) end
    proxy.queries:append(2, string.char(proxy.COM_INIT_DB) .. get_mt_db(), { resultset_is_needed = true })
    proxy.queries:append(1, packet)
    proxy.queries:append(2, string.char(proxy.COM_INIT_DB) .. current_db, { resultset_is_needed = true })
    return proxy.PROXY_SEND_QUERY
  end

  -- DDL commands are too dangerous
  if DDL_COMMANDS[command] then
    if is_debug then print(" illegal command " .. command) end
    proxy.response.type = proxy.MYSQLD_PACKET_ERR
    proxy.response.errmsg = "MT Proxy cannot handle DDL statements"
    return proxy.PROXY_SEND_RESULT
  end

  -- prepared statements go to both dbs; read_query_result pairs their stmt_ids
  if cmd_type == proxy.COM_STMT_PREPARE and command ~= "select" then
    if is_debug then print(" sending prepare to " .. get_mt_db() .. " and " .. tostring(current_db)) end
    proxy.queries:append(2, string.char(proxy.COM_INIT_DB) .. get_mt_db(), { resultset_is_needed = true })
    proxy.queries:append(3, packet, { resultset_is_needed = true })
    proxy.queries:append(2, string.char(proxy.COM_INIT_DB) .. current_db, { resultset_is_needed = true })
    proxy.queries:append(4, packet, { resultset_is_needed = true })
    return proxy.PROXY_SEND_QUERY
  end

  -- data manipulation statements go to both
  if DML_COMMANDS[command] then
    if is_debug then print(" sending query to " .. get_mt_db() .. " and " .. tostring(current_db)) end
    proxy.queries:append(2, string.char(proxy.COM_INIT_DB) .. get_mt_db(), { resultset_is_needed = true })
    proxy.queries:append(2, packet, { resultset_is_needed = true })
    proxy.queries:append(2, string.char(proxy.COM_INIT_DB) .. current_db, { resultset_is_needed = true })
    proxy.queries:append(1, packet)
    return proxy.PROXY_SEND_QUERY
  end

  -- everything else falls through and is executed on tenant db
  if is_debug then
    local label = cmd_type == proxy.COM_STMT_PREPARE and "prepare" or command
    print(" sending " .. label .. " to " .. tostring(current_db))
  end
end
--
-- Post-processes the result of each queued query.
--
-- Query ids:
--   1 - query run on tenant db: passed straight to the client
--   2 - query run on multi-tenant db: swallowed when it succeeds
--   3 - PREPARE run on multi-tenant db: its stmt_id is remembered
--   4 - PREPARE run on tenant db: paired with the remembered multi-tenant
--       stmt_id; the result goes to the client
--
-- An error packet for any id other than 1 clears the remaining queue (via
-- cancel_queries) and is forwarded to the client.
--
function read_query_result(inj)
  local id = inj.id

  -- tenant results need no inspection
  if id == 1 then
    return
  end

  local result = inj.resultset
  if is_debug then
    print("[read_query_result]")
    print(" id: " .. id)
    print(" query_status: " .. result.query_status)
  end

  -- failure on the multi-tenant side: reset database and return the error
  if result.query_status == proxy.MYSQLD_PACKET_ERR then
    if is_debug then print(" error - clearing queue") end
    cancel_queries()
    return
  end

  if id == 2 then
    if is_debug then print(" ok - ignoring") end
    return proxy.PROXY_IGNORE_RESULT
  end

  if id == 3 then
    local raw = result.raw
    if raw:byte() ~= 0 then
      if is_debug then print(" invalid mt stmt - clearing queue") end
      cancel_queries()
      return
    end
    local captured = string.sub(raw, 2, 5)
    if is_debug then print(" got mt stmt id: " .. bytes_to_int32(captured)) end
    mt_stmt_id = captured
    return proxy.PROXY_IGNORE_RESULT
  end

  if id == 4 then
    local raw = result.raw
    local paired = raw:byte() == 0 and mt_stmt_id ~= ''
    if not paired then
      if is_debug then print(" invalid tenant stmt - clearing queue") end
      cancel_queries()
      return
    end
    local tenant_id = string.sub(raw, 2, 5)
    if is_debug then print(" got tenant stmt id: " .. bytes_to_int32(tenant_id)) end
    -- remember the multi-tenant twin so later executes can be mirrored
    stmt_ids[tenant_id] = mt_stmt_id
    mt_stmt_id = ''
  end
end
--
-- Clears the query queue and, if required, queues a COM_INIT_DB back to the
-- tenant database that was current when the client's query arrived.
--
-- Assumes proxy.connection.client.default_db reflects any injected
-- COM_INIT_DB that already ran (TODO confirm against mysql-proxy).
--
function cancel_queries()
  -- stop queries going to tenant db
  proxy.queries:reset()
  -- Nothing to restore without a usable tenant name. current_db starts as
  -- false, which cannot be concatenated. An empty name would presumably be
  -- rejected by the server; that error would come back through
  -- read_query_result and call this function again.
  if type(current_db) ~= "string" or current_db == "" then
    return
  end
  -- switch back to tenant db
  if current_db ~= proxy.connection.client.default_db then
    proxy.queries:append(2, string.char(proxy.COM_INIT_DB) .. current_db, { resultset_is_needed = true })
  end
end
--
-- Decodes a 4-byte little-endian unsigned integer (the byte order MySQL's
-- wire protocol uses for stmt_id). Missing bytes in a short input count as 0.
-- Within this script it is only called to format debug output.
--
function bytes_to_int32(bytes)
  local b1, b2, b3, b4 = string.byte(bytes, 1, 4)
  return (b1 or 0) + (b2 or 0) * 256 + (b3 or 0) * 65536 + (b4 or 0) * 16777216
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment