-
-
Save yorickpeterse/b7cf2e1648bd3fc6d713 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'oga' | |
require 'sequel' | |
require 'concurrent' | |
require 'progress_bar' | |
require 'set' | |
# SAX handler that turns <record> elements into row hashes for the
# :promptcloud_properties table.
#
# The parser driving this object is expected to call #on_element, #on_text
# and #after_element as it walks the document. Each completed record is
# buffered in memory until #save is called.
class SaxParser
  # Source element name => destination column for rating fields. Only the
  # leading numeric part of the text (see RATING_REGEXP) is stored, as a Float.
  RATINGS = {
    'property_rating' => :rating,
    'property_service_rating' => :service_rating,
    'property_value_rating' => :value_rating
  }.freeze

  # Source element name => destination column for fields copied verbatim.
  MAPPING = {
    'uniq_id' => :promptcloud_id,
    'tags' => :tags,
    'url' => :url,
    'name' => :name,
    'address' => :address,
    'city' => :city,
    'postal_code' => :postal_code,
    'country' => :country,
    'phone' => :phone,
    'email' => :email,
    'latitude' => :latitude,
    'longitude' => :longitude,
    'property_type' => :property_type,
    'region' => :region,
    'stars' => :stars,
    'ranking' => :ranking,
    'website' => :website,
    'amenities' => :amenities,
    'number_of_traveler_photos' => :photos,
    'number_of_rooms' => :rooms
  }.freeze

  # Name of the element that delimits a single row.
  RECORD = 'record'.freeze

  # Captures the run of digits/dots at the start of a rating string.
  RATING_REGEXP = /^([\d\.]+)/

  def initialize
    @record = nil  # raw element name => text for the record being parsed
    @element = nil # name of the child element currently open, if any
    @text = nil    # text accumulated so far for the open element
    @records = []  # finished rows waiting to be inserted
  end

  # Inserts every buffered row into DB[:promptcloud_properties], 1000 rows
  # per multi_insert call.
  def save
    @records.each_slice(1000) do |slice|
      DB[:promptcloud_properties].multi_insert(slice)
    end
  end

  # Called when an element is opened.
  #
  # @param namespace [Object] unused
  # @param name [String] element name
  # @param attrs [Hash] unused
  def on_element(namespace, name, attrs = {})
    # A newly opened element always starts with an empty text buffer so text
    # belonging to different elements is never glued together.
    @text = nil

    if name == RECORD
      @record = {}
    elsif @record
      @element = name
    end
  end

  # Called when an element is closed. Closing a record element converts the
  # collected values into a row and queues it for #save.
  #
  # @param namespace [Object] unused
  # @param name [String] element name
  def after_element(namespace, name)
    if name == RECORD
      @records << build_row(@record)
      @record = nil
    end

    @element = nil
    @text = nil
  end

  # Called for text content. Text is stored under the name of the currently
  # open element of the current record; text outside of one is ignored.
  #
  # Consecutive calls for the same open element are concatenated, so a value
  # that arrives in several pieces is kept whole instead of being reduced to
  # its last fragment.
  # NOTE(review): whether the parser actually splits a text node over several
  # callbacks is not visible here - confirm against the parser's documentation.
  #
  # @param text [String]
  def on_text(text)
    return unless @record && @element

    @text = "#{@text}#{text}"
    @record[@element] = @text
  end

  private

  # Builds the database row for one raw record hash.
  def build_row(record)
    db_row = {
      :source_id => 116,
      :created_at => Time.now
    }

    MAPPING.each do |from, to|
      db_row[to] = record[from]
    end

    RATINGS.each do |from, to|
      next unless record[from]

      matches = record[from].match(RATING_REGEXP)
      db_row[to] = matches[1].to_f if matches
    end

    if db_row[:property_type]
      db_row[:property_type] = db_row[:property_type].downcase
    end

    db_row
  end
end
# Connection settings are read from the environment.
db_host, db_name, db_user, db_pass =
  ENV.values_at('DB_HOST', 'DB_NAME', 'DB_USER', 'DB_PASSWORD')

db_options = {
  :test => true,
  :encoding => 'UTF8',
  :max_connections => 60
}

# JRuby connects through a JDBC URL, every other engine through a
# postgres:// URL; the options are the same for both.
db_url =
  if RUBY_ENGINE == 'jruby'
    "jdbc:postgresql://#{db_host}/#{db_name}?user=#{db_user}&password=#{db_pass}"
  else
    "postgres://#{db_user}:#{db_pass}@#{db_host}/#{db_name}"
  end

DB = Sequel.connect(db_url, db_options)
# Parse and import every XML file on a pool of 5 worker threads while the
# main thread drives a progress bar.
pool = Concurrent::FixedThreadPool.new(5)
files = Dir.glob(File.expand_path('~/Downloads/promptcloud/hotel_data/**/*.xml'))
done = Queue.new
progress = ProgressBar.new(files.length)

puts 'Trapping signals...'

# Debugging aid: on USR2, print every thread and its backtrace.
trap 'USR2' do
  Thread.list.each do |thread|
    puts "== #{thread.inspect}"
    puts
    # Thread#backtrace may return nil; Array() turns that into an empty list.
    puts Array(thread.backtrace).join("\n")
    puts
  end
end

puts 'Starting threads...'

files.each do |file|
  pool.post do
    begin
      File.open(file, 'r') do |handle|
        parser = SaxParser.new

        Oga.sax_parse_xml(parser, handle)

        parser.save
      end
    rescue StandardError => error
      # Report the failure instead of letting the job die silently.
      warn "Failed to import #{file}: #{error.class}: #{error.message}"
    ensure
      # Always signal completion, even on failure, so the main thread never
      # waits for a job that will not report back.
      done << 1
    end
  end
end

# Exactly one completion signal is pushed per file, so pop that many and then
# stop; an unbounded loop would block forever on the empty queue.
files.length.times do
  done.pop
  progress.increment!
end

# The pool has to be shut down before waiting, otherwise its idle workers keep
# it alive and the wait never returns.
pool.shutdown
pool.wait_for_termination
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment