Skip to content

Instantly share code, notes, and snippets.

@bredman
Created November 21, 2014 21:49
Show Gist options
  • Save bredman/6d157b59649d8c7c29eb to your computer and use it in GitHub Desktop.
Save bredman/6d157b59649d8c7c29eb to your computer and use it in GitHub Desktop.
#!/bin/bash
set -o nounset
set -o errexit
NUM_PARALLEL=4
TMP_DIR=`mktemp -d`
COPY_SCRIPT=/home/ec2-user/59407c287d8e3e074869/pg_shard_copy.sql
echo "splitting $1 to $TMP_DIR..."
split -n l/$NUM_PARALLEL $1 $TMP_DIR/part-
echo "loading data..."
# NOTE: if a psql command fails user is responsible for repairs
ls $TMP_DIR | xargs -I{} -n 1 -P $NUM_PARALLEL psql -X -f $COPY_SCRIPT -vfilename=$TMP_DIR/{} $2
rm -rf $TMP_DIR
echo "DONE"
/*
* pg_shard_copy - Build and uses a temporary adapter layer to permit COPY
* operations to a distributed table created by pg_shard.
*
* usage: psql -X -fpg_shard_copy.sql -vfilename=<filename> \
* <dbname>
*
* In order use this file you first need to define the following trigger in
* your DB:
*
* CREATE OR REPLACE FUNCTION copy_to_insert_ataddv2() RETURNS trigger AS $copy_to_insert_ataddv2$
* BEGIN
* EXECUTE 'INSERT INTO audience_taxonomy_attribute_details_daily_v2 VALUES ($1, $2, ' ||
* '$3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)' USING
* NEW.taxonomy_attribute_id, NEW.audience_definition_id, NEW.report_date,
* NEW.third_party_adserver_id, NEW.campaign_id, NEW.impressions_total,
* NEW.uu_impression_total, NEW.click_engagement_count,
* NEW.ak_uu_click_engagement_count, NEW.adv_impression_attributed_conversions,
* NEW.adv_click_attributed_conversions, NEW.raw_adv_impression_attributed_conversions,
* NEW.raw_adv_click_attributed_conversions, NEW.adv_uu_attributed_conversions_total,
* NEW.adv_impression_attributed_conversions_by_type,
* NEW.adv_engagement_attributed_conversions_by_type;
* PERFORM nextval('row_load_counter');
* RETURN NULL;
* END;
* $copy_to_insert_ataddv2$ LANGUAGE plpgsql;
*/
\set QUIET on
\set ON_ERROR_ROLLBACK off
\pset format unaligned
\pset tuples_only on
\set ON_ERROR_STOP on
/*
* Use a session-bound counter to keep track of the number of rows inserted: we
* can't roll back so we need to tell the user how many rows were inserted. Due
* to the trigger implementation, the COPY will report zero rows, so we count
* them manually for a better experience.
*/
CREATE TEMPORARY SEQUENCE row_load_counter CACHE 100000;
/* Create a temporary "shell" table to COPY to... */
CREATE TEMPORARY TABLE copy_to_ataddv2 ( LIKE audience_taxonomy_attribute_details_daily_v2 );
/* Finally, install it on the shell table. */
CREATE TRIGGER copy_to_insert_ataddv2 BEFORE INSERT ON copy_to_ataddv2 FOR EACH ROW EXECUTE PROCEDURE copy_to_insert_ataddv2();
/* We using this to fail fast if the trigger function doesn't exist, otherwise
* on error we want to make sure to continue to print number of rows loaded.
*/
\set ON_ERROR_STOP off
/* Execute the copy */
COPY copy_to_ataddv2 FROM :'filename';
/* Print out number of rows actually processed */
SELECT :'filename' || ' ' || currval('row_load_counter');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment