Skip to content

Instantly share code, notes, and snippets.

@greenlion
Last active October 10, 2015 02:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save greenlion/5c50f5bfacde484d903b to your computer and use it in GitHub Desktop.
Save greenlion/5c50f5bfacde484d903b to your computer and use it in GitHub Desktop.
MariaDB-TroySQL OLAP plugin interface
/* Run SQL statements server-side on behalf of the current connection.
 * Pseudocode sketch, not compilable as-is: `for(table in tables)` stands for
 * iterating the tables to process, and save_client_capabilities / save_vio
 * are assumed to be declared by the surrounding code.
 * The two pieces of session state modified below are saved first and
 * restored afterwards. */

/* Allow multi-statement query strings for the internal dispatch. */
save_client_capabilities = thd->client_capabilities;
thd->client_capabilities |= CLIENT_MULTI_QUERIES;

/* NOTE(review): clearing net.vio presumably stops dispatch_command from
 * writing results to the client socket — confirm. */
save_vio = thd->net.vio;
thd->net.vio = 0;

/* NOTE(review): buf/len are not derived from `table` here; presumably each
 * iteration builds its own query text — confirm. */
for(table in tables) {
dispatch_command(COM_QUERY, thd, buf, len);
}

/* Restore exactly the state saved above. */
thd->client_capabilities = save_client_capabilities;
thd->net.vio = save_vio;
-- STORED PROC HELPER
-- Installs the shard_query helper routines. Statements below end in ';;'
-- so that the ';' characters inside procedure bodies reach the server
-- untouched when the script is run through the mysql command-line client.
delimiter ;;
CREATE DATABASE IF NOT EXISTS shard_query;;
USE shard_query;;
DROP PROCEDURE IF EXISTS sq_helper;;
DROP PROCEDURE IF EXISTS sq_throw;;
-- sq_throw(v_err): raise a user-defined error with SQLSTATE '45000',
-- error number 45000 and v_err as its message text.
CREATE PROCEDURE sq_throw(v_err TEXT)
BEGIN
  SIGNAL SQLSTATE '45000'
    SET CLASS_ORIGIN    = 'Fastbit_UDF',
        SUBCLASS_ORIGIN = 'Fastbit_UDF',
        MYSQL_ERRNO     = 45000,
        MESSAGE_TEXT    = v_err;
END;;
-- sq_helper: send v_sql to Shard-Query via gman_do('shard_query_worker', ...)
-- and load the JSON result it returns into a new temporary table.
--
--   v_sql            SQL text for Shard-Query to run. It is JSON-escaped
--                    (backslash, double quote, newline, CR, tab) before it
--                    is embedded in the request document.
--   v_remote_schema  schema sent as "schema_name"; NULL is replaced by ''
--                    (the worker's default schema).
--   v_into_schema    schema of the temporary table that receives the rows.
--   v_into_table     name of that temporary table; it must not already exist.
--   v_return_result  non-zero: return SELECT * from the table as a result set.
--   v_drop_table     non-zero: drop the table before returning.
--
-- Every argument except v_remote_schema must be NOT NULL, otherwise
-- sq_throw() raises SQLSTATE 45000. A reply without a JSON array also raises
-- an error. All columns of the created table are TEXT.
-- Set @sqdebug to any non-NULL value to also return the generated
-- CREATE and INSERT statements.
-- NOTE(review): the string literals below rely on backslash escapes, so the
-- server is assumed not to run with NO_BACKSLASH_ESCAPES — confirm.
create procedure sq_helper(v_sql longtext, v_remote_schema varchar(64), v_into_schema varchar(64), v_into_table varchar(64), v_return_result tinyint, v_drop_table tinyint)
proc:begin
  -- request document sent to Shard-Query: {"sql":...,"schema_name":...}
  declare v_args longtext;
  -- JSON document returned by Shard-Query
  declare v_json longtext;
  -- quoted `schema`.`table` name that receives the rows
  declare v_table text;
  -- body of the JSON array describing the result-set columns
  declare v_columns text;
  -- positions of JSON array delimiters found in v_json
  declare v_pos int;
  declare v_pos2 int;

  -- NULL means "use the worker's default schema"
  if v_remote_schema is null then
    set v_remote_schema := '';
  end if;

  -- check args (sq_throw signals, so execution stops here on failure)
  if v_sql is null or v_into_schema is null or v_into_table is null or v_return_result is null or v_drop_table is null then
    call sq_throw('All arguments are NOT NULL');
  end if;

  -- Quote the target name. Doubling embedded backticks keeps an unusual
  -- name from breaking out of the identifier quoting.
  set v_table := concat('`', replace(v_into_schema, '`', '``'), '`.`', replace(v_into_table, '`', '``'), '`');

  -- Build the request. Values are escaped so quotes, backslashes and line
  -- breaks in the SQL cannot corrupt the JSON document.
  set v_args := concat(
    '{"sql":"',
    replace(replace(replace(replace(replace(v_sql, '\\', '\\\\'), '"', '\\"'), '\n', '\\n'), '\r', '\\r'), '\t', '\\t'),
    '","schema_name":"',
    replace(replace(v_remote_schema, '\\', '\\\\'), '"', '\\"'),
    '"}');
  set v_json := gman_do('shard_query_worker', v_args);

  -- The column list is the first JSON array in the reply.
  set v_pos := locate('[', v_json);
  set v_pos2 := locate(']', v_json, v_pos + 1);
  if v_json is null or v_pos = 0 or v_pos2 = 0 then
    call sq_throw(concat('Unexpected reply from shard_query_worker: ', coalesce(left(v_json, 64), 'NULL')));
  end if;
  set v_columns := substr(v_json, v_pos + 1, v_pos2 - v_pos - 1);
  -- clean up column list
  set v_columns := replace(v_columns, '\\"', '"');

  -- The rows are the [[...],...,[...]] array; an empty result has none.
  set v_pos := locate('[[', v_json);
  set v_pos2 := locate(']]', v_json, v_pos + 2);

  -- The column JSON has a very simple structure ({"type":250,"name":"col"}),
  -- so REPLACE() strips it down to the column names. Each name is wrapped in
  -- backticks so names such as count(*) or reserved words stay valid.
  set @create := concat('create temporary table ', v_table, ' (', replace(replace(replace(v_columns, '{"type":250,"name":"', '`'), '"}', '`'), ',', ' text,'), ' text)');

  -- Generate the INSERT statement: each [\"v1\",\"v2\"] row becomes ("v1","v2").
  if v_pos > 0 and v_pos2 > v_pos then
    set @insert := concat('insert into ', v_table, ' values ', replace(replace(replace(replace(replace(substr(v_json, v_pos + 1, v_pos2 - v_pos), '[\\', '['), '\\",\\"', '","'), '\\"]', '"]'), '[', '('), ']', ')'));
  else
    set @insert := null;
  end if;

  if @sqdebug is not null then
    select @create, @insert;
  end if;

  prepare stmt from @create;
  execute stmt;
  deallocate prepare stmt;

  if @insert is not null then
    prepare stmt from @insert;
    execute stmt;
    deallocate prepare stmt;
  end if;

  if v_return_result != 0 then
    set @sql := concat('select * from ', v_table);
    prepare stmt from @sql;
    execute stmt;
    deallocate prepare stmt;
  end if;

  if v_drop_table != 0 then
    set @sql := concat('drop temporary table ', v_table);
    prepare stmt from @sql;
    execute stmt;
    deallocate prepare stmt;
  end if;
end;;
delimiter ;
--------------------------------------------------------- old
#ifndef HAVE_REWRITE
#include <algorithm>
#include <cassert>
#include <cctype>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <locale>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
namespace REWRITE {
// Whitespace-trimming helpers. Each one modifies its argument in place and
// returns a reference to it, so calls can be chained.
//
// "Whitespace" is whatever std::isspace reports for the current C locale.
// Each character is converted to unsigned char before the test, because
// passing a negative char (any byte >= 0x80 where char is signed) to
// std::isspace is undefined behavior.

// Remove leading whitespace from s.
static inline std::string &ltrim(std::string &s) {
  auto not_space = [](unsigned char c) { return !std::isspace(c); };
  s.erase(s.begin(), std::find_if(s.begin(), s.end(), not_space));
  return s;
}

// Remove trailing whitespace from s.
static inline std::string &rtrim(std::string &s) {
  auto not_space = [](unsigned char c) { return !std::isspace(c); };
  // base() turns the reverse iterator into the position just past the last
  // non-space character, i.e. the start of the trailing whitespace.
  s.erase(std::find_if(s.rbegin(), s.rend(), not_space).base(), s.end());
  return s;
}

// Remove whitespace from both ends of s.
static inline std::string &trim(std::string &s) {
  return ltrim(rtrim(s));
}
/* Result of one rewrite pass, or of a whole REWRITER filter chain.
 *
 * Ownership: resultset_sql and drop_schema are owned by this object and are
 * released with free(), so they must be NULL or come from malloc()/strdup().
 * Copies duplicate both strings with strdup(); moves transfer them and leave
 * the source empty. The char * entries of table_names are NOT owned and are
 * never freed here.
 * NOTE(review): nothing in this file defines who frees the table_names
 * strings — confirm.
 *
 * Defined before OLAP_REWRITE because OLAP_REWRITE holds one by value and
 * therefore needs the complete type.
 */
struct REWRITE_RESULT {
  char *resultset_sql = NULL;      /* SQL to run to return final resultset */
  std::vector<char *> table_names; /* Tables to DROP after running the final SQL statement */
  long table_count = 0;            /* Count of tables to drop */
  char *drop_schema = NULL;        /* Schema name in which tables to drop is located */

  REWRITE_RESULT() = default;

  REWRITE_RESULT(const REWRITE_RESULT &other)
      : resultset_sql(dup_or_null(other.resultset_sql)),
        table_names(other.table_names),
        table_count(other.table_count),
        drop_schema(dup_or_null(other.drop_schema)) {}

  REWRITE_RESULT(REWRITE_RESULT &&other) noexcept
      : resultset_sql(other.resultset_sql),
        table_names(std::move(other.table_names)),
        table_count(other.table_count),
        drop_schema(other.drop_schema) {
    other.resultset_sql = NULL;
    other.drop_schema = NULL;
    other.table_count = 0;
  }

  /* Copy-and-swap: serves as both copy- and move-assignment. */
  REWRITE_RESULT &operator=(REWRITE_RESULT other) noexcept {
    swap(other);
    return *this;
  }

  void swap(REWRITE_RESULT &other) noexcept {
    std::swap(resultset_sql, other.resultset_sql);
    table_names.swap(other.table_names);
    std::swap(table_count, other.table_count);
    std::swap(drop_schema, other.drop_schema);
  }

  /* free(NULL) is a no-op, so no NULL checks are needed. The calls are not
   * wrapped in assert(), which is compiled out when NDEBUG is defined. */
  ~REWRITE_RESULT() {
    free(resultset_sql);
    free(drop_schema);
  }

 private:
  static char *dup_or_null(const char *s) { return s != NULL ? strdup(s) : NULL; }
};

/* This is the main entry class for TroySQL/MariaDB/MySQL.
 * The constructor takes
 *   char * - the SQL to rewrite
 *   char * - the schema to execute it in
 *
 * This is an example of a complex rewrite plugin for TroySQL.
 * It utilizes gearman to execute SQL statements in parallel.
 *
 * It requires the included gm_daemon, which is a gearman daemon plugin.
 * gm_daemon will spawn workers to run SQL statements through. It also
 * requires that Shard-Query be set up and that the following TroySQL
 * variables are set:
 *   rewrite_filters="olap"
 *   filter_params="olap=/path/to/shard-query.ini"
 */
class OLAP_REWRITE {
 public:
  char *sql;    /* private strdup() copy of the SQL to execute; may be NULL */
  char *schema; /* private strdup() copy of the schema; may be NULL */
  std::vector<char *> table_list;
  REWRITE_RESULT result; /* A class in which the results will be stored */

  /* Both strings are copied; the caller keeps ownership of its arguments.
   * In each initializer, the name outside the parentheses is the member and
   * the name inside is the constructor parameter. */
  OLAP_REWRITE(char *sql, char *schema)
      : sql(sql != NULL ? strdup(sql) : NULL),
        schema(schema != NULL ? strdup(schema) : NULL) {}

  /* Owns raw malloc'ed strings: a member-wise copy would double-free them. */
  OLAP_REWRITE(const OLAP_REWRITE &) = delete;
  OLAP_REWRITE &operator=(const OLAP_REWRITE &) = delete;

  /* This is the procedure to override to implement your plugin logic. The
   * rest of the class can be left as-is unless you plan to change how
   * plugins work.
   *
   * Planned behavior: call Shard-Query with a gearman client and get the
   * response as a JSON object of the form
   *   array [ sql="SQL to execute", tables="CSV list of tables to drop" ]
   * then fill out `result`.
   *
   * Returns -1 if processing failed, 0 otherwise. The current stub does no
   * work: it always returns 0 and leaves `result` empty.
   */
  int execute() {
    /* TODO: phpembed code here */
    /* TODO: construct the table list */
    return 0;
  }

  /* Returns a copy of the result (strings are duplicated). */
  REWRITE_RESULT get_result() {
    return result;
  }

  ~OLAP_REWRITE() {
    free(sql);
    free(schema);
  }
};
/* Runs one SQL statement through a comma-separated chain of rewrite filters.
 *
 * The constructor's pointers are neither copied nor freed: the caller must
 * keep sql, schema and filter_list alive for the lifetime of this object.
 */
class REWRITER {
  /* Drop one table left behind by a filter.
   * TODO: Use MariaDB server drop table code. Until then this does nothing
   * and reports failure. */
  bool drop_table(char *schema, char *table_name) {
    (void)schema;
    (void)table_name;
    return false;
  }

 private:
  char *filters; /* comma-separated filter names (borrowed) */
  char *sql;     /* SQL to rewrite (borrowed) */
  char *schema;  /* schema the SQL runs in (borrowed) */

 public:
  REWRITER(char *sql, char *schema, char *filter_list)
      : filters(filter_list), sql(sql), schema(schema) {}

  ~REWRITER() {
    /* TODO: drop all tables here */
  }

  /* Apply each filter named in filter_list, in order. Names are trimmed and
   * lowercased, and empty entries are skipped.
   *   "none" - leave the SQL unchanged.
   *   "olap" - run OLAP_REWRITE on the current SQL. If its execute() does not
   *            return -1, its non-NULL resultset_sql becomes the input of the
   *            next filter, and its table names and count are appended to
   *            the result.
   * If a rewrite at any step fails, the SQL it received is passed on
   * unchanged to the next filter (or returned).
   *
   * Returns a REWRITE_RESULT whose resultset_sql and drop_schema are fresh
   * strdup() copies (drop_schema is NULL when schema is NULL).
   */
  REWRITE_RESULT execute() {
    REWRITE_RESULT out;
    out.drop_schema = schema != NULL ? strdup(schema) : NULL;

    std::string current_sql = sql != NULL ? sql : "";
    std::istringstream ss(filters != NULL ? std::string(filters) : std::string());
    std::string filter;
    while (std::getline(ss, filter, ',')) {
      /* remove begin/end whitespace and lowercase the string */
      REWRITE::trim(filter);
      std::transform(filter.begin(), filter.end(), filter.begin(),
                     [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
      if (filter.empty()) {
        continue;
      }

      if (filter == "none") {
        /* pass-through: the SQL is left as-is */
      } else if (filter == "olap") {
        /* OLAP_REWRITE takes its own copy of the SQL, so the const_cast
         * does not expose current_sql to modification. */
        OLAP_REWRITE rewriter(const_cast<char *>(current_sql.c_str()), schema);
        /* only utilize the filter if it executed properly */
        if (rewriter.execute() != -1) {
          const REWRITE_RESULT &res = rewriter.result;
          if (res.resultset_sql != NULL) {
            current_sql = res.resultset_sql; /* the next filter gets the rewritten SQL */
          }
          /* NOTE(review): these pointers are copied as-is; confirm they
           * outlive `rewriter` and the returned result. */
          out.table_names.insert(out.table_names.end(), res.table_names.begin(),
                                 res.table_names.end());
          out.table_count += res.table_count;
        }
      } else {
        /* Unknown filter name.
         * NOTE(review): assert() does not throw, so a caller's try/catch will
         * not see this (and NDEBUG builds silently ignore the filter);
         * consider throwing to produce a meaningful error message. */
        assert(false && "unknown rewrite filter");
      }
    }

    out.resultset_sql = strdup(current_sql.c_str());
    return out;
  }
};
};
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment