Last active
October 10, 2015 02:04
-
-
Save greenlion/5c50f5bfacde484d903b to your computer and use it in GitHub Desktop.
MariaDB-TroySQL OLAP plugin interface
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
save_client_capabilities = thd->client_capabilities; | |
thd->client_capabilities |= CLIENT_MULTI_QUERIES; | |
save_vio = thd->net.vio; | |
thd->net.vio = 0; | |
for(table in tables) { | |
dispatch_command(COM_QUERY, thd, buf, len); | |
} | |
thd->client_capabilities = save_client_capabilties; | |
thd->net.vio = save_vio; | |
-- ---------------------------------------------------------------------------
-- Stored-procedure helpers for Shard-Query, created in schema shard_query.
-- The client delimiter is switched to ';;' so that the procedure bodies that
-- follow may contain ';' statement terminators.
-- ---------------------------------------------------------------------------
delimiter ;;
create schema if not exists shard_query;;
use shard_query;;
-- Remove any earlier definitions so this script can be re-run.
drop procedure if exists sq_throw;;
drop procedure if exists sq_helper;;
-- sq_throw: raise a user-defined error condition (SQLSTATE '45000',
-- error number 45000) whose message text is v_err. Unless the caller
-- declares a handler for it, the error stops the calling statement.
create procedure sq_throw(v_err text)
begin
  signal sqlstate value '45000'
    set message_text    = v_err,
        mysql_errno     = 45000,
        class_origin    = 'Fastbit_UDF',
        subclass_origin = 'Fastbit_UDF';
end;;
-- sq_helper: send v_sql to Shard-Query through the gman_do() gearman UDF
-- (function 'shard_query_worker'), decode the JSON it returns and store the
-- rows in the TEMPORARY table `v_into_schema`.`v_into_table`; every column is
-- created as TEXT.
--
--   v_sql           SQL to run through Shard-Query                  (required)
--   v_remote_schema sent as "schema_name"; NULL is sent as ''       (optional)
--   v_into_schema   schema of the table that receives the rows      (required)
--   v_into_table    name of the table that receives the rows        (required)
--   v_return_result non-zero: SELECT the new table back as a result (required)
--   v_drop_table    non-zero: drop the new table before returning   (required)
--
-- When @sqdebug is non-NULL the generated CREATE and INSERT statements are
-- selected before they run. The session variables @create, @insert and @sql
-- and the prepared-statement name stmt are overwritten.
--
-- NOTE(review): decoding is string replacement, not a JSON parser. It expects
-- the column list to be the first [...] array in the response, every column
-- entry to be {"type":250,"name":"..."}, and no row value to contain '[', ']'
-- or the sequence \",\". The string literals below also assume the
-- NO_BACKSLASH_ESCAPES SQL mode is not enabled.
create procedure sq_helper(v_sql longtext, v_remote_schema varchar(64), v_into_schema varchar(64), v_into_table varchar(64), v_return_result tinyint, v_drop_table tinyint)
proc:begin
  -- request sent to Shard-Query, formatted as JSON
  declare v_args longtext;
  -- JSON returned from Shard-Query
  declare v_json longtext;
  -- quoted `schema`.`table` that receives the decoded rows
  declare v_table text;
  -- JSON text of the column list extracted from v_json
  declare v_columns text;
  -- positions used to find parts of the JSON
  declare v_pos int;
  declare v_pos2 int;

  -- v_remote_schema is the only argument that may be NULL. It is replaced by
  -- '' (presumably meaning "Shard-Query's default schema") so it passes the
  -- check below.
  if v_remote_schema is null then
    set v_remote_schema := '';
  end if;

  -- Double any backtick inside the names so they cannot break out of the
  -- identifier quoting in the dynamic SQL built below.
  set v_table := concat('`', replace(v_into_schema, '`', '``'), '`.`', replace(v_into_table, '`', '``'), '`');
  if v_sql is null or v_table is null or v_into_schema is null or v_into_table is null or v_return_result is null or v_drop_table is null
  then
    call sq_throw('All arguments except v_remote_schema must be NOT NULL');
  end if;

  -- JSON-escape the string values: backslashes first, then double quotes and
  -- the control characters a JSON string may not contain literally.
  set v_args := concat(
    '{"sql":"',
    replace(replace(replace(replace(replace(v_sql, '\\', '\\\\'), '"', '\\"'), '\n', '\\n'), '\r', '\\r'), '\t', '\\t'),
    '","schema_name":"',
    replace(replace(v_remote_schema, '\\', '\\\\'), '"', '\\"'),
    '"}');
  set v_json := gman_do('shard_query_worker', v_args);
  if v_json is null then
    call sq_throw('shard_query_worker returned no result');
  end if;

  -- The column list is the first JSON array in the response; it must exist
  -- and be non-empty, otherwise the CREATE TABLE below would be invalid.
  set v_pos := locate('[', v_json);
  set v_pos2 := locate(']', v_json, v_pos + 1);
  if v_pos = 0 or v_pos2 <= v_pos + 1 then
    call sq_throw(concat('Unexpected Shard-Query response: ', left(v_json, 64)));
  end if;
  set v_columns := substr(v_json, v_pos + 1, v_pos2 - v_pos - 1);
  -- unescape the embedded double quotes
  set v_columns := replace(v_columns, '\\"', '"');

  -- Each entry {"type":250,"name":"<col>"} becomes `<col>` text; the commas
  -- between entries remain as column separators. Backtick quoting keeps
  -- names such as count(*) valid identifiers.
  set @create := concat('create temporary table ', v_table, ' (',
    replace(replace(v_columns, '{"type":250,"name":"', '`'), '"}', '` text'),
    ')');

  -- Rows are an array of arrays delimited by '[[' ... ']]'. When it is absent
  -- the result has no rows: the table is created but nothing is inserted.
  set @insert := null;
  set v_pos := locate('[[', v_json);
  if v_pos > 0 then
    set v_pos2 := locate(']]', v_json, v_pos + 2);
    if v_pos2 = 0 then
      call sq_throw(concat('Unterminated row list in Shard-Query response: ', left(v_json, 64)));
    end if;
    -- turns  [\"a\",\"b\"],[\"c\",\"d\"]  into  ("a","b"),("c","d")
    set @insert := concat('insert into ', v_table, ' values ',
      replace(replace(replace(replace(replace(
        substr(v_json, v_pos + 1, v_pos2 - v_pos),
        '[\\', '['), '\\",\\"', '","'), '\\"]', '"]'), '[', '('), ']', ')'));
  end if;

  if @sqdebug is not null then
    select @create, @insert;
  end if;

  prepare stmt from @create;
  execute stmt;
  deallocate prepare stmt;

  if @insert is not null then
    prepare stmt from @insert;
    execute stmt;
    deallocate prepare stmt;
  end if;

  if v_return_result != 0 then
    -- the column set is only known at run time, hence select *
    set @sql := concat('select * from ', v_table);
    prepare stmt from @sql;
    execute stmt;
    deallocate prepare stmt;
  end if;

  if v_drop_table != 0 then
    set @sql := concat('drop table ', v_table);
    prepare stmt from @sql;
    execute stmt;
    deallocate prepare stmt;
  end if;
end;;
delimiter ;
-- ------------------------------------------------------- old: earlier C++ draft of the rewrite plugin (not SQL) follows
#ifndef HAVE_REWRITE
#define HAVE_REWRITE

#include <algorithm>
#include <cctype>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <locale>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

namespace REWRITE {

/* Remove leading whitespace from s in place and return s.
 * Characters reach isspace() as unsigned char: passing a negative char value
 * to the <cctype> classifiers is undefined behaviour. */
static inline std::string &ltrim(std::string &s) {
  s.erase(s.begin(), std::find_if(s.begin(), s.end(), [](unsigned char c) {
            return !std::isspace(c);
          }));
  return s;
}

/* Remove trailing whitespace from s in place and return s. */
static inline std::string &rtrim(std::string &s) {
  s.erase(std::find_if(s.rbegin(), s.rend(),
                       [](unsigned char c) { return !std::isspace(c); })
              .base(),
          s.end());
  return s;
}

/* Remove whitespace from both ends of s in place and return s. */
static inline std::string &trim(std::string &s) {
  return ltrim(rtrim(s));
}

/* strdup() that maps NULL to NULL. A non-NULL result must be released with
 * free(). */
static inline char *dup_or_null(const char *s) {
  return s != NULL ? strdup(s) : NULL;
}

/* Output of a rewrite: the SQL that produces the final resultset and the
 * intermediate tables to DROP afterwards.
 * Every char* member (including each table_names entry) is owned by this
 * object, allocated with strdup()/malloc(), and freed in the destructor.
 * Copies are deep, so returning and assigning by value is safe. */
struct REWRITE_RESULT {
  char *resultset_sql = NULL;      /* SQL to run to return final resultset */
  std::vector<char *> table_names; /* Tables to DROP after running the final SQL statement */
  long table_count = 0;            /* Count of tables to drop */
  char *drop_schema = NULL;        /* Schema name in which tables to drop is located */

  REWRITE_RESULT() {}

  REWRITE_RESULT(const REWRITE_RESULT &other)
      : resultset_sql(dup_or_null(other.resultset_sql)),
        table_count(other.table_count),
        drop_schema(dup_or_null(other.drop_schema)) {
    table_names.reserve(other.table_names.size());
    for (const char *name : other.table_names) {
      table_names.push_back(dup_or_null(name));
    }
  }

  /* Copy-and-swap: `other` is already a deep copy; the old contents are
   * released when it goes out of scope. */
  REWRITE_RESULT &operator=(REWRITE_RESULT other) {
    std::swap(resultset_sql, other.resultset_sql);
    table_names.swap(other.table_names);
    std::swap(table_count, other.table_count);
    std::swap(drop_schema, other.drop_schema);
    return *this;
  }

  /* Record a table to drop. A private copy of name is stored and
   * table_count is kept equal to table_names.size(). */
  void add_table(const char *name) {
    table_names.push_back(dup_or_null(name));
    ++table_count;
  }

  ~REWRITE_RESULT() {
    std::free(resultset_sql);
    std::free(drop_schema);
    for (char *name : table_names) {
      std::free(name);
    }
  }
};

/* This is the main entry class for TroySQL/MariaDB/MySQL
 * The constructor takes
 *  const char * - the SQL to rewrite
 *  const char * - schema to execute it in
 * Both are copied; either may be NULL.
 */
/* This is an example of a complex rewrite plugin for TroySQL.
 * It utilizes gearman to execute SQL statements in
 * parallel.
 *
 * It requires the included gm_daemon, which is a gearman daemon
 * plugin. gm_daemon will spawn workers to
 * run SQL statements through. It requires that Shard-Query
 * be setup, that the following TroySQL variables are set:
 * rewrite_filters="olap"
 * filter_params="olap=/path/to/shard-query.ini"
 */
class OLAP_REWRITE {
 public:
  char *sql;    /* SQL string to execute (private copy, may be NULL) */
  char *schema; /* Schema in which to execute the SQL (private copy, may be NULL) */
  std::vector<char *> table_list;
  REWRITE_RESULT result; /* A class in which the results will be stored */

  /* Parameters are const because only copies are kept; plain char *
   * arguments still convert implicitly. */
  OLAP_REWRITE(const char *sql, const char *schema)
      : sql(dup_or_null(sql)), schema(dup_or_null(schema)) {}

  /* Owns raw malloc'd buffers: a shallow copy would free them twice. */
  OLAP_REWRITE(const OLAP_REWRITE &) = delete;
  OLAP_REWRITE &operator=(const OLAP_REWRITE &) = delete;

  /* This is the procedure to override to implement your plugin
   * logic. The rest of the class can be left as-is unless you
   * plan to change how plugins work.
   *
   * Planned: call shard-query with the gearman client and get the response
   * as a JSON object:
   *   array [ sql="SQL to execute", tables="CSV list of tables to drop" ]
   *
   * Returns -1 if processing failed; otherwise fills out `result` and
   * returns 0.
   */
  int execute() {
    /* TODO: phpembed code here, then construct the table list.
     * Until then nothing is rewritten: `result` stays empty. */
    return 0;
  }

  /* Returns a deep copy of the rewrite result. */
  REWRITE_RESULT get_result() const { return result; }

  ~OLAP_REWRITE() {
    std::free(sql);
    std::free(schema);
    /* NOTE(review): ownership of table_list entries is not defined yet, so
     * they are not freed here. */
  }
};

/* Runs a SQL statement through a comma-separated chain of rewrite filters.
 * The sql, schema and filter_list pointers are borrowed, not copied: the
 * caller must keep them alive for the lifetime of this object. */
class REWRITER {
 private:
  char *filters; /* comma-separated filter names */
  char *sql;     /* SQL to rewrite */
  char *schema;  /* schema passed to each filter */

  bool drop_table(char *schema, char *table_name) {
    /* TODO: Use MariaDB server drop table code */
    (void)schema;
    (void)table_name;
    return false; /* nothing is dropped until implemented */
  }

 public:
  REWRITER(char *sql, char *schema, char *filter_list)
      : filters(filter_list), sql(sql), schema(schema) {}

  ~REWRITER() {
    /* TODO: drop all tables here */
  }

  /* Apply every filter named in the filter list, in order.
   * Names are trimmed and case-insensitive; empty entries and "none" are
   * pass-through. Each filter receives the SQL produced by the previous one.
   * If a filter fails (execute() == -1) or produces no SQL, the current SQL
   * is passed on unchanged.
   * Returns the final SQL in resultset_sql, this object's schema in
   * drop_schema, and every table reported by the filters in
   * table_names/table_count. If sql is NULL, the filters are not examined
   * and resultset_sql is NULL.
   * Throws std::invalid_argument for an unknown filter name (the caller is
   * expected to catch it and report a meaningful error). */
  REWRITE_RESULT execute() {
    REWRITE_RESULT out;
    out.drop_schema = dup_or_null(schema);
    if (sql == NULL) {
      return out;
    }

    std::string current_sql(sql);
    /* Brace initialisation: `ss(std::string(filters))` would declare a
     * function named ss instead of a stream (most vexing parse). */
    std::istringstream ss{std::string(filters != NULL ? filters : "")};
    std::string filter;
    while (std::getline(ss, filter, ',')) {
      /* remove begin/end whitespace and lowercase the string */
      trim(filter);
      std::transform(filter.begin(), filter.end(), filter.begin(),
                     [](unsigned char c) { return static_cast<char>(std::tolower(c)); });

      if (filter.empty() || filter == "none") {
        continue;
      }
      /* only OLAP is currently supported */
      if (filter != "olap") {
        throw std::invalid_argument("unknown rewrite filter: " + filter);
      }

      OLAP_REWRITE rewriter(current_sql.c_str(), schema);
      /* only utilize the filter if it executed properly */
      if (rewriter.execute() != -1) {
        if (rewriter.result.resultset_sql != NULL) {
          /* the next filter gets the rewritten SQL */
          current_sql = rewriter.result.resultset_sql;
        }
        for (const char *name : rewriter.result.table_names) {
          out.add_table(name);
        }
      }
    }

    out.resultset_sql = dup_or_null(current_sql.c_str());
    return out;
  }
};

}  // namespace REWRITE

#endif  // HAVE_REWRITE
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment