Skip to content

Instantly share code, notes, and snippets.

@kynx
Last active August 29, 2015 14:20
Show Gist options
  • Save kynx/37429efa154531c67cc5 to your computer and use it in GitHub Desktop.
Save kynx/37429efa154531c67cc5 to your computer and use it in GitHub Desktop.
MySQL Proxy script for simultaneously populating tenant and master data warehouse
--[[
MySQL Proxy script for ETL process in multi-tenant datawarehouse
@author Matt Kynaston <http://kynx.github.io>
@license MIT (http://opensource.org/licenses/MIT)
Requirements:
- One database per tenant
- Multi-tenant database containing all tenant data
- Surrogate keys are unique across all tenant databases
Solution:
- Intercept "SELECT MAX(id) FROM foo" queries and direct them to the multi-tenant
database so that surrogate keys are unique across tenants
- Other SELECTs sent to tenant database
- INSERT, UPDATE, DELETE sent to both tenant and multi-tenant databases
- DDL (CREATE, ALTER, DROP, RENAME) is rejected with an error
Conventions:
- Tenants have two databases, one for UAT and one for production
- Tenant UAT database names always end in 'test'
- The two multi-tenant databases are called dwproduction (for production tenant databases) and dwstaging (for UAT tenant databases)
- Tenant and multi-tenant databases are on same MySQL server
- Dimension lookup / update steps MUST use "Use table maximum + 1" to
create the technical key (i.e. no auto_increment)
Prepared statements:
- See http://dev.mysql.com/doc/internals/en/com-stmt-execute.html for
wire protocol - we're only interested in stmt_id (bytes 1-4, counting the
command byte as byte 0; a 4-byte little-endian integer)
MySQL Proxy Setup:
For performance, MySQL Proxy should run on the same machine as the MySQL server.
- $ sudo yum install mysql-proxy
- Edit /etc/mysql-proxy.cfg and set the following options:
proxy-lua-script=/path/to/mt-proxy.lua
proxy-backend-address=/var/lib/mysql/mysql.sock
- $ sudo chkconfig mysql-proxy on
- $ sudo service mysql-proxy start
- Ensure ETL scripts use port 4040 for connections to tenant DW
--]]
-- Module state shared by the mysql-proxy hooks below.
-- NOTE(review): these are file-level upvalues; whether each client
-- connection gets its own copy depends on how mysql-proxy loads the
-- script -- confirm before relying on per-connection isolation.

-- Set to true to print routing decisions with print().
local is_debug = false

-- Default database the client had when the last COM_QUERY /
-- COM_STMT_PREPARE arrived (false until then); used to switch back after
-- a detour to the multi-tenant database, including after errors.
local current_db = false

-- Pending prepared-statement state:
--   mt_stmt_id - raw 4-byte stmt_id of the PREPARE on the multi-tenant db,
--                held until the matching tenant PREPARE result ('' if none)
--   stmt_ids   - tenant stmt_id (raw 4 bytes) -> multi-tenant stmt_id
local mt_stmt_id, stmt_ids = '', {}
--
-- Returns the name of the multi-tenant database that mirrors the client's
-- current (tenant) database.
--
-- Tenant databases whose name ends in "test" are UAT databases and map to
-- "dwstaging"; any other name maps to "dwproduction". A missing default
-- database is treated as "" (and so maps to "dwproduction") instead of
-- raising inside string.sub.
--
-- Edit this if your naming conventions differ.
--
function get_mt_db()
  local tenant_db = proxy.connection.client.default_db or ""
  if string.sub(tenant_db, -4) == "test" then
    return "dwstaging"
  end
  return "dwproduction"
end
--
-- Returns the lower-cased leading keyword of an SQL statement, or "" when
-- the statement does not start with a word (e.g. "(SELECT 1)").
--
-- /* ... */ comments are removed everywhere and any leading "#" or "-- "
-- line comments are skipped, so a comment in front of an
-- INSERT/UPDATE/DELETE or DDL statement cannot change how it is routed.
--
local function sql_command(query)
  local sql = string.gsub(query, "/%*.-%*/", "")
  local pos = 1
  while true do
    pos = string.find(sql, "%S", pos)
    if not pos then
      return ""
    end
    -- MySQL line comments: "#..." or "--" followed by whitespace/control char
    local is_line_comment = string.sub(sql, pos, pos) == "#"
      or string.find(sql, "^%-%-[%s%c]", pos) ~= nil
      or string.sub(sql, pos) == "--"
    if not is_line_comment then
      local word = string.match(sql, "^%w+", pos)
      return word and string.lower(word) or ""
    end
    local eol = string.find(sql, "\n", pos, true)
    if not eol then
      return ""
    end
    pos = eol + 1
  end
end

--
-- mysql-proxy hook for every client packet: decides which database(s) the
-- command runs on by queuing injected queries.
--
-- Injection ids (interpreted by read_query_result):
--   1 - runs on the tenant db; its result goes straight to the client
--   2 - runs on the multi-tenant db, or is a COM_INIT_DB switch; result checked
--   3 - COM_STMT_PREPARE on the multi-tenant db
--   4 - COM_STMT_PREPARE on the tenant db
--
-- @param packet raw client packet; the first byte is the command type
-- @return proxy.PROXY_SEND_QUERY when injections were queued,
--   proxy.PROXY_SEND_RESULT when the statement is rejected here,
--   or nil to pass the packet through to the tenant db unchanged
--
function read_query(packet)
  local cmd_type = packet:byte()

  if cmd_type == proxy.COM_STMT_EXECUTE or cmd_type == proxy.COM_STMT_CLOSE then
    -- the 4-byte statement id directly follows the command byte
    local stmt_id = string.sub(packet, 2, 5)
    if is_debug then
      print("[read_query]")
      print(" db = " .. tostring(current_db))
      print(" executing prepared stmt: " .. bytes_to_int32(stmt_id))
    end
    -- statements that were prepared on both dbs are executed / closed on both
    local mt_stmt = stmt_ids[stmt_id]
    if mt_stmt then
      proxy.queries:append(2, string.char(cmd_type) .. mt_stmt .. string.sub(packet, 6),
        { resultset_is_needed = true })
      proxy.queries:append(1, packet)
      if cmd_type == proxy.COM_STMT_CLOSE then
        stmt_ids[stmt_id] = nil
      end
      return proxy.PROXY_SEND_QUERY
    end
    -- unmapped statements (e.g. prepared SELECTs) run on the tenant db only
    return nil
  end

  if cmd_type ~= proxy.COM_QUERY and cmd_type ~= proxy.COM_STMT_PREPARE then
    return nil
  end

  local query = string.sub(packet, 2)
  -- remember the tenant db so we can switch back after visiting the
  -- multi-tenant db (and after errors, in cancel_queries)
  current_db = proxy.connection.client.default_db
  if is_debug then
    print("[read_query]")
    print(" db = " .. tostring(current_db))
    print(" query = " .. query)
  end

  local command = sql_command(query)

  if string.find(string.lower(query), "^%s*select%s+max%(`?.*id`?%)%s+from%s+`?[%w_]*`?[%s;]*$") then
    -- "SELECT MAX(id) FROM foo" runs on the multi-tenant db so the next
    -- surrogate key is unique across all tenants
    if is_debug then print(" sending query to " .. get_mt_db()) end
    proxy.queries:append(2, string.char(proxy.COM_INIT_DB) .. get_mt_db(), { resultset_is_needed = true })
    proxy.queries:append(1, packet)
    proxy.queries:append(2, string.char(proxy.COM_INIT_DB) .. current_db, { resultset_is_needed = true })
    return proxy.PROXY_SEND_QUERY
  elseif command == "create" or command == "alter" or command == "drop" or command == "rename" then
    -- DDL commands are too dangerous to route
    if is_debug then print(" illegal command " .. command) end
    proxy.response.type = proxy.MYSQLD_PACKET_ERR
    proxy.response.errmsg = "MT Proxy cannot handle DDL statements"
    return proxy.PROXY_SEND_RESULT
  elseif cmd_type == proxy.COM_STMT_PREPARE and command ~= "select" then
    -- prepare on both dbs; read_query_result maps the two stmt ids together
    if is_debug then print(" sending prepare to " .. get_mt_db() .. " and " .. tostring(current_db)) end
    proxy.queries:append(2, string.char(proxy.COM_INIT_DB) .. get_mt_db(), { resultset_is_needed = true })
    proxy.queries:append(3, packet, { resultset_is_needed = true })
    proxy.queries:append(2, string.char(proxy.COM_INIT_DB) .. current_db, { resultset_is_needed = true })
    proxy.queries:append(4, packet, { resultset_is_needed = true })
    return proxy.PROXY_SEND_QUERY
  elseif command == "insert" or command == "update" or command == "delete" then
    -- data changes go to the multi-tenant db first, then to the tenant db
    if is_debug then print(" sending query to " .. get_mt_db() .. " and " .. tostring(current_db)) end
    proxy.queries:append(2, string.char(proxy.COM_INIT_DB) .. get_mt_db(), { resultset_is_needed = true })
    proxy.queries:append(2, packet, { resultset_is_needed = true })
    proxy.queries:append(2, string.char(proxy.COM_INIT_DB) .. current_db, { resultset_is_needed = true })
    proxy.queries:append(1, packet)
    return proxy.PROXY_SEND_QUERY
  end

  -- everything else runs unchanged on the tenant db
  if is_debug then
    local label = cmd_type == proxy.COM_STMT_PREPARE and "prepare" or command
    print(" sending " .. label .. " to " .. tostring(current_db))
  end
  return nil
end
--
-- mysql-proxy hook: handles the result of each query that read_query
-- injected. Results from the multi-tenant db are checked and dropped; any
-- error aborts the rest of the queue via cancel_queries().
--
-- IDs are:
-- 1 - query run on tenant db: result goes straight to client
-- 2 - query run on multi-tenant db (including the COM_INIT_DB switches):
--     on error the queue is cancelled, otherwise the result is ignored
-- 3 - PREPARE run on multi-tenant db: capture its stmt_id in mt_stmt_id
-- 4 - PREPARE run on tenant db: map tenant stmt_id -> mt_stmt_id, result
--     goes to client
--
-- NOTE(review): paths that end without an explicit return yield nil, which
-- is assumed to make mysql-proxy forward the result to the client (this is
-- how multi-tenant errors reach the client) -- confirm against the
-- mysql-proxy version in use.
--
function read_query_result(inj)
-- id 1 results are never inspected; read_query appends them without
-- resultset_is_needed
if inj.id ~= 1 then
if is_debug then
print("[read_query_result]")
print(" id: " .. inj.id)
print(" query_status: " .. inj.resultset.query_status)
end
-- if a query failed on the multi-tenant db reset database and return error
-- (this also covers a failed tenant PREPARE, id 4); the bare return leaves
-- the error result un-ignored
if inj.resultset.query_status == proxy.MYSQLD_PACKET_ERR then
if is_debug then print(" error - clearing queue") end
cancel_queries()
return
end
end
-- query from multi-tenant db: it succeeded, so hide its result from the client
if inj.id == 2 then
if is_debug then print(" ok - ignoring") end
return proxy.PROXY_IGNORE_RESULT
-- query prepared on multi-tenant db
elseif inj.id == 3 then
-- a successful PREPARE response starts with a 0x00 status byte followed by
-- the 4-byte stmt_id
if inj.resultset.raw:byte() == 0 then
local stmt_id = string.sub(inj.resultset.raw, 2, 5)
if is_debug then print(" got mt stmt id: " .. bytes_to_int32(stmt_id)) end
-- held until the matching tenant PREPARE (id 4) result arrives
mt_stmt_id = stmt_id
return proxy.PROXY_IGNORE_RESULT
else
if is_debug then print(" invalid mt stmt - clearing queue") end
-- no return: this result falls through and the function returns nil
cancel_queries()
end
-- query prepared on tenant db
elseif inj.id == 4 then
-- also requires that the id-3 PREPARE captured a multi-tenant stmt_id
if inj.resultset.raw:byte() == 0 and mt_stmt_id ~= '' then
local stmt_id = string.sub(inj.resultset.raw, 2, 5)
if is_debug then print(" got tenant stmt id: " .. bytes_to_int32(stmt_id)) end
-- map the multi-tenant statement to this one so we can execute it later
stmt_ids[stmt_id] = mt_stmt_id
mt_stmt_id = ''
else
if is_debug then print(" invalid tenant stmt - clearing queue") end
-- NOTE(review): the tenant result still falls through (nil return)
-- without a stmt_ids mapping, so later EXECUTEs would run on the
-- tenant db only
cancel_queries()
end
end
end
--
-- Abandons the rest of the injection queue after a failed query and, if the
-- connection was left on another database, queues a COM_INIT_DB (id 2) to
-- switch back to the tenant database remembered in current_db.
--
-- The switch-back is skipped when there is no usable tenant database name
-- (current_db empty or not a string). A COM_INIT_DB with an empty name
-- fails. Its id-2 error would come back through read_query_result into this
-- function and could be re-queued indefinitely.
--
function cancel_queries()
  -- stop queries going to the tenant db (drops everything still queued)
  proxy.queries:reset()
  local has_tenant_db = type(current_db) == "string" and current_db ~= ""
  -- switch back to the tenant db only if we are somewhere else
  if has_tenant_db and current_db ~= proxy.connection.client.default_db then
    proxy.queries:append(2, string.char(proxy.COM_INIT_DB) .. current_db, { resultset_is_needed = true })
  end
end
--
-- Decodes a 4-byte little-endian unsigned integer into a Lua number. This
-- is the MySQL wire-protocol int<4> encoding used for statement ids.
-- Missing trailing bytes count as 0, so a short string never raises
-- inside a debug print.
--
function bytes_to_int32(bytes)
  local b1, b2, b3, b4 = string.byte(bytes, 1, 4)
  return (b1 or 0) + (b2 or 0) * 256 + (b3 or 0) * 65536 + (b4 or 0) * 16777216
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment