@rytch14
Created September 18, 2018 15:56
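# Loads gzipped Sailthru mobile-event JSON dumps into GoodData ADS (Vertica
# flex tables) over JDBC. Runs under JRuby (jdbc-dss is a Java JDBC driver):
# gunzip each file, COPY it in parallel per entity, record load metadata,
# build flex-table views, then pivot payload.* keys into rows.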
require 'bundler/setup'
require 'rubygems'
require 'sequel'
require 'jdbc/dss'
require 'logger'
require 'pathname'
require 'date'
require 'json'
require 'peach'
require 'zlib'
Jdbc::DSS.load_driver
Java.com.gooddata.dss.jdbc.driver.DssDriver
@ads_instance = 'e754dad464ce3602dd44230afb57cb85'
@logger = Logger.new($stdout)
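# One manifest per entity; keys must match the subdirectory names under
# mobile_events_v1/.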
files_to_load = {
  "sessions"      => [],
  "custom_events" => [],
  "device_events" => [],
  "notifications" => [],
  "impressions"   => []
}
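# Walk the extracted dump: gunzip every file and queue a COPY command that
# loads the raw JSON into the entity's flex table via fjsonparser.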
dir_content = Dir.glob('mobile_events_v1/**/*').reject { |fn| File.directory?(fn) }
dir_content.each do |f|
  @logger.info "Processing and ungzipping #{f}"
  new_name = f.gsub('.gz', '.json')
  system(%Q{gzip -c -d "#{f}" > "#{new_name}"})
  # File.open(f.gsub('.gz', '.json'), 'w').write(Zlib::GzipReader.open(f).read)
  file_name = File.basename(f)
  entity = f.split('/')[1]
  files_to_load[entity] << {
    "file_name"        => file_name,
    "file_path"        => new_name,
    "file_size_origin" => File.size(f),
    "file_size"        => File.size(new_name),
    "copy_cmd"         => "COPY mobile_events_#{entity} FROM LOCAL '#{new_name}' PARSER fjsonparser();"
  }
end
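# Workers reconnect every 80 files via this helper, presumably to keep
# long-lived JDBC sessions from going stale mid-batch.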
def refresh_connection(entity)
  @logger.info "Refreshing connection for #{entity}"
  Sequel.connect('jdbc:dss://na1.analytics.sailthru.com/gdc/dss/instances/' + @ads_instance,
                 :username => 'ps-etl+sailthru@gooddata.com',
                 :password => '',
                 :pool_timeout => 240,
                 :max_connections => 4)
end
@logger.info "SUMMARY"
files_to_load.keys.each do |k|
  @logger.info "ENTITY #{k} contains #{files_to_load[k].length} files"
end
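# Single shared Sequel::Database; its pool caps out at 4 connections for the
# 5 entity workers, so :pool_timeout gives the last thread time to wait.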
connection = Sequel.connect('jdbc:dss://na1.analytics.sailthru.com/gdc/dss/instances/' + @ads_instance,
                            :username => 'ps-etl+sailthru@gooddata.com',
                            :password => '',
                            :pool_timeout => 240,
                            :max_connections => 4)
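# Fan out one worker thread per entity; each drains the queue and COPYs its
# entity's files serially, so files of the same entity never race each other.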
queue = Queue.new
threads = []
files_to_load.keys.each { |x| queue << x }
queue.size.times do
  threads << Thread.new do
    entity = queue.pop(true) rescue nil
    # Thread-local handle: refreshing must not swap the connection out from
    # under the other workers, so the shared `connection` is never reassigned.
    conn = connection
    files_to_load[entity].each_with_index do |file, index|
      index += 1
      conn = refresh_connection(entity) if index % 80 == 0
      file_name = file['file_name']
      file_size = file['file_size']
      file_size_origin = file['file_size_origin']
      sql_stmnt = file['copy_cmd']
      file_path = file['file_path']
      @logger.info "#{Thread.current.object_id} || Executing #{sql_stmnt}. #{index}/#{files_to_load[entity].length}"
      conn.execute sql_stmnt
      # Row count from `wc -l` (assumes one JSON record per line).
      conn.execute "INSERT INTO mobile_events_metadata (entity, file_name, file_size, row_count, inserted_at, file_size_origin)
                    VALUES ('#{entity}', '#{file_name}', #{file_size}, #{`wc -l "#{file_path}"`.strip.split(' ')[0].to_i}, '#{Time.now}', #{file_size_origin})"
    end
  end
end
threads.each { |t| t.join }
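# Vertica's compute_flextable_keys_and_build_view scans each flex table,
# records its keys in <table>_keys, and builds a typed <table>_view over the
# raw JSON (used for the payload pivot below).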
files_to_load.keys.each do |entity|
  row_count = connection.fetch("SELECT count(*) FROM mobile_events_#{entity}").all[0][:count]
  if row_count > 0
    sql_stm = "SELECT compute_flextable_keys_and_build_view('mobile_events_#{entity}')"
    @logger.info sql_stm
    connection.fetch(sql_stm).all
  end
end
# Preparing payloads for mobile notifications: get all payload columns the key
# computation discovered. (The '.' in the REGEXP_LIKE pattern is an unescaped
# wildcard, so this matches any key containing 'payload' plus one character.)
sql_stmnt = "SELECT key_name FROM mobile_events_notifications_keys WHERE REGEXP_LIKE(key_name, 'payload.')"
payload_columns_raw = connection.fetch(sql_stmnt).all
payload_columns = payload_columns_raw.map { |c| c[:key_name] }
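# Pivot each payload.* key into (payload, payload_value) rows; one INSERT, and
# one full scan of the notifications view, per key.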
payload_columns.each do |pc|
  # Earlier variant that grouped on a smaller column set:
  # sql_stmnt = "INSERT /*+direct*/ INTO src_json_mobile_events_notifications(type, client_id, app_id, notification_id, message_id, platform, payload, payload_value, date, status)
  #              SELECT type, client_id, app_id, notification_id, message_id, platform, '#{pc.gsub('payload.', '')}', \"#{pc}\"::VARCHAR(10000), date, status FROM mobile_events_notifications_view
  #              GROUP BY type, client_id, app_id, notification_id, message_id, platform, '#{pc.gsub('payload.', '')}', \"#{pc}\", date, status;"
  sql_stmnt = "INSERT /*+direct*/ INTO src_json_mobile_events_notifications (user_id, type, token, status, sid, platform, notification_id, message_id, device_id, date, client_id, app_id, payload, payload_value)
               SELECT user_id, type, token, status, sid, platform, notification_id, message_id, device_id, date, client_id, app_id, '#{pc.gsub('payload.', '')}', \"#{pc}\"::VARCHAR(10000) FROM mobile_events_notifications_view
               GROUP BY user_id, type, token, status, sid, platform, notification_id, message_id, device_id, date, client_id, app_id, '#{pc.gsub('payload.', '')}', \"#{pc}\";"
  @logger.info sql_stmnt
  connection.execute(sql_stmnt)
end
@logger.info "Job's done."