|
require 'json' |
|
require 'timeout' |
|
require 'pg' |
|
require 'optparse' |
|
require 'highline/import' |
|
|
|
# Migrates one calendar month of reporting data into freshly created
# "node_run_<YYYY_MM>_replacement" and "node_run_detail_<YYYY_MM>_replacement"
# tables.
#
# For the month given to the constructor, #run:
#   1. creates the two replacement tables if they do not exist yet,
#   2. checks whether monthly partition tables (node_run_<YYYY_MM> and
#      node_run_detail_<YYYY_MM>) exist,
#   3. copies rows hour by hour from the parent tables (SELECT ... FROM ONLY)
#      and, when present, from the monthly partition tables,
#   4. adds the check constraint, triggers and indexes on the node_run
#      replacement table.
#
# Steps 3 and 4 can be restricted with the :migrate_node_run_data_only,
# :migrate_node_run_detail_data_only and :migrate_indexes_only options.
class Migrate2018
  # Immutable-by-convention description of the month being migrated.
  State = Struct.new(:year, :month, :year_month, :start_time, :current_time, :options)

  # Seconds covered by each INSERT ... SELECT batch.
  SECONDS_PER_HOUR = 3600

  # @param year [Integer] four digit year of the month to migrate
  # @param month [Integer] month number, 1..12
  # @param options [Hash] option flags (see class comment); looked up with symbol keys
  def initialize(year, month, options)
    @init_state = State.new(year, month, year_month(year, month), start_time(year, month), Time.now, options)
  end

  # Opens the connection used by every other method and stores it in @conn.
  def connect_to_postgres
    puts "Connecting to Postgres"
    # This assumes running as the db user (chef-pgsql, opscode-pgsql or postgres) on the db machine
    # chomp: the command output ends with a newline that is not part of the password.
    reporting_password = `/opt/opscode-reporting/embedded/bin/veil-env-helper --secret opscode-reporting.sql_password printenv CHEF_SECRET_OPSCODE-REPORTING.SQL_PASSWORD`.chomp
    # Hash form instead of a hand-built conninfo string, so a password containing
    # spaces or quotes cannot corrupt the connection parameters.
    @conn = PG.connect(
      dbname: "opscode_reporting",
      host: "localhost",
      user: "opscode_reporting",
      password: reporting_password
    )
    puts "Connected to Postgres"
  end

  # @return [String] table-name suffix for the month, e.g. "2018_02"
  def year_month(year, month)
    Time.utc(year, month).strftime("%Y_%m")
  end

  # @return [Time] first instant (UTC) of the given month
  def start_time(year, month)
    Time.utc(year, month, 1, 0, 0)
  end

  # @return [Time] first instant (UTC) of the month after the migrated one;
  #   used as the exclusive upper bound everywhere
  def end_time
    if @init_state.month == 12
      Time.utc(@init_state.year + 1, 1, 1, 0, 0)
    else
      Time.utc(@init_state.year, @init_state.month + 1, 1, 0, 0)
    end
  end

  # @return [String] migrated month as "YYYY-MM"
  def year_hyphen_month
    Time.utc(@init_state.year, @init_state.month).strftime("%Y-%m")
  end

  # @return [String] month following the migrated one as "YYYY-MM"
  def end_year_hyphen_month
    end_time.strftime("%Y-%m")
  end

  # @param time [Time]
  # @return [String] timestamp text with numeric UTC offset, e.g. "2018-02-01 00:00:00+0000"
  def format_time(time)
    time.strftime("%Y-%m-%d %T%z")
  end

  # @return [String] name of the node_run replacement table for the month
  def node_run_replacement
    "node_run_#{@init_state.year_month}_replacement"
  end

  # @return [String] name of the node_run_detail replacement table for the month
  def node_run_detail_replacement
    "node_run_detail_#{@init_state.year_month}_replacement"
  end

  # @return [String] SQL boolean expression restricting start_time to the migrated month
  def partition_time_check
    "start_time >= TIMESTAMPTZ '#{year_hyphen_month}-01 00:00:00-00' and " \
      "start_time < TIMESTAMPTZ '#{end_year_hyphen_month}-01 00:00:00-00'"
  end

  # Creates the node_run replacement table (no-op when it already exists).
  def create_node_run_replacement_table
    @conn.exec(<<-SQL)
      CREATE TABLE IF NOT EXISTS #{node_run_replacement}(
        run_id uuid PRIMARY KEY,
        org_id CHAR(32) NOT NULL,
        node_id CHAR(32) NOT NULL,
        node_name text NOT NULL,

        status VARCHAR(16) NOT NULL,

        start_time timestamp with time zone NOT NULL,
        end_time timestamp with time zone NOT NULL,
        duration INTERVAL NOT NULL,
        duration_id integer REFERENCES duration_buckets(id) ON UPDATE RESTRICT ON DELETE RESTRICT NOT NULL , -- set as foreign key in the partition tables

        event_data text,
        run_list text DEFAULT '' NOT NULL,
        updated_res_count integer DEFAULT 0 NOT NULL,
        total_res_count integer DEFAULT 0 NOT NULL
      );
    SQL
  end

  # Creates the node_run_detail replacement table (no-op when it already exists).
  # Unlike the node_run table, the month check constraint is part of the DDL here.
  def create_node_run_detail_replacement_table
    @conn.exec(<<-SQL)
      CREATE TABLE IF NOT EXISTS #{node_run_detail_replacement} (
        run_id uuid NOT NULL,
        seq integer NOT NULL,
        duration integer NOT NULL,
        res_id TEXT NOT NULL,
        res_type TEXT NOT NULL,
        res_name TEXT NOT NULL,
        res_result TEXT,
        res_initial_state text,
        res_final_state text,
        start_time timestamp with time zone NOT NULL,
        delta text,
        cookbook_name text DEFAULT '' NOT NULL,
        cookbook_ver TEXT DEFAULT ''::TEXT NOT NULL,
        PRIMARY KEY(run_id, seq),
        CHECK (#{partition_time_check})
      );
    SQL
  end

  # Prepares +query+ under +name+ on the connection and runs it without parameters.
  def prepare_and_execute(name, query)
    @conn.prepare(name, query)
    @conn.exec_prepared(name, [])
  end

  # Echoes +query+ to stdout, then runs it through #prepare_and_execute.
  def log_and_execute(name, query)
    puts query
    prepare_and_execute(name, query)
  end

  # Adds the month check constraint, the insert/delete/update triggers and all
  # indexes to the node_run replacement table, then VACUUM ANALYZEs it.
  # NOTE(review): none of these statements is idempotent; running this twice on
  # the same table is expected to fail on the existing triggers/indexes.
  def create_node_run_triggers_and_indices
    table = node_run_replacement

    puts "Add check constraint"
    log_and_execute("node_run_time_check", "ALTER TABLE #{table} ADD CHECK (#{partition_time_check});")

    puts "create triggers"
    # [trigger/procedure suffix, triggering event]
    [%w[it INSERT], %w[dt DELETE], %w[ut UPDATE]].each do |suffix, event|
      log_and_execute(
        "node_run_#{suffix}",
        "CREATE TRIGGER node_run_#{suffix}_t_#{@init_state.year_month}_replacement AFTER #{event} ON #{table} " \
        "FOR EACH ROW EXECUTE PROCEDURE node_run_#{suffix}();"
      )
    end

    puts "build indexes"
    # [prepared statement name, index name suffix, indexed expression]
    [
      ["node_run_start_time_idx", "start_time_idx", "start_time"],
      ["node_run_org_id_node_id_idx", "org_id_node_id_idx", "org_id, node_id"],
      ["node_run_start_time_min_idx", "start_time_minute_idx", "date_trunc('minute', start_time AT TIME ZONE 'UTC')"],
      ["node_run_status_idx", "status_idx", "status"],
      ["node_run_org_id_node_name_idx", "org_id_node_name_idx", "org_id, node_name"],
      ["node_run_org_id_status_idx", "org_id_status_idx", "org_id, status"],
      ["node_run_org_id_start_time_idx", "org_id_start_time_idx", "org_id, start_time"]
    ].each do |statement_name, index_suffix, columns|
      log_and_execute(statement_name, "CREATE INDEX #{table}_#{index_suffix} ON #{table}(#{columns});")
    end

    puts "vacuum analyze replacement table"
    log_and_execute("node_run_vacuum_analyze", "VACUUM ANALYZE #{table};")
  end

  # Adds the start_time index to the node_run_detail replacement table and
  # VACUUM ANALYZEs it. Not called by #migrate_indexes at the moment.
  def create_node_run_detail_triggers_and_indices
    table = node_run_detail_replacement

    puts "Add node_run_detail indices"
    log_and_execute("node_run_detail_idx", "CREATE INDEX #{table}_start_time_idx ON #{table}(start_time);")

    puts "Vacuum Analyze"
    log_and_execute("node_run_detail_vacuum_analyze", "VACUUM ANALYZE #{table};")
  end

  # Prepares every data-copy statement used by the migrate_* methods. All of
  # them take two parameters: inclusive start and exclusive end of a time window.
  # Statements that read a monthly partition table are only prepared when
  # @exists_node_run_table / @exists_node_run_detail_table say the table exists.
  def prepare_statements
    window = "start_time >= $1 and start_time < $2"
    source_window = "src.start_time >= $1 and src.start_time < $2"
    partition = "node_run_#{@init_state.year_month}"
    detail_partition = "node_run_detail_#{@init_state.year_month}"

    # The duplicate deletes are correlated on the primary key. An uncorrelated
    # EXISTS would empty the whole replacement table as soon as the source has
    # a single row inside the window.
    @delete_node_run_dups_parent =
      "DELETE FROM #{node_run_replacement} AS repl where exists " \
      "(SELECT 1 FROM only node_run AS src where src.run_id = repl.run_id and #{source_window});"
    @conn.prepare("delete_node_run_duplicate_pk_parent", @delete_node_run_dups_parent)

    @migrate_node_run_sql_parent =
      "INSERT INTO #{node_run_replacement} SELECT * FROM only node_run where #{window};"
    @conn.prepare("migrate_node_run_to_replacement", @migrate_node_run_sql_parent)

    @migrate_node_run_detail_sql_parent =
      "INSERT INTO #{node_run_detail_replacement} SELECT * FROM only node_run_detail where #{window};"
    @conn.prepare("migrate_node_run_detail_to_replacement", @migrate_node_run_detail_sql_parent)

    if @exists_node_run_table
      @delete_node_run_dups_partition =
        "DELETE FROM #{node_run_replacement} AS repl where exists " \
        "(SELECT 1 FROM only #{partition} AS src where src.run_id = repl.run_id and #{source_window});"
      @conn.prepare("delete_node_run_duplicate_pk_partition", @delete_node_run_dups_partition)

      @migrate_node_run_sql_partition =
        "INSERT INTO #{node_run_replacement} SELECT * FROM only #{partition} where #{window};"
      @conn.prepare("migrate_node_run_partition_to_replacement", @migrate_node_run_sql_partition)
    end

    if @exists_node_run_detail_table
      @delete_node_run_detail_dups =
        "DELETE FROM #{node_run_detail_replacement} AS repl where exists " \
        "(SELECT 1 FROM only #{detail_partition} AS src " \
        "where src.run_id = repl.run_id and src.seq = repl.seq and #{source_window});"
      @conn.prepare("delete_node_run_detail_duplicate_pk", @delete_node_run_detail_dups)

      @migrate_node_run_detail_sql_partition =
        "INSERT INTO #{node_run_detail_replacement} SELECT * FROM only #{detail_partition} where #{window};"
      @conn.prepare("migrate_node_run_detail_partition_to_replacement", @migrate_node_run_detail_sql_partition)
    end
  end

  # Walks the migrated month in one-hour steps.
  #
  # @yieldparam window_start [String] formatted inclusive start of the hour
  # @yieldparam window_end [String] formatted exclusive end of the hour
  def each_hour_window
    month_end = end_time
    window_start = @init_state.start_time
    while window_start < month_end
      window_end = window_start + SECONDS_PER_HOUR
      yield format_time(window_start), format_time(window_end)
      window_start = window_end
    end
  end

  # Copies the month's node_run rows into the replacement table.
  # Rows already present in the replacement table that also exist in a source
  # table are removed first, so the copy can be re-run after an interruption.
  def migrate_node_run_data
    month_range = [format_time(@init_state.start_time), format_time(end_time)]

    puts "Deleting duplicates with parent in node_run_#{@init_state.year_month}_replacement table from #{month_range[0]}, #{month_range[1]}"
    @conn.exec_prepared('delete_node_run_duplicate_pk_parent', month_range)

    if @exists_node_run_table
      puts "Deleting duplicates with partition in node_run_#{@init_state.year_month}_replacement table from #{month_range[0]}, #{month_range[1]}"
      @conn.exec_prepared('delete_node_run_duplicate_pk_partition', month_range)
    end

    each_hour_window do |start_time_str, hour_end_str|
      current_time_str = format_time(Time.now)

      puts "Migrating node_run from parent to #{@init_state.year_month} hour starting : #{start_time_str} Current operational time: #{current_time_str}"
      @conn.exec_prepared('migrate_node_run_to_replacement', [start_time_str, hour_end_str])

      next unless @exists_node_run_table

      puts "Migrating node_run from partition to #{@init_state.year_month} hour starting : #{start_time_str} Current operational time: #{current_time_str}"
      @conn.exec_prepared('migrate_node_run_partition_to_replacement', [start_time_str, hour_end_str])
    end

    puts "Rows for #{@init_state.year_month} from node_run parent and partition table(if exists) successfully migrated to replacement table\n"
  end

  # Copies the month's node_run_detail rows into the replacement table.
  def migrate_node_run_detail_data
    # add fk index. This is done here and will slow things down but if added after the fact it becomes a resource hog and hosted can't keep up
    # NOTE(review): this ALTER runs on every call; confirm a re-run does not pile up duplicate constraints.
    @conn.exec("ALTER TABLE #{node_run_detail_replacement} ADD FOREIGN KEY(run_id)
                REFERENCES node_run_foreign_key(run_id) ON UPDATE RESTRICT ON DELETE CASCADE;")

    each_hour_window do |start_time_str, hour_end_str|
      current_time_str = format_time(Time.now)

      puts "Migrating node_run_detail from parent hour starting : #{start_time_str} Current operational time: #{current_time_str}"
      @conn.exec_prepared('migrate_node_run_detail_to_replacement', [start_time_str, hour_end_str])

      next unless @exists_node_run_detail_table

      # Drop the rows just copied from the parent that the partition also holds,
      # otherwise the partition insert below would violate the primary key.
      puts "Removing duplicate rows from #{node_run_detail_replacement} before inserting from partition"
      @conn.exec_prepared('delete_node_run_detail_duplicate_pk', [start_time_str, hour_end_str])

      puts "Migrating node_run_detail from partition hour starting : #{start_time_str} Current operational time: #{current_time_str}"
      @conn.exec_prepared('migrate_node_run_detail_partition_to_replacement', [start_time_str, hour_end_str])
    end

    puts "Rows for #{@init_state.year_month} from node_run_detail parent and partition table(if exists) successfully migrated to replacement table\n"
    puts "\n*************************************************************************************************************************\n"
  end

  # Adds constraints, triggers and indexes to the node_run replacement table.
  def migrate_indexes
    puts "Add the indexes and contraints"
    create_node_run_triggers_and_indices
    # create_node_run_detail_triggers_and_indices
  end

  # Runs an existence query and interprets its single boolean result.
  #
  # @param name [String] prepared statement name to register +query+ under
  # @param query [String] SQL returning one boolean in row 0, column 0
  # @return [Boolean] true when the server answered 't'
  def exists_table(name, query)
    puts "Check if the node table exists"
    @conn.prepare(name, query)
    res = @conn.exec_prepared(name, [])
    res.getvalue(0, 0) == 't'
  end

  # Prints the diagnostic fields of a PG::Error. Falls back to the exception
  # message when the error carries no result object.
  def report_pg_error(err)
    # No recovery is attempted here, so the message does not promise any.
    puts "A problem occurred. Please double check database state."
    result = err.result
    if result.nil?
      puts err.message
      return
    end

    diagnostic_fields = [
      PG::Result::PG_DIAG_SEVERITY,
      PG::Result::PG_DIAG_SQLSTATE,
      PG::Result::PG_DIAG_MESSAGE_PRIMARY,
      PG::Result::PG_DIAG_MESSAGE_DETAIL,
      PG::Result::PG_DIAG_MESSAGE_HINT,
      PG::Result::PG_DIAG_STATEMENT_POSITION,
      PG::Result::PG_DIAG_INTERNAL_POSITION,
      PG::Result::PG_DIAG_INTERNAL_QUERY,
      PG::Result::PG_DIAG_CONTEXT,
      PG::Result::PG_DIAG_SOURCE_FILE,
      PG::Result::PG_DIAG_SOURCE_LINE,
      PG::Result::PG_DIAG_SOURCE_FUNCTION
    ]
    p diagnostic_fields.map { |field| result.error_field(field) }
  end

  # Entry point: connects, creates the replacement tables and performs the
  # stages selected by the options. A PG::Error raised while migrating is
  # reported and swallowed so the caller can continue with the next month;
  # the connection is closed in every case.
  def run
    connect_to_postgres

    begin
      # Create the replacement tables
      puts "Creating node_run replacement table for #{@init_state.year_month}"
      create_node_run_replacement_table

      puts "Creating node_run_detail replacement table for #{@init_state.year_month}"
      create_node_run_detail_replacement_table

      begin
        # The results must be stored: prepare_statements and the migrate_*
        # methods decide on these flags whether to touch the partition tables.
        @exists_node_run_table = exists_table(
          'exists_node_run_table',
          "SELECT EXISTS ( SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'node_run_#{@init_state.year_month}');"
        )
        @exists_node_run_detail_table = exists_table(
          'exists_node_run_detail_table',
          "SELECT EXISTS ( SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'node_run_detail_#{@init_state.year_month}');"
        )

        prepare_statements

        options = @init_state.options
        # With no "*_only" flag every stage runs; otherwise only the flagged ones.
        run_everything = !options[:migrate_node_run_data_only] &&
                         !options[:migrate_node_run_detail_data_only] &&
                         !options[:migrate_indexes_only]

        migrate_node_run_data if run_everything || options[:migrate_node_run_data_only]
        migrate_node_run_detail_data if run_everything || options[:migrate_node_run_detail_data_only]
        migrate_indexes if run_everything || options[:migrate_indexes_only]
      rescue PG::Error => err
        report_pg_error(err)
      end
    ensure
      @conn.close if @conn
    end
  end
end
|
|
|
# Command line entry point.
#
# Without options every stage of the migration runs for each month listed in
# months_to_migrate. The "*_only" flags restrict the run to the selected
# stages; --current migrates only the month hard-coded below, after an
# interactive confirmation.
options = {
  :current => false,
  :migrate_node_run_data_only => false,
  :migrate_node_run_detail_data_only => false,
  :migrate_indexes_only => false
}

opt_parser = OptionParser.new do |opts|
  opts.banner = "Usage: /opt/opscode/embedded/bin/ruby /path/to/script/migrate_2018.rb [options]"
  opts.separator ""

  opts.on("-c", "--current", "Running migrations for the current month") do
    options[:current] = true
  end

  opts.on("-n", "--migrate_node_run_data_only", "Migrate node_run data only") do
    options[:migrate_node_run_data_only] = true
  end

  opts.on("-d", "--migrate_node_run_detail_data_only", "Migrate node_run_detail data only") do
    options[:migrate_node_run_detail_data_only] = true
  end

  opts.on("-i", "--migrate_indexes_only", "Migrate indexes, constraints and triggers only") do
    options[:migrate_indexes_only] = true
  end

  opts.on("-h", "--help", "Help!") do
    puts opts
    exit
  end
end
opt_parser.parse!

# Find if we are migrating the current partition
if options[:current]
  puts "Migrating the current partition. Please ensure reporting is in maintainence_mode.\n"
  exit unless HighLine.agree('Do you want to proceed?')
  # NOTE(review): "current" is pinned to February 2018 rather than derived from the clock.
  Migrate2018.new(2018, 2, options).run
else
  # Month numbers are plain decimal integers: a leading zero makes Ruby read
  # the literal as octal, and 08 / 09 are then syntax errors.
  months_to_migrate = [[2016, 12], [2017, 12]] + (1..12).map { |month| [2018, month] }

  months_to_migrate.each do |year, month|
    Migrate2018.new(year, month, options).run
  end
end