Created
September 18, 2018 14:57
-
-
Save rytch14/fbd46d995887d1faa1fa88a055667cea to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'bundler/setup'
require 'rubygems'
require 'sequel'
require 'jdbc/dss'
require 'logger'
require 'pathname'
require 'date'
require 'json'
require 'peach'
require 'zlib'

# Load the DSS JDBC driver before any Sequel connection is opened.
Jdbc::DSS.load_driver
# Bare reference to the driver class through JRuby's Java bridge.
# NOTE(review): presumably this forces the class to be loaded/registered — confirm.
Java.com.gooddata.dss.jdbc.driver.DssDriver
@ads_instance = 'e754dad464ce3602dd44230afb57cb85'
@logger = Logger.new($stdout)

# Entities that have a matching mobile_events_<entity> target table.
MOBILE_EVENT_ENTITIES = %w[sessions custom_events device_events notifications impressions].freeze

# Finds every gzip file below +root+, decompresses it next to the archive
# (".gz" replaced by ".json") and returns the load plan grouped by entity.
#
# The entity is the first directory level below +root+. Files that live under
# a directory that is not one of MOBILE_EVENT_ENTITIES are logged and skipped
# instead of raising on a missing hash key.
#
# Only "*.gz" files are considered: the generated ".json" files sit in the same
# tree, and feeding them back into the gzip reader on a second run would
# truncate them and then fail.
#
# @param root [String] directory holding one sub-directory per entity
# @return [Hash{String => Array<Hash>}] per entity, one hash per file with the
#   keys "file_name", "file_path", "file_size_mb" and "copy_cmd"
def collect_files_to_load(root = 'mobile_events_v1')
  root = root.sub(%r{/+\z}, '')
  plan = MOBILE_EVENT_ENTITIES.each_with_object({}) { |entity, acc| acc[entity] = [] }

  gz_paths = Dir.glob(File.join(root, '**', '*.gz')).reject { |path| File.directory?(path) }
  gz_paths.each do |gz_path|
    # Path relative to root; its first component names the entity.
    relative = gz_path[root.length..-1].sub(%r{\A/+}, '')
    entity = relative.split('/').first
    unless plan.key?(entity)
      @logger.warn "Skipping #{gz_path}: unknown entity #{entity.inspect}"
      next
    end

    json_path = gz_path.gsub('.gz', '.json')
    # Block forms close both handles, so the JSON file is fully flushed
    # before its size is read below.
    Zlib::GzipReader.open(gz_path) do |gz|
      File.open(json_path, 'w') { |out| IO.copy_stream(gz, out) }
    end

    plan[entity] << {
      "file_name" => File.basename(json_path),
      "file_path" => gz_path,
      "file_size_mb" => File.size(json_path) / 1024 / 1024,
      "copy_cmd" => "COPY mobile_events_#{entity} FROM LOCAL '#{json_path}' PARSER fjsonparser();"
    }
  end

  plan
end

files_to_load = collect_files_to_load
# Opens a new Sequel connection to the ADS instance named by @ads_instance.
#
# Every call creates a new connection object; the caller is responsible for
# disconnecting whichever one it replaces.
#
# @param entity [String] entity being processed; only used in the log line
# @return [Object] whatever Sequel.connect returns for the JDBC URL
def refresh_connection(entity)
  @logger.info "Refresh ##{entity}"
  Sequel.connect(
    "jdbc:dss://na1.analytics.sailthru.com/gdc/dss/instances/#{@ads_instance}",
    username: 'ps-etl+sailthru@gooddata.com',
    password: '',
    pool_timeout: 240,
    max_connections: 4
  )
end
@logger.info "SUMMARY"
files_to_load.each do |entity, files|
  @logger.info "ENTITY #{entity} contain #{files.length} files"
end

# Connection shared by the workers until they first refresh, and used by the
# steps that run after the workers have finished.
connection = Sequel.connect(
  "jdbc:dss://na1.analytics.sailthru.com/gdc/dss/instances/#{@ads_instance}",
  username: 'ps-etl+sailthru@gooddata.com',
  password: '',
  pool_timeout: 240,
  max_connections: 4
)

# One worker thread per entity; each worker loads that entity's files in order.
queue = Queue.new
files_to_load.each_key { |entity| queue << entity }

threads = Array.new(queue.size) do
  Thread.new do
    entity = begin
      queue.pop(true)
    rescue ThreadError
      # Non-blocking pop on an empty queue: nothing left for this worker.
      nil
    end
    next unless entity

    # Each worker keeps its own handle. Reassigning the shared `connection`
    # from several threads would race and leak the replaced connections.
    worker_connection = connection
    begin
      files = files_to_load[entity]
      files.each_with_index do |file, index|
        position = index + 1
        if (position % 80).zero?
          stale = worker_connection
          worker_connection = refresh_connection(entity)
          stale.disconnect unless stale.equal?(connection)
        end

        file_name = file['file_name']
        file_path = file['file_path']
        sql_stmnt = file['copy_cmd']
        @logger.info "#{Thread.current.object_id} || Executing #{sql_stmnt}. #{index + 1}/#{files.length}"
        worker_connection.execute sql_stmnt

        # Count rows in the decompressed JSON (one line per record), not in the
        # gzip archive, and do it in Ruby so no path is passed to a shell.
        json_path = file_path.gsub('.gz', '.json')
        row_count = File.foreach(json_path).count
        # NOTE(review): file_size is still the size of the .gz archive, as before — confirm intended.
        file_size = File.size(file_path)

        metadata_sql = 'insert INTO mobile_events_metadata (entity, file_name, file_size, row_count, inserted_at) ' \
                       "values (#{worker_connection.literal(entity)}, #{worker_connection.literal(file_name)}," \
                       "#{file_size},#{row_count}, #{worker_connection.literal(Time.now.to_s)})"
        worker_connection.execute metadata_sql
      end
    ensure
      # Release the worker's private connection; the shared one stays open.
      worker_connection.disconnect unless worker_connection.equal?(connection)
    end
  end
end

threads.each(&:join)
# For every entity table that contains rows, run
# compute_flextable_keys_and_build_view on it.
files_to_load.each_key do |entity|
  table = "mobile_events_#{entity}"
  # single_value reads the first column of the first row, so this does not
  # depend on how the driver names the count(*) column.
  row_count = connection.fetch("SELECT count(*) from #{table}").single_value.to_i
  next unless row_count.positive?

  sql_stm = "SELECT compute_flextable_keys_and_build_view('#{table}')"
  @logger.info sql_stm
  connection.fetch(sql_stm).all
end
# Preparing payloads for mobile notifications.
# Get all payload columns.
# NOTE(review): the regex 'payload.' is unanchored and its dot matches any
# character — confirm whether '^payload\.' was intended.
sql_stmnt = "SELECT key_name AS key_name FROM mobile_events_notifications_keys WHERE REGEXP_LIKE(key_name, 'payload.')"
payload_columns = connection.fetch(sql_stmnt).all.map { |row| row[:key_name] }

# Columns copied unchanged from the view into the target table.
notification_columns = 'user_id, type, token, status, sid, platform, notification_id, ' \
                       'message_id, device_id, date, client_id, app_id'

# Insert one (payload, payload_value) row set per payload key.
payload_columns.each do |pc|
  # Same stripped name in SELECT and GROUP BY. The SELECT previously stripped
  # 'payload. ' (with a space), which never matched, so the prefix was kept.
  payload_literal = "'#{pc.gsub('payload.', '').gsub("'", "''")}'"
  # Key names are spliced into SQL; double any embedded quote so a key cannot
  # terminate the quoted identifier.
  payload_identifier = "\"#{pc.gsub('"', '""')}\""

  sql_stmnt = "INSERT /*+direct*/ INTO src_json_mobile_events_notifications (#{notification_columns}, payload, payload_value)\n" \
              "SELECT #{notification_columns}, #{payload_literal}, #{payload_identifier}::VARCHAR(10000) FROM mobile_events_notifications_view\n" \
              "GROUP BY #{notification_columns}, #{payload_literal}, #{payload_identifier};"
  @logger.info sql_stmnt
  connection.execute(sql_stmnt)
end

@logger.info "Job's done."
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment