Automating the process of writing incoming CSV files to a SQL database (with Kiba ETL)
# https://www.kiba-etl.org
require 'kiba'
# for the sources
require 'kiba-common/sources/enumerable'
require 'kiba-common/sources/csv'
# to create one source out of each input file
require 'kiba-common/transforms/source_transform_adapter'
# for the destination - Kiba Pro is the commercial extension,
# more information at https://github.com/thbar/kiba/wiki
require 'kiba-pro/destinations/sql_bulk_insert'
# for the database connection
require 'sequel'
require 'pg'
# for file clean-up and logging in post_process
require 'fileutils'
require 'logger'
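
# NOTE (assumptions, not part of the original gist): DATABASE_URL is expected to
# point at a PostgreSQL database containing a `products` table, and a `processed/`
# directory should exist next to `input/` so the post_process step can move the
# files into it.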
Sequel.connect(ENV.fetch('DATABASE_URL')) do |db|
  processed_files = []

  job = Kiba.parse do
    # enumerate the incoming CSV files; each filename becomes a "row"
    source Kiba::Common::Sources::Enumerable, -> { Dir["input/*.csv"] }

    transform do |file|
      processed_files << file # keep a trace so the files can be moved at the end
      [
        Kiba::Common::Sources::CSV,
        filename: file,
        csv_options: { headers: true }
      ]
    end

    # NOTE: this takes each row (a source class and its arguments) as a new source and instantiates it
    transform Kiba::Common::Transforms::SourceTransformAdapter

    # NOTE: here you would transform the columns, remap values etc. if needed
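    # For illustration only (hypothetical column names, not part of the original gist),
    # such a row-level transform could look like:
    #
    # transform do |row|
    #   { sku: row['sku'], name: row['name'], price_cents: (row['price'].to_f * 100).round }
    # end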
    destination Kiba::Pro::Destinations::SQLBulkInsert,
      database: db,
      table: :products,
      # if we need to upsert rather than insert
      dataset: -> (dataset) {
        dataset.insert_conflict(target: :some_unique_key)
      },
      buffer_size: 20_000
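
    # NOTE (illustration only, not part of the original gist): on PostgreSQL, Sequel's
    # insert_conflict can also turn conflicts into updates, e.g.:
    #
    #   dataset.insert_conflict(
    #     target: :some_unique_key,
    #     update: { name: Sequel[:excluded][:name] }
    #   )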

    post_process do
      logger = Logger.new(STDOUT)
      processed_files.each do |file|
        logger.info "Moving file #{file} to processed..."
        FileUtils.mv(file, 'processed')
      end
    end
  end

  Kiba.run(job)
end
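
To try this out (assumptions, not stated in the gist): add the gems required above to a Gemfile (kiba, kiba-common, sequel, pg, plus the commercial kiba-pro gem), save the snippet as, say, import_csv.rb (a hypothetical filename), and run it with bundle exec ruby import_csv.rb while DATABASE_URL points at your PostgreSQL database.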