Skip to content

Instantly share code, notes, and snippets.

@criess
Created February 16, 2021 11:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save criess/6e02e23c467db144c840ab8e2c57be62 to your computer and use it in GitHub Desktop.
Save criess/6e02e23c467db144c840ab8e2c57be62 to your computer and use it in GitHub Desktop.
#!/usr/bin/env ruby
# Requires a dump of the funnel_exec table from BigQuery in newline-delimited JSON format
# (one JSON object per line), plus a CSV file with lead creation time, email and Pipedrive ids.
#
# three positional parameters must be passed:
# * input json path
# * input csv path
# * fully qualified target (insert) table name
#
require 'oj'
require 'csv'
require 'yaml'
require 'active_support'
require 'active_support/core_ext'
# Column / key names used to match CSV rows against BigQuery records.
PRIM_KEY_CSV = 'Person - Email'    # CSV column with the e-mail address(es)
PRIM_KEY_BQ = 'person_email'       # BigQuery key with the e-mail address
TIME_CSV = 'Deal - Deal created'   # CSV column with the creation timestamp
TIME_BQ = 'lead_created_at'        # BigQuery key with the creation timestamp

# BigQuery keys that are copied unchanged into the generated INSERT.
DIRECT_MAP = %w[
  carl_id
  ga_user_id
  ga_session_id
].freeze

# Positional parameters: input json path, input csv path, target table name.
input_json, input_csv, target_table = ARGV[0..2]
if target_table.nil?
  # Fail early with a readable message instead of a TypeError from CSV.read(nil).
  abort("usage: #{$PROGRAM_NAME} <input json path> <input csv path> <target table name>")
end

# Column list of the INSERT statement, in output order.
SQL_MAP_KEYS = (DIRECT_MAP + %w[lead_created_at pipedrive_person_id pipedrive_deal_id]).freeze

# Separators used to break a "YYYY-MM-DD hh:mm:ss" style timestamp into its components.
TIME_COMPONENTS = /[- :]/

# The first CSV row is the header row; later rows are accessed by column name.
csv = CSV.read(input_csv, col_sep: ',', headers: true)

# Every BigQuery record gets an empty-string default for each DIRECT_MAP key,
# so later lookups of those keys never miss.
blank_defaults = DIRECT_MAP.map { |key| [key, ''] }.to_h.freeze

# One JSON document per line; blank lines are skipped because they carry no
# record and would otherwise make the merge below fail.
bqd = File.binread(input_json)
          .split("\n")
          .reject { |line| line.strip.empty? }
          .map { |line| blank_defaults.merge(Oj.load(line)) }
# Extracts the Pipedrive identifiers from one CSV row.
#
# row - an object indexable by column name (e.g. a CSV::Row or Hash).
#
# Returns a Hash with the symbol keys :pipedrive_person_id and
# :pipedrive_deal_id, taken from the 'Person - ID' and 'Deal - ID' columns.
def map(row)
  person_id = row['Person - ID']
  deal_id = row['Deal - ID']
  { pipedrive_person_id: person_id, pipedrive_deal_id: deal_id }
end
# Characters that must be backslash-escaped inside a double-quoted SQL string literal.
SQL_ESCAPES = { '\\' => '\\\\', '"' => '\\"', "\n" => '\\n', "\r" => '\\r' }.freeze

# Builds a Time from a timestamp string and a fixed UTC offset.
#
# text       - String split on TIME_COMPONENTS; only the first six components
#              (year .. second) are used.
# utc_offset - String such as '+00:00'.
#
# Missing components are passed as nil, so the offset always lands in
# Time.new's zone argument rather than being taken for a time component.
def parse_timestamp(text, utc_offset)
  year, month, day, hour, minute, second = text.split(TIME_COMPONENTS)
  Time.new(year, month, day, hour, minute, second, utc_offset)
end

# Renders one value as a SQL literal: NULL for blank values, otherwise a
# double-quoted string with backslashes, quotes and line breaks escaped so
# the value cannot terminate the literal early.
def sql_literal(value)
  return 'NULL' if value.blank?

  "\"#{value.to_s.gsub(/[\\"\n\r]/, SQL_ESCAPES)}\""
end

# Picks the BigQuery record that belongs to one CSV row.
#
# records   - Array of Hashes loaded from the BigQuery dump.
# csv_email - downcased, stripped e-mail cell of the CSV row; may hold several
#             comma-separated addresses.
# csv_ts    - Time of the CSV row.
#
# A single e-mail match is returned as is. With several matches, only records
# created before csv_ts are considered, and the one closest in time wins.
# Returns nil when nothing qualifies.
def closest_entry(records, csv_email, csv_ts)
  addresses = csv_email.split(',').map(&:strip)
  candidates = records.select do |record|
    bq_email = record[PRIM_KEY_BQ].to_s.downcase.strip
    bq_email == csv_email || addresses.include?(bq_email)
  end
  # Timestamps are only inspected when the e-mail alone is ambiguous.
  return candidates.first if candidates.size <= 1

  timed = candidates.map { |record| [parse_timestamp(record[TIME_BQ], '+00:00'), record] }
  earlier = timed.select { |bq_ts, _record| bq_ts < csv_ts }
  closest = earlier.min_by { |bq_ts, _record| (bq_ts - csv_ts).abs }
  closest&.last
end

value_rows = []
csv.each do |row|
  csv_email = row[PRIM_KEY_CSV].to_s.downcase.strip
  # NOTE(review): CSV timestamps are interpreted with a fixed +01:00 offset,
  # BigQuery timestamps as UTC - confirm this matches the CSV export settings.
  csv_ts = parse_timestamp(row[TIME_CSV], '+01:00')
  entry = closest_entry(bqd, csv_email, csv_ts)

  if entry.nil?
    # Diagnostics go to stderr so that stdout contains only the generated SQL.
    warn("unable to find email: #{row[PRIM_KEY_CSV]}")
    next
  end

  mapped = entry.slice(*DIRECT_MAP).merge(
    lead_created_at: entry['lead_created_at'],
    **map(row)
  ).stringify_keys
  # Emit values in SQL_MAP_KEYS order so they always line up with the column list.
  literals = SQL_MAP_KEYS.map { |key| sql_literal(mapped[key]) }
  value_rows << "(#{literals.join(', ')})"
end

if value_rows.empty?
  # An INSERT without any value tuples is not valid SQL, so emit nothing.
  warn('no CSV row could be matched; no INSERT statement generated')
else
  header = "INSERT INTO #{target_table} \n(#{SQL_MAP_KEYS.join(', ')}) \nVALUES \n"
  puts "#{header}#{value_rows.join(", \n")};"
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment