Created
July 11, 2018 14:36
-
-
Save marcocitus/66be49815ed8a71345c6a978679160bf to your computer and use it in GitHub Desktop.
Creating a distributed postgres_fdw table
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Point every shard of a distributed postgres_fdw table at the matching shard
 * of the remote distributed table.
 *
 * table_name: the local distributed foreign table.
 *
 * The function looks up the foreign server, the foreign-table options and the
 * current user's mapping for table_name, connects to the remote coordinator
 * via dblink, and pairs local and remote shards by shardminvalue. For every
 * local shard placement it then runs on the worker:
 *   - ALTER FOREIGN TABLE <shard> OPTIONS (SET table_name '<remote shard>')
 *   - ALTER SERVER <server>_<shardid> OPTIONS (SET host ..., SET port ...)
 *
 * Raises an exception when no user mapping / foreign table is found for the
 * current user or when the foreign table has no table_name option. Commands
 * that fail on a worker are reported as warnings.
 */
CREATE OR REPLACE FUNCTION bind_shards_to_remote_shards(table_name regclass)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    foreign_server text;
    fdw_options text[];
    user_mapping_options text[];
    server_options text[];
    remote_table_name text;
    remote_conninfo text;
    worker_names text[];
    worker_ports integer[];
    shard_name_commands text[];
    host_commands text[];
    worker_result record;
BEGIN
    SELECT s.srvname, t.ftoptions, u.umoptions, s.srvoptions
    INTO foreign_server, fdw_options, user_mapping_options, server_options
    FROM pg_user_mappings AS u
    INNER JOIN pg_foreign_table AS t ON u.srvid = t.ftserver
    INNER JOIN pg_foreign_server AS s ON u.srvid = s.oid
    WHERE t.ftrelid = table_name
      AND u.usename = current_user;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'no user mapping for user "%" on the foreign server of table %',
            current_user, table_name;
    END IF;

    remote_table_name := find_value('table_name', fdw_options);

    IF remote_table_name IS NULL THEN
        RAISE EXCEPTION 'foreign table % has no table_name option', table_name;
    END IF;

    -- Build a libpq connection string. Values are wrapped in single quotes
    -- with backslash escaping (libpq rules, which differ from SQL literal
    -- quoting); options that are not set are left out so libpq applies its
    -- defaults instead of mis-parsing an empty value.
    SELECT coalesce(
               string_agg(
                   format('%s=''%s''',
                          p.keyword,
                          replace(replace(p.val, E'\\', E'\\\\'), '''', E'\\''')),
                   ' ' ORDER BY p.ord) || ' ',
               '') || 'sslmode=require'
    INTO remote_conninfo
    FROM (VALUES
        (1, 'host', find_value('host', server_options)),
        (2, 'port', find_value('port', server_options)),
        (3, 'dbname', find_value('dbname', server_options)),
        (4, 'user', find_value('user', user_mapping_options)),
        (5, 'password', find_value('password', user_mapping_options))
    ) AS p (ord, keyword, val)
    WHERE p.val IS NOT NULL;

    -- One command pair per local shard placement, matched to the remote
    -- shard that starts at the same shardminvalue.
    SELECT
        array_agg(c.nodename),
        array_agg(c.nodeport),
        array_agg(c.shard_name_command),
        array_agg(c.host_command)
    INTO worker_names, worker_ports, shard_name_commands, host_commands
    FROM (
        SELECT
            placement.nodename,
            placement.nodeport,
            -- shard_name() output is interpolated as-is (%s): it is expected
            -- to already be a quoted, possibly schema-qualified identifier.
            format($$ALTER FOREIGN TABLE %s OPTIONS (SET table_name %L)$$,
                   shard_name(table_name, local_shard.shardid),
                   remote_table_name || '_' || remote_shard.shardid) AS shard_name_command,
            format($$ALTER SERVER %I OPTIONS (SET host %L, SET port %L)$$,
                   foreign_server || '_' || local_shard.shardid,
                   remote_shard.nodename,
                   remote_shard.nodeport) AS host_command
        FROM pg_dist_shard_placement AS placement
        INNER JOIN pg_dist_shard AS local_shard USING (shardid)
        INNER JOIN dblink(
            remote_conninfo,
            format($$SELECT nodename, nodeport, shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) WHERE logicalrelid = %L::regclass$$, remote_table_name)
        ) AS remote_shard (nodename text, nodeport int, shardid bigint, shardminvalue text, shardmaxvalue text)
            USING (shardminvalue)
        WHERE local_shard.logicalrelid = table_name
    ) AS c;

    IF worker_names IS NULL THEN
        RAISE WARNING 'no shard of % matches a shard of remote table %; nothing was changed',
            table_name, remote_table_name;
        RETURN;
    END IF;

    -- Rename the remote table of every shard first, then repoint the servers
    -- (same order as the two calls were listed originally).
    FOR worker_result IN
        SELECT w.node_name, w.node_port, w.success, w.result
        FROM master_run_on_worker(worker_names, worker_ports, shard_name_commands, true) AS w
    LOOP
        IF worker_result.success IS NOT TRUE THEN
            RAISE WARNING 'ALTER FOREIGN TABLE failed on %:%: %',
                worker_result.node_name, worker_result.node_port, worker_result.result;
        END IF;
    END LOOP;

    FOR worker_result IN
        SELECT w.node_name, w.node_port, w.success, w.result
        FROM master_run_on_worker(worker_names, worker_ports, host_commands, true) AS w
    LOOP
        IF worker_result.success IS NOT TRUE THEN
            RAISE WARNING 'ALTER SERVER failed on %:%: %',
                worker_result.node_name, worker_result.node_port, worker_result.result;
        END IF;
    END LOOP;
END;
$function$;
/*
 * Add the user mappings from the distributed table to the shards.
 *
 * table_name: the local distributed foreign table.
 *
 * For every user mapping defined on the foreign server of table_name, a
 * matching CREATE USER MAPPING command is run on each shard placement of the
 * table (one command per placement, executed via master_run_on_worker).
 * Commands that fail on a worker are reported as warnings instead of being
 * silently discarded.
 */
CREATE OR REPLACE FUNCTION add_user_mappings_to_shards(table_name regclass)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    mapping record;
    worker_result record;
BEGIN
    FOR mapping IN
        SELECT
            m.usename AS username,
            m.srvname AS foreign_server,
            m.umoptions AS user_mapping_options
        FROM pg_user_mappings AS m
        INNER JOIN pg_foreign_table AS t ON m.srvid = t.ftserver
        WHERE t.ftrelid = table_name
    LOOP
        FOR worker_result IN
            SELECT w.node_name, w.node_port, w.success, w.result
            FROM (
                -- one CREATE USER MAPPING command per shard placement
                SELECT
                    array_agg(p.nodename) AS worker_names,
                    array_agg(p.nodeport) AS worker_ports,
                    array_agg(create_user_mapping_command(
                        mapping.username,
                        mapping.foreign_server,
                        s.shardid,
                        mapping.user_mapping_options)) AS commands
                FROM pg_dist_shard_placement AS p
                INNER JOIN pg_dist_shard AS s USING (shardid)
                WHERE s.logicalrelid = table_name
            ) AS targets
            CROSS JOIN LATERAL master_run_on_worker(
                targets.worker_names,
                targets.worker_ports,
                targets.commands,
                true) AS w
        LOOP
            IF worker_result.success IS NOT TRUE THEN
                RAISE WARNING 'CREATE USER MAPPING for "%" failed on %:%: %',
                    mapping.username,
                    worker_result.node_name,
                    worker_result.node_port,
                    worker_result.result;
            END IF;
        END LOOP;
    END LOOP;
END;
$function$;
/*
 * Build the CREATE USER MAPPING command for one shard.
 *
 * username:             role the mapping is created for
 * foreign_server:       name of the foreign server of the distributed table
 * shardid:              shard whose server (<foreign_server>_<shardid>) gets the mapping
 * user_mapping_options: options as an array of key=value strings
 *
 * Returns the command text. The OPTIONS clause is left out when there are no
 * options, because "OPTIONS ()" is not valid syntax.
 */
CREATE OR REPLACE FUNCTION create_user_mapping_command(username name, foreign_server name, shardid bigint, user_mapping_options text[])
RETURNS text
LANGUAGE sql
AS $function$
SELECT format(
    $$CREATE USER MAPPING FOR %I SERVER %I%s$$,
    username,
    -- quote the combined server name so unusual server names survive
    foreign_server::text || '_' || shardid::text,
    coalesce(' OPTIONS (' || string_agg(convert_key_value(o.key_value), ', ') || ')', '')
)
FROM unnest(user_mapping_options) AS o (key_value);
$function$;
/*
 * Turn a key=value string into the "key 'value'" form used inside an
 * OPTIONS (...) clause. The value is emitted as a quoted SQL literal.
 */
CREATE OR REPLACE FUNCTION convert_key_value(key_value text)
RETURNS text
LANGUAGE plpgsql
AS $function$
DECLARE
    pair record;
BEGIN
    -- split first, then format the two halves
    SELECT kv.key, kv.value
    INTO pair
    FROM extract_key_value(key_value) AS kv;

    RETURN format('%s %L', pair.key, pair.value);
END;
$function$;
/*
 * Extract the key and value from a key=value string.
 *
 * key_value: the string to split; only the first '=' is treated as separator,
 *            so the value itself may contain '='.
 * key:       everything before the first '='
 * value:     everything after the first '='
 *
 * When the input contains no '=', the whole string is returned as the key and
 * the value is NULL (instead of failing with a negative substring length).
 */
CREATE OR REPLACE FUNCTION extract_key_value(key_value text, OUT key text, OUT value text)
RETURNS record
LANGUAGE plpgsql
AS $function$
DECLARE
    separator_index int := position('=' IN key_value);
BEGIN
    IF separator_index = 0 THEN
        -- no separator: treat the input as a bare key, value stays NULL
        key := key_value;
        RETURN;
    END IF;

    key := substring(key_value FOR separator_index - 1);
    value := substring(key_value FROM separator_index + 1);
END;
$function$;
/*
 * Find the value for a given key in an array of key=value pairs.
 *
 * needle:   the key to look for
 * haystack: array of key=value strings
 *
 * Returns the value of a matching entry, or NULL when the key is not present.
 */
CREATE OR REPLACE FUNCTION find_value(needle text, haystack text[])
RETURNS text
LANGUAGE sql
AS $function$
-- LATERAL calls extract_key_value once per array element; the
-- (extract_key_value(x)).* form would call it once per output column.
SELECT kv.value
FROM unnest(haystack) AS entry (key_value)
CROSS JOIN LATERAL extract_key_value(entry.key_value) AS kv
WHERE kv.key = needle
$function$;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Example setup: expose the remote Citus formation's page_views table as a
-- local distributed foreign table.

-- 1. postgres_fdw server pointing at the remote coordinator
CREATE SERVER us_server
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (
        host 'c.f2pvks4wstvayzgtldzykinxxvy.db.citusdata.com',
        port '5432',
        sslmode 'require'
    );

-- 2. credentials the local citus role uses on the remote side
CREATE USER MAPPING FOR citus
    SERVER us_server
    OPTIONS (
        user 'citus',
        password 'abcdefg'
    );

-- 3. foreign table whose columns match the remote distributed table
CREATE FOREIGN TABLE page_views_us (
    site_id   int,
    path      text,
    client_ip inet,
    view_time timestamptz,
    view_id   bigint
)
SERVER us_server
OPTIONS (table_name 'page_views');

-- 4. distribute the foreign table on site_id
SELECT create_distributed_table('page_views_us', 'site_id');

-- 5. copy the user mappings to the shards and bind each shard to its remote shard
SELECT
    add_user_mappings_to_shards('page_views_us'),
    bind_shards_to_remote_shards('page_views_us');
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment