Skip to content

Instantly share code, notes, and snippets.

@marcocitus
Created July 11, 2018 14:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marcocitus/66be49815ed8a71345c6a978679160bf to your computer and use it in GitHub Desktop.
Save marcocitus/66be49815ed8a71345c6a978679160bf to your computer and use it in GitHub Desktop.
Creating a distributed postgres_fdw table
/*
 * bind_shards_to_remote_shards points every shard of the distributed foreign
 * table table_name at the matching shard of the remote distributed table.
 *
 * Steps:
 *   1. Look up the foreign server, the foreign table options, and the user
 *      mapping of current_user for table_name in the local catalogs.
 *   2. Connect to the remote server with dblink and read its shard placements
 *      (node name, node port, shard id, shard min/max value).
 *   3. Pair local and remote shards that have the same shardminvalue.
 *   4. On every local placement, run (through master_run_on_worker):
 *        - ALTER FOREIGN TABLE <local shard> ... SET table_name <remote shard>
 *        - ALTER SERVER <server>_<local shardid> ... SET host/port <remote node>
 *
 * Parameters:
 *   table_name - the local distributed foreign table.
 *
 * Raises an exception when no foreign table definition with a user mapping for
 * current_user exists for table_name.
 */
CREATE OR REPLACE FUNCTION bind_shards_to_remote_shards(table_name regclass)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    foreign_server text;
    fdw_options text[];
    user_mapping_options text[];
    server_options text[];
    remote_table_name text;
    remote_conninfo text;
BEGIN
    SELECT s.srvname, t.ftoptions, u.umoptions, s.srvoptions
    INTO foreign_server, fdw_options, user_mapping_options, server_options
    FROM pg_user_mappings u
    JOIN pg_foreign_table t ON (u.srvid = t.ftserver)
    JOIN pg_foreign_server s ON (u.srvid = s.oid)
    WHERE t.ftrelid = table_name AND u.usename = current_user;

    -- Without this check the code below would build a connection string out
    -- of NULLs and fail later with an unrelated-looking dblink error.
    IF NOT FOUND THEN
        RAISE EXCEPTION 'no foreign table definition with a user mapping for user % found for table %',
            current_user, table_name;
    END IF;

    -- port and dbname are appended only when the foreign server defines them,
    -- so dblink reaches the same endpoint that postgres_fdw uses; when they
    -- are absent the connection string is unchanged and libpq defaults apply.
    remote_conninfo := format('host=%s user=%L password=%L sslmode=require',
                              find_value('host', server_options),
                              find_value('user', user_mapping_options),
                              find_value('password', user_mapping_options))
        || COALESCE(' port=' || quote_literal(find_value('port', server_options)), '')
        || COALESCE(' dbname=' || quote_literal(find_value('dbname', server_options)), '');

    remote_table_name := find_value('table_name', fdw_options);

    -- PERFORM discards the rows returned by master_run_on_worker. A derived
    -- table is used instead of a CTE because PERFORM cannot be prefixed by WITH.
    -- The trailing "true" is passed to master_run_on_worker unchanged
    -- (presumably its parallel flag -- confirm against the Citus documentation).
    PERFORM
        master_run_on_worker(array_agg(nodename), array_agg(nodeport), array_agg(shard_name_command), true),
        master_run_on_worker(array_agg(nodename), array_agg(nodeport), array_agg(host_command), true)
    FROM (
        SELECT
            placement.nodename,
            placement.nodeport,
            -- NOTE(review): if shard_name() already returns a quoted or
            -- schema-qualified name, %I would quote it a second time -- confirm.
            format($$ALTER FOREIGN TABLE %I OPTIONS (SET table_name %L)$$, shard_name(table_name, local_shard.shardid), remote_table_name||'_'||remote.shardid) AS shard_name_command,
            -- The per-shard server name is quoted as a single identifier; for
            -- plain lower-case names the generated command is unchanged.
            format($$ALTER SERVER %I OPTIONS (SET host %L, SET port %L)$$, foreign_server||'_'||local_shard.shardid, remote.nodename, remote.nodeport) AS host_command
        FROM
            pg_dist_shard_placement placement
        JOIN
            pg_dist_shard local_shard USING (shardid)
        JOIN
            dblink(remote_conninfo, format($$SELECT nodename, nodeport, shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) WHERE logicalrelid = %L::regclass$$, remote_table_name)) AS remote (nodename text, nodeport int, shardid bigint, shardminvalue text, shardmaxvalue text) USING (shardminvalue)
        WHERE local_shard.logicalrelid = table_name
    ) AS commands_to_execute;
END;
$function$;
/*
 * add_user_mappings_to_shards copies the user mappings of the foreign server
 * behind table_name to the per-shard servers on the workers.
 *
 * For every user mapping defined on the table's foreign server, one
 * CREATE USER MAPPING command is generated per shard placement of table_name
 * (see create_user_mapping_command) and the whole batch is handed to
 * master_run_on_worker. The rows returned by master_run_on_worker are
 * discarded.
 */
CREATE OR REPLACE FUNCTION add_user_mappings_to_shards(table_name regclass)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    user_mapping record;
BEGIN
    FOR user_mapping IN
        SELECT
            um.usename AS username,
            um.srvname AS foreign_server,
            um.umoptions AS user_mapping_options
        FROM pg_user_mappings um
        JOIN pg_foreign_table ft ON (um.srvid = ft.ftserver)
        WHERE ft.ftrelid = table_name
    LOOP
        -- one command per shard placement, sent to the node holding it
        PERFORM master_run_on_worker(
                    array_agg(placement.nodename),
                    array_agg(placement.nodeport),
                    array_agg(create_user_mapping_command(
                        user_mapping.username,
                        user_mapping.foreign_server,
                        shardid,
                        user_mapping.user_mapping_options)),
                    true)
        FROM pg_dist_shard_placement placement
        JOIN pg_dist_shard shard USING (shardid)
        WHERE shard.logicalrelid = table_name;
    END LOOP;
END;
$function$;
/*
 * create_user_mapping_command builds the CREATE USER MAPPING statement for the
 * per-shard foreign server named <foreign_server>_<shardid>.
 *
 * Parameters:
 *   username             - role the mapping is created for (quoted with %I)
 *   foreign_server       - name of the foreign server of the distributed table
 *   shardid              - shard whose server receives the mapping
 *   user_mapping_options - options as key=value strings (e.g. umoptions)
 *
 * Returns the command text. The server name is quoted as one identifier, so
 * names that need quoting still yield valid SQL; plain lower-case names give
 * the same text as before. When user_mapping_options is NULL or empty the
 * OPTIONS clause is left out, because "OPTIONS ()" is not valid syntax.
 */
CREATE OR REPLACE FUNCTION create_user_mapping_command(username name, foreign_server name, shardid bigint, user_mapping_options text[])
RETURNS text
LANGUAGE sql
AS $function$
    SELECT format(
        $$CREATE USER MAPPING FOR %I SERVER %I%s$$,
        username,
        foreign_server::text || '_' || shardid::text,
        -- string_agg yields NULL when there are no options, which makes the
        -- whole clause NULL and lets COALESCE drop it
        COALESCE(' OPTIONS (' || string_agg(convert_key_value(u), ', ') || ')', ''))
    FROM unnest(user_mapping_options) u;
$function$;
/*
 * convert_key_value turns a key=value string into the form used inside an
 * OPTIONS (...) clause: the key as-is, followed by the value quoted as an SQL
 * literal (key 'value').
 */
CREATE OR REPLACE FUNCTION convert_key_value(key_value text)
RETURNS text
LANGUAGE plpgsql
AS $function$
DECLARE
    option_name text;
    option_value text;
BEGIN
    -- split once, then format the two parts
    SELECT kv.key, kv.value
    INTO option_name, option_value
    FROM extract_key_value(key_value) AS kv;

    RETURN format('%s %L', option_name, option_value);
END;
$function$;
/*
 * extract_key_value splits a key=value string at the first '='.
 *
 * Parameters:
 *   key_value - string of the form key=value
 * Output:
 *   key   - text before the first '='
 *   value - text after the first '=' (may itself contain '=')
 *
 * When key_value contains no '=', the whole string is returned as key and
 * value is NULL (previously this raised "negative substring length not
 * allowed"). A NULL input yields NULL for both key and value.
 */
CREATE OR REPLACE FUNCTION extract_key_value(key_value text, OUT key text, OUT value text)
RETURNS record
LANGUAGE plpgsql
AS $function$
DECLARE
    separator_index int := position('=' IN key_value);
BEGIN
    -- position() returns 0 when the separator is missing
    IF separator_index = 0 THEN
        key := key_value;
        value := NULL;
        RETURN;
    END IF;

    key := substring(key_value for separator_index - 1);
    value := substring(key_value from separator_index + 1);
END;
$function$;
/*
 * find_value looks up a key in an array of key=value pairs.
 *
 * Parameters:
 *   needle   - key to look for
 *   haystack - array of key=value strings (e.g. srvoptions, umoptions)
 *
 * Returns the value of an entry whose key equals needle, or NULL when no
 * entry matches (or haystack is NULL).
 */
CREATE OR REPLACE FUNCTION find_value(needle text, haystack text[])
RETURNS text
LANGUAGE sql
AS $function$
    -- LATERAL calls extract_key_value once per array element; the
    -- (extract_key_value(u)).* form is expanded per output column and
    -- therefore invokes the function once for key and once for value.
    SELECT kv.value
    FROM unnest(haystack) AS u(option)
    CROSS JOIN LATERAL extract_key_value(u.option) AS kv
    WHERE kv.key = needle
$function$;
-- Foreign server (postgres_fdw) for the remote Citus formation
CREATE SERVER us_server
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (
        host 'c.f2pvks4wstvayzgtldzykinxxvy.db.citusdata.com',
        port '5432',
        sslmode 'require'
    );

-- Credentials the local role "citus" uses on that server
-- (example values -- substitute your own)
CREATE USER MAPPING FOR citus
    SERVER us_server
    OPTIONS (
        user 'citus',
        password 'abcdefg'
    );

-- Foreign table with the same columns as the remote distributed table
CREATE FOREIGN TABLE page_views_us (
    site_id int,
    path text,
    client_ip inet,
    view_time timestamptz,
    view_id bigint
)
SERVER us_server
OPTIONS (table_name 'page_views');

-- Distribute the foreign table on site_id
SELECT create_distributed_table('page_views_us', 'site_id');

-- Copy the user mappings to the shards and point each shard at its remote shard
SELECT
    add_user_mappings_to_shards('page_views_us'),
    bind_shards_to_remote_shards('page_views_us');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment