Skip to content

Instantly share code, notes, and snippets.

@yorickpeterse
Created April 24, 2015 10:14
Show Gist options
  • Save yorickpeterse/b7cf2e1648bd3fc6d713 to your computer and use it in GitHub Desktop.
require 'oga'
require 'sequel'
require 'concurrent'
require 'progress_bar'
require 'set'
# SAX handler that turns every <record> element of an XML document into one
# row for the `promptcloud_properties` table.
#
# The parser calls `on_element`, `on_text` and `after_element` while walking
# the document; finished rows are buffered in memory and written to the
# database by `save`.
class SaxParser
  # XML element name => column that receives the leading number found in
  # that element's text (see RATING_REGEXP).
  RATINGS = {
    'property_rating'         => :rating,
    'property_service_rating' => :service_rating,
    'property_value_rating'   => :value_rating
  }.freeze

  # XML element name => column that receives the element's text unchanged.
  MAPPING = {
    'uniq_id'                   => :promptcloud_id,
    'tags'                      => :tags,
    'url'                       => :url,
    'name'                      => :name,
    'address'                   => :address,
    'city'                      => :city,
    'postal_code'               => :postal_code,
    'country'                   => :country,
    'phone'                     => :phone,
    'email'                     => :email,
    'latitude'                  => :latitude,
    'longitude'                 => :longitude,
    'property_type'             => :property_type,
    'region'                    => :region,
    'stars'                     => :stars,
    'ranking'                   => :ranking,
    'website'                   => :website,
    'amenities'                 => :amenities,
    'number_of_traveler_photos' => :photos,
    'number_of_rooms'           => :rooms
  }.freeze

  # Name of the element that wraps a single row.
  RECORD = 'record'.freeze

  # Captures the run of digits and dots found at the start of a line.
  RATING_REGEXP = /^([\d\.]+)/

  def initialize
    @record  = nil # raw element name => text of the record being read
    @element = nil # name of the child element currently open, if any
    @records = []  # rows that are ready to be inserted
  end

  # Inserts all buffered rows into `promptcloud_properties`, 1000 rows per
  # `multi_insert` call.
  def save
    @records.each_slice(1000) do |slice|
      DB[:promptcloud_properties].multi_insert(slice)
    end
  end

  # Called when an element is opened. A <record> element starts a new raw
  # record; any other element opened while a record is active becomes the
  # target of subsequent text.
  #
  # @param namespace [Object] unused
  # @param name [String] element name
  # @param attrs [Hash] unused
  def on_element(namespace, name, attrs = {})
    if name == RECORD
      @record = {}
    elsif @record
      @element = name
    end
  end

  # Called when an element is closed. Closing a <record> converts the raw
  # record into a row and buffers it; closing any element clears the current
  # text target so text between elements is not captured.
  #
  # @param namespace [Object] unused
  # @param name [String] element name
  def after_element(namespace, name)
    # The `@record` check avoids a NoMethodError should a record be closed
    # without having been opened.
    if name == RECORD && @record
      @records << build_row(@record)
      @record = nil
    end

    @element = nil
  end

  # Called for text content. Text is only kept while inside a child element
  # of a record.
  #
  # Text is appended rather than assigned: a parser may report the text of
  # one element through several calls (NOTE(review): confirm for Oga with IO
  # input), and assigning would keep only the last chunk.
  #
  # @param text [String]
  def on_text(text)
    return unless @record && @element

    existing = @record[@element]
    @record[@element] = existing ? existing + text : text
  end

  private

  # Builds the database row for one raw record.
  #
  # Every MAPPING column is always present (nil when the element was
  # missing); rating columns are only set when the element exists and its
  # text matches RATING_REGEXP.
  def build_row(record)
    row = {
      :source_id  => 116,
      :created_at => Time.now
    }

    MAPPING.each do |from, to|
      row[to] = record[from]
    end

    RATINGS.each do |from, to|
      next unless record[from]

      matches = record[from].match(RATING_REGEXP)
      row[to] = matches[1].to_f if matches
    end

    row[:property_type] = row[:property_type].downcase if row[:property_type]

    row
  end
end
# Database connection, configured through the DB_HOST, DB_NAME, DB_USER and
# DB_PASSWORD environment variables.
host, database, username, password =
  ENV.values_at('DB_HOST', 'DB_NAME', 'DB_USER', 'DB_PASSWORD')

connection_settings = {
  :test            => true,
  :encoding        => 'UTF8',
  :max_connections => 60
}

# JRuby connects through a JDBC URL, every other engine through a
# postgres:// URL.
connection_url =
  if RUBY_ENGINE == 'jruby'
    "jdbc:postgresql://#{host}/#{database}?user=#{username}&password=#{password}"
  else
    "postgres://#{username}:#{password}@#{host}/#{database}"
  end

DB = Sequel.connect(connection_url, connection_settings)
# Imports every XML file below ~/Downloads/promptcloud/hotel_data using a
# pool of 5 worker threads, showing a progress bar that advances once per
# processed file.
pool     = Concurrent::FixedThreadPool.new(5)
files    = Dir.glob(File.expand_path('~/Downloads/promptcloud/hotel_data/**/*.xml'))
done     = Queue.new
progress = ProgressBar.new(files.length)

puts 'Trapping signals...'

# Sending USR2 to the process prints the backtrace of every thread.
trap 'USR2' do
  Thread.list.each do |thread|
    puts "== #{thread.inspect}"
    puts
    # Thread#backtrace returns nil for a thread that has already died.
    puts Array(thread.backtrace).join("\n")
    puts
  end
end

puts 'Starting threads...'

files.each do |file|
  pool.post do
    begin
      File.open(file, 'r') do |handle|
        parser = SaxParser.new

        Oga.sax_parse_xml(parser, handle)

        parser.save
      end
    rescue => error
      # Report the failure instead of letting the pool discard it silently.
      warn "Failed to import #{file}: #{error.class}: #{error.message}"
    ensure
      # Always signal completion, otherwise a single failing file would
      # leave the main thread waiting forever.
      done << 1
    end
  end
end

# Wait for exactly one completion signal per file. An unbounded loop would
# block on `done.pop` forever once the last file has been processed.
files.length.times do
  done.pop
  progress.increment!
end

# wait_for_termination only returns once the pool has been shut down.
pool.shutdown
pool.wait_for_termination
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment