Skip to content

Instantly share code, notes, and snippets.

@sax
Last active February 16, 2017 11:18
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sax/f7e59f6d677108aac010 to your computer and use it in GitHub Desktop.
Save sax/f7e59f6d677108aac010 to your computer and use it in GitHub Desktop.
Dump and reload data into PostgreSQL with massive concurrency
# Interpolated "load tables" step, shown as a standalone one-liner.
# find emits every regular file under the current directory, NUL-delimited;
# xargs hands them out one per invocation (-n1) to at most 50 concurrent
# `bash -c` workers (-P50), and each worker redirects its file (available as
# $0 inside the worker) into psql's standard input.
# NOTE: <postgres_user> and <mydatabase> are placeholders to substitute before
# running; left as written, the worker shell would treat the angle brackets as
# redirections.
find . -type f -print0 | xargs -0 -n1 -P50 bash -c "psql -U <postgres_user> <mydatabase> < \$0"
#!/bin/bash
#
# Dump a PostgreSQL schema and its table data into an export directory, or
# load a previous dump back, running many psql processes in parallel.
# See usage() / process_args() for the supported options.

# Abort on the first unhandled command failure.
set -e

# Name of this script; "$0" is quoted so paths containing spaces survive.
readonly PROGNAME=$(basename "$0")
# Original command line flattened into one space-separated string; main()
# expands it unquoted so it splits back into words. "$*" makes the join
# explicit (same result as the implicit join of "$@" in an assignment).
readonly ARGS="$*"
# Process id of this script.
readonly PID=$$

# Fallbacks used when the matching command-line option is not given.
readonly DEFAULT_SCHEMA="public"
readonly DEFAULT_USER="postgres"
readonly DEFAULT_EXPORT_DIR="/tmp"

# Number of concurrent workers handed to xargs -P.
readonly PARALLELISM=50
# Width of the id range covered by one exported batch.
readonly BATCH_SIZE=500000

# Picked up by libpq for every connection made by psql/pg_dump: a statement
# timeout of 0 disables the timeout so long-running COPY statements are not
# cancelled.
export PGOPTIONS="--statement-timeout=0"
# Print a summary of the supported command-line options to stdout, then exit
# the script with status 0. The option list mirrors what process_args accepts.
usage() {
  # Fall back to the invoked script name when PROGNAME is not defined.
  local prog="${PROGNAME:-${0##*/}}"

  printf '%s\n' \
    "" \
    "Usage: ${prog} [options]" \
    "" \
    "Options:" \
    "  -d, --database NAME    database to connect to" \
    "  -s, --schema NAME      schema to dump or load (default: ${DEFAULT_SCHEMA:-public})" \
    "  -e, --export_dir DIR   directory for the dump files (default: ${DEFAULT_EXPORT_DIR:-/tmp})" \
    "  -U, --user NAME        PostgreSQL user (default: ${DEFAULT_USER:-postgres})" \
    "  -D, --dump             dump the schema, table data and sequence values" \
    "  -S, --dump-schema      dump only the schema and index definitions" \
    "  -L, --load             load previously dumped table data and reset sequences" \
    "  -v, --verbose          trace every command (set -x)" \
    "  -h, --help             show this help and exit" \
    ""
  exit 0
}
# Parse the command line and publish the result as readonly globals.
#
# Long options are first translated to their short equivalents, then the
# translated list is parsed with getopts.
#
# Arguments: the script's command-line words.
# Globals written (readonly):
#   DATABASE, DB_USER, EXPORT_DIR, SCHEMA  - always set (defaults applied,
#                                            one trailing "/" stripped)
#   DUMP, LOAD                             - set to "true" only when requested
#   DUMP_SCHEMA                            - "true" for --dump or --dump-schema
# Side effects: -v/--verbose enables `set -x`; -h/--help calls usage.
process_args() {
  local arg
  local -a translated=()
  # Keep getopts state private so repeated or nested parsing starts clean.
  local OPTION OPTARG OPTIND=1
  local database="" export_dir="" schema="" user=""
  local dump="" load="" dump_schema=""

  # Map long options to short ones. Every word is stored as its own array
  # element, so values are never re-evaluated by the shell (no eval): "$",
  # backticks and quotes inside a value are kept literally.
  for arg; do
    case "${arg}" in
      --database)    translated+=(-d) ;;
      --schema)      translated+=(-s) ;;
      --export_dir)  translated+=(-e) ;;
      --user)        translated+=(-U) ;;
      --dump)        translated+=(-D) ;;
      --dump-schema) translated+=(-S) ;;
      --load)        translated+=(-L) ;;
      --help)        translated+=(-h) ;;
      --verbose)     translated+=(-v) ;;
      *)             translated+=("${arg}") ;;
    esac
  done
  set -- "${translated[@]}"

  while getopts "d:e:s:U:DLShv" OPTION; do
    case "${OPTION}" in
      v) set -x ;;
      h) usage ;;
      # Flags are collected in locals and made readonly once after the loop,
      # so repeating --dump/--load does not trip over a readonly variable.
      D) dump=true ;;
      L) load=true ;;
      S) dump_schema=true ;;
      d) database=${OPTARG%/} ;;
      e) export_dir=${OPTARG%/} ;;
      s) schema=${OPTARG%/} ;;
      U) user=${OPTARG%/} ;;
    esac
  done

  if [[ -n "${dump}" ]]; then
    readonly DUMP=true
  fi
  if [[ -n "${load}" ]]; then
    readonly LOAD=true
  fi

  readonly DATABASE="${database}"
  readonly DB_USER="${user:-$DEFAULT_USER}"
  readonly EXPORT_DIR="${export_dir:-$DEFAULT_EXPORT_DIR}"
  readonly SCHEMA="${schema:-$DEFAULT_SCHEMA}"

  # A full dump always includes the schema dump.
  if [[ -n "${DUMP:-}" || -n "${dump_schema}" ]]; then
    readonly DUMP_SCHEMA=true
  fi
}
# Writes two schema-only (-s) pg_dump files for ${SCHEMA} into ${EXPORT_DIR}.
# The first holds pg_dump's pre-data section (the table structure); the second
# holds the post-data section (the indices and other objects pg_dump defers
# until after the data, such as constraints and triggers).
#
#   ${SCHEMA}.schema.sql
#   ${SCHEMA}.indices.sql
#
# Globals read: DB_USER, DATABASE, SCHEMA, EXPORT_DIR.
dump_schema() {
  # Everything is quoted so names and paths containing spaces stay intact;
  # an unquoted redirect target with a space is an "ambiguous redirect".
  pg_dump -U "${DB_USER}" -d "${DATABASE}" -n "${SCHEMA}" -s --section=pre-data \
    > "${EXPORT_DIR}/${SCHEMA}.schema.sql"
  pg_dump -U "${DB_USER}" -d "${DATABASE}" -n "${SCHEMA}" -s --section=post-data \
    > "${EXPORT_DIR}/${SCHEMA}.indices.sql"
}
# private
# Prints the names of the ordinary tables in ${SCHEMA} on one line, separated
# by single spaces (an empty line when there are none).
#
# psql -qAt prints one "schema|name|type|owner" row per relation. Only rows
# whose type column is exactly "table" are kept: matching the word "table"
# anywhere in the row would also pick up sequences, views or owners whose
# names merely contain it. The listing is restricted to ${SCHEMA} (like
# list_sequences) because callers query ${SCHEMA}.<table>.
# Globals read: DB_USER, DATABASE, SCHEMA.
list_tables() {
  psql -qAt -U "${DB_USER}" -c "\dt ${SCHEMA}.*" "${DATABASE}" \
    | awk -F'|' '$3 == "table" { printf "%s%s", sep, $2; sep = " " } END { print "" }'
}
# private
# Prints the names of the sequences in ${SCHEMA} on one line, separated by
# single spaces (an empty line when there are none).
#
# psql -qAt prints one "schema|name|type|owner" row per sequence; the name is
# the second "|"-separated column. Rows without a column separator are
# ignored. Expansions are quoted so a user or database name containing spaces
# reaches psql as a single argument.
# Globals read: DB_USER, DATABASE, SCHEMA.
list_sequences() {
  psql -qAt -U "${DB_USER}" -c "\ds ${SCHEMA}.*" "${DATABASE}" \
    | awk -F'|' 'NF >= 2 { printf "%s%s", sep, $2; sep = " " } END { print "" }'
}
# private
# Prints the path of the batch description file inside ${EXPORT_DIR}; it is
# written by dump_table_information and read back by dump_tables.
table_info_filename() {
  printf '%s\n' "${EXPORT_DIR}/table_info"
}
# private
# Prints the path of the sequence value file inside ${EXPORT_DIR}; it is
# written by dump_sequence_information and read back by reset_sequences.
table_sequence_filename() {
  printf '%s\n' "${EXPORT_DIR}/table_sequences"
}
# For each table in the schema, write the id ranges ("batches") that
# dump_tables will export later. The batch file is truncated first and then
# filled with space-terminated "<table> <min_id> <max_id> " triples, all on
# one line, which is the format dump_tables feeds to xargs -d ' ' -n3.
#
# Batches cover ids 1..BATCH_SIZE, BATCH_SIZE+1..2*BATCH_SIZE, ... up to the
# batch containing max(id). Tables for which max(id) is not a non-negative
# integer (empty table, no id column, non-numeric id, failed query) get no
# batches and are skipped quietly.
# NOTE(review): rows with id < 1 fall outside every batch and are not exported.
# Globals read: DB_USER, DATABASE, SCHEMA, BATCH_SIZE.
dump_table_information() {
  local tables table_info_file table max_id batch_count i min max

  tables=$(list_tables)
  table_info_file=$(table_info_filename)
  : > "${table_info_file}"

  # ${tables} is a space-separated list; splitting it is intended.
  # shellcheck disable=SC2086
  for table in ${tables}; do
    # A failing query must not abort the whole dump under `set -e`.
    max_id=$(psql -qAt -U "${DB_USER}" -c "select max(id) from ${SCHEMA}.${table}" "${DATABASE}") \
      || max_id=""
    [[ "${max_id}" =~ ^[0-9]+$ ]] || continue

    # 10# forces base 10 in case the value carries leading zeros.
    batch_count=$(( 10#${max_id} / BATCH_SIZE ))
    for (( i = 0; i <= batch_count; i++ )); do
      min=$(( i * BATCH_SIZE + 1 ))
      max=$(( (i + 1) * BATCH_SIZE ))
      printf '%s %s %s ' "${table}" "${min}" "${max}" >> "${table_info_file}"
    done
  done
}
# Deprecated, but can be used to look up the sequence attached to the :id
# column of a table.
#
# Arguments: $1 - table name (looked up in ${SCHEMA}).
# Outputs:   the text between the first pair of single quotes in the id
#            column's default expression, e.g. "users_id_seq" for
#            nextval('users_id_seq'::regclass); an empty line when the query
#            returns nothing.
# Globals read: DB_USER, DATABASE, SCHEMA.
sequence_for_table() {
  local table=$1
  local column_default

  # Quoted expansions keep user/database names with spaces in one argument;
  # a failed lookup yields an empty result instead of aborting under `set -e`.
  column_default=$(psql -qAt -U "${DB_USER}" -c "SELECT column_default from information_schema.columns where table_name='${table}' and column_name = 'id' and table_schema = '${SCHEMA}'" "${DATABASE}") \
    || column_default=""

  printf '%s\n' "${column_default}" | cut -d"'" -f2
}
# For each sequence in the target schema, record its next value.
# nextval() is called on the sequence (which advances it) and the result is
# written as one "<sequence> <value>" line per sequence; reset_sequences reads
# these lines when loading the data back into the database.
# The output file is truncated first.
# Globals read: DB_USER, DATABASE, SCHEMA.
dump_sequence_information() {
  local sequences table_sequence_file sequence next_sequence

  sequences=$(list_sequences)
  table_sequence_file=$(table_sequence_filename)
  : > "${table_sequence_file}"

  # ${sequences} is a space-separated list; splitting it is intended.
  # shellcheck disable=SC2086
  for sequence in ${sequences}; do
    # A failed query leaves the value empty; reset_sequences skips such lines.
    next_sequence=$(psql -qAt -U "${DB_USER}" -c "select nextval('${SCHEMA}.${sequence}')" "${DATABASE}") \
      || next_sequence=""
    printf '%s %s\n' "${sequence}" "${next_sequence}" >> "${table_sequence_file}"
  done
}
# For each batch of each table, write out a CSV file of records.
#
# Note that this currently only dumps tables that have :id columns.
#
# Reads the "<table> <min> <max> " triples produced by dump_table_information
# and runs up to ${PARALLELISM} psql processes at once, each issuing
#   COPY (select * from SCHEMA.table where id >= min and id <= max)
#     TO 'EXPORT_DIR/table.min.max.csv'
# Globals read: DB_USER, DATABASE, SCHEMA, EXPORT_DIR, PARALLELISM.
# Requires GNU xargs (-d, -r).
dump_tables() {
  local table_info_file
  table_info_file=$(table_info_filename)

  # Worker script, deliberately single-quoted: the settings are passed as
  # positional parameters ($1 user, $2 schema, $3 export dir, $4 database)
  # instead of being pasted into the script text; xargs appends one batch
  # ($5 table, $6 min id, $7 max id).
  # shellcheck disable=SC2016
  local copy_batch='psql -U "$1" --set=statement_timeout=0 -c "COPY (select * from $2.$5 where id >= $6 and id <= $7) TO '\''$3/$5.$6.$7.csv'\''" "$4"'

  # -r: with an empty batch file run nothing, rather than one COPY built
  # from missing arguments.
  xargs -r -d ' ' -n3 -P"${PARALLELISM}" \
    bash -c "${copy_batch}" _ "${DB_USER}" "${SCHEMA}" "${EXPORT_DIR}" "${DATABASE}" \
    < "${table_info_file}"
}
# For every data file exported from a previous --dump, load the data into
# the new schema.
#
# Every non-empty *.csv file under ${EXPORT_DIR} is loaded with
#   COPY SCHEMA.<table> FROM '<file>'
# where <table> is the part of the file's base name before the first ".".
# Up to ${PARALLELISM} psql processes run at once.
# Globals read: DB_USER, DATABASE, SCHEMA, EXPORT_DIR, PARALLELISM.
load_tables() {
  # Worker script, deliberately single-quoted: the settings are passed as
  # positional parameters ($1 user, $2 schema, $3 database) instead of being
  # pasted into the script text; xargs appends the CSV path as $4.
  # shellcheck disable=SC2016
  local copy_file='psql -U "$1" --set=statement_timeout=0 -c "COPY $2.$(basename "$4" | cut -d. -f1) FROM '\''$4'\''" "$3"'

  # -r: when no CSV file matches, run nothing, rather than one COPY built
  # from a missing file name.
  find "${EXPORT_DIR}" -type f -name '*.csv' -size +0c -print0 \
    | xargs -r -0 -n1 -P"${PARALLELISM}" \
        bash -c "${copy_file}" _ "${DB_USER}" "${SCHEMA}" "${DATABASE}"
}
# Look up the sequences exported in a previous --dump. Restart each sequence
# at the recorded value + 1.
#
# Reads "<sequence> <value>" lines from the sequence file; lines without a
# value are skipped. A final line lacking a trailing newline is still
# processed.
# Globals read: DB_USER, DATABASE, SCHEMA.
reset_sequences() {
  local table_sequence_file sequence count new_count
  table_sequence_file=$(table_sequence_filename)

  # read -r keeps backslashes literal; the `|| [[ -n ... ]]` part handles a
  # last line that is not newline-terminated.
  while read -r sequence count || [[ -n "${sequence}" ]]; do
    [[ -n "${count}" ]] || continue
    new_count=$(( count + 1 ))
    # stdin is pointed at /dev/null so psql can never consume the remaining
    # lines of the file the loop is reading.
    psql -U "${DB_USER}" --set=statement_timeout=0 \
      -c "ALTER SEQUENCE ${SCHEMA}.${sequence} restart ${new_count}" "${DATABASE}" \
      < /dev/null
  done < "${table_sequence_file}"
}
# Entry point: parse the saved command line, then run whichever phases were
# requested, in this order: schema dump, data dump (batch info, sequence
# values, table data), load (table data, sequence reset).
main() {
  # ARGS is a flat string; it is left unquoted on purpose so that it splits
  # back into separate words.
  # shellcheck disable=SC2086
  process_args ${ARGS}

  if [[ -n "${DUMP_SCHEMA}" ]]; then
    dump_schema
  fi

  if [[ -n "${DUMP}" ]]; then
    dump_table_information
    dump_sequence_information
    dump_tables
  fi

  if [[ -n "${LOAD}" ]]; then
    load_tables
    reset_sequences
  fi
}
# Run the script. main takes no arguments here: it reads the command line
# from the ARGS constant.
main
# Example of the load step as a standalone command: feed every regular file
# under the current directory to psql, 50 files at a time.
# find . -type f -print0 | xargs -0 -n1 -P50 bash -c "psql -U <postgres_user> <mydatabase> < \$0"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment