Skip to content

Instantly share code, notes, and snippets.

@parameme
Created December 21, 2016 02:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save parameme/2c7dbd3fd72d91c3cad8c53aad53b748 to your computer and use it in GitHub Desktop.
Save parameme/2c7dbd3fd72d91c3cad8c53aad53b748 to your computer and use it in GitHub Desktop.
Quick and dirty Heroku datafix runner
class Datafix
def initialize(options)
@options = options
@context = options[:context] || {}
@table = options[:table] || {}
@table_name = @table[:name] || 'tmp_datafix'
@table_reader_sql = @table[:reader_sql] || "SELECT * FROM #{@table_name}"
@table_row_action = @table[:row_action]
@apply_action = options[:apply_action]
@connection = options[:connection] || ActiveRecord::Base.connection
@logger = options[:logger] || Rails.logger
@old_logger = ActiveRecord::Base.logger
@log_level = options[:log_level] || Logger::DEBUG
@old_level = @logger.level
@csv = options[:csv]
@csv_io = @csv[:io]
@csv_filter_options = @csv[:filter_options] || {}
@csv_row_transformer = @csv[:row_transformer]
end
def run!(csv_io = @csv_io)
begin
@logger.level = @log_level
ActiveRecord::Base.logger = @logger
banner 'DATAFIX BEGIN'
@logger.tagged('DATAFIX') do
@connection.transaction do
table_columns_clause = @table[:columns].values.map{ |(sql_name, sql_type)| "#{sql_name} #{sql_type}"}.join(",\n")
PostgresCopier.with_temporary_table(@connection, @table_name, table_columns_clause) do
postgres_io = StringIO.new
mapping = Hash[@table[:columns].map{ |header, (sql_name, _)| [header, sql_name]}]
header_to_column = lambda { |header| mapping[header] }
columns_used = []
CSV.filter(@csv_io, postgres_io, @csv_filter_options.merge(in_headers: true, in_header_converters: [header_to_column])) do |row|
@csv_row_transformer.call(row, @context) if @csv_row_transformer.present?
row.delete_if {|header, _| header.nil? }
columns_used = row.headers if columns_used.empty? && row.headers.present?
end
postgres_io.rewind
PostgresCopier.copy_data_in(PostgresCopier.copy_data_in_command(@table_name, columns_used), postgres_io)
@connection.exec_query(@table_reader_sql).to_hash.each do |row|
begin
@table_row_action.call(row, @context)
rescue => e
@logger.error row.inspect
raise
end
end
@apply_action.call(@options, @connection, @logger) if @apply_action
end
end
end
banner 'DATAFIX END'
rescue => e
@logger.error "#{e.class}: #{e.message}"
@logger.error e.backtrace.join("\n")
raise
end
ensure
@logger.level = @old_level
ActiveRecord::Base.logger = @old_logger
end
private
def banner(message)
@logger.warn message.center(50, '=')
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment